1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.sync;
15 
16 import core.atomic;
17 import core.thread;
18 
19 import std.exception;
20 import std.socket;
21 import std.typecons : Flag, Yes;
22 
23 import ae.net.asockets;
24 
25 /**
26 	An object which allows calling a function in a different thread.
27 	Create ThreadAnchor in the main thread (the thread in which the
28 	code will run in), and then call runWait or runAsync from a
29 	different thread.
30 
31 	The main thread must be running an unblocked ae.net.asockets
32 	event loop.
33 
34 	Example:
35 	---
36 	void onConnect(TcpConnection socket)
37 	{
38 		auto anchor = new ThreadAnchor;
39 		new Thread({
40 			string s = readln();
41 			anchor.runAsync({
42 				socket.send(s);
43 				socket.disconnect();
44 			});
45 		}).start();
46 	}
47 	---
48 **/
49 
50 final class ThreadAnchor : TcpConnection
51 {
52 private:
53 	enum Command : ubyte
54 	{
55 		none,
56 		runWait,
57 		runAsync,
58 		runWaitDone,
59 	}
60 
61 	alias Dg = void delegate();
62 
63 	enum queueSize = 1024;
64 
65 	final static class AnchorSocket : TcpConnection
66 	{
67 		Socket pinger;
68 		Dg[queueSize] queue;
69 		shared size_t readIndex, writeIndex;
70 
71 		this(bool daemon)
72 		{
73 			auto pair = socketPair();
74 			pair[0].blocking = false;
75 			super(pair[0]);
76 			pinger = pair[1];
77 			this.handleReadData = &onReadData;
78 			this.daemon = daemon;
79 		}
80 
81 		void onReadData(Data data)
82 		{
83 			import ae.utils.array;
84 
85 			foreach (cmd; cast(Command[])data.contents)
86 			{
87 				Dg* pdg = &queue[(readIndex.atomicOp!"+="(1)-1) % $];
88 				Dg dg = *pdg;
89 				*pdg = null;
90 				switch (cmd)
91 				{
92 					case Command.runAsync:
93 						dg();
94 						break;
95 					case Command.runWait:
96 					{
97 						dg();
98 						Command[] reply = [Command.runWaitDone];
99 						this.send(Data(reply));
100 						break;
101 					}
102 					default:
103 						assert(false);
104 				}
105 			}
106 		}
107 	}
108 
109 	AnchorSocket socket;
110 
111 	void sendCommand(Command command) nothrow @nogc
112 	{
113 		// https://github.com/dlang/phobos/pull/4273
114 		(cast(void delegate(Command) nothrow @nogc)&sendCommandImpl)(command);
115 	}
116 
117 	void sendCommandImpl(Command command)
118 	{
119 		Command[1] data;
120 		data[0] = command;
121 		socket.pinger.send(data[]);
122 	}
123 
124 public:
125 	this(Flag!"daemon" daemon = Yes.daemon)
126 	{
127 		socket = new AnchorSocket(daemon);
128 	}
129 
130 	void runAsync(Dg dg) nothrow @nogc
131 	{
132 		socket.queue[(socket.writeIndex.atomicOp!"+="(1)-1) % $] = dg;
133 		sendCommand(Command.runAsync);
134 	}
135 
136 	void runWait(Dg dg)
137 	{
138 		socket.queue[(socket.writeIndex.atomicOp!"+="(1)-1) % $] = dg;
139 		sendCommand(Command.runWait);
140 
141 		Command[] data = [Command.none];
142 		data = data[0..socket.pinger.receive(data)];
143 		enforce(data.length && data[0] == Command.runWaitDone, "runWait error");
144 	}
145 
146 	void close()
147 	{
148 		socket.pinger.close();
149 	}
150 }
151 
152 unittest
153 {
154 	// keep socketManager running -
155 	// ThreadAnchor sockets are daemon
156 	auto dummy = new TcpServer();
157 	dummy.listen(0, "localhost");
158 
159 	import ae.sys.timing;
160 
161 	int n = 0;
162 	Thread t;
163 
164 	// Queue to run this as soon as event loop starts
165 	setTimeout({
166 		auto anchor = new ThreadAnchor;
167 		t = new Thread({
168 			anchor.runWait({
169 				assert(n==0); n++;
170 			});
171 			anchor.runAsync(&dummy.close);
172 			assert(n==1); n++;
173 		}).start();
174 	}, Duration.zero);
175 
176 	socketManager.loop();
177 	t.join();
178 	assert(n==2);
179 }