1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.sync;
15 
16 import core.atomic;
17 import core.thread;
18 
19 import std.exception;
20 import std.socket;
21 
22 import ae.net.asockets;
23 
24 /**
25 	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.
29 
30 	The main thread must be running an unblocked ae.net.asockets
31 	event loop.
32 
33 	Example:
34 	---
35 	void onConnect(TcpConnection socket)
36 	{
37 		auto anchor = new ThreadAnchor;
38 		new Thread({
39 			string s = readln();
40 			anchor.runAsync({
41 				socket.send(s);
42 				socket.disconnect();
43 			});
44 		}).start();
45 	}
46 	---
47 **/
48 
final class ThreadAnchor : TcpConnection
{
private:
	// One-byte commands exchanged over the socket pair.
	enum Command : ubyte
	{
		none,        // initial value of the reply buffer in runWait
		runWait,     // run the next queued delegate, then reply with runWaitDone
		runAsync,    // run the next queued delegate, no reply
		runWaitDone, // reply sent after a runWait delegate has returned
	}

	alias Dg = void delegate();

	// Number of slots in the delegate ring buffer.
	enum queueSize = 1024;

	// Atomically increments `counter` and returns the ring-buffer slot
	// corresponding to its previous value.
	//
	// The parentheses are essential: `a - 1 % n` parses as `a - (1 % n)`,
	// which never wraps and indexes past the end of the queue once more
	// than queueSize delegates have been posted.
	static size_t nextSlot(ref shared size_t counter) nothrow @nogc
	{
		return (counter.atomicOp!"+="(1) - 1) % queueSize;
	}

	// One end of a socket pair, wrapped as a TcpConnection so that its
	// read handler is invoked with the command bytes written to `pinger`.
	final static class AnchorSocket : TcpConnection
	{
		// The other end of the socket pair; other threads write commands
		// to it and read runWait replies from it.
		Socket pinger;

		// Delegates waiting to be run, indexed via nextSlot.
		// NOTE(review): nothing here stops writers from getting more than
		// queueSize entries ahead of the reader and overwriting pending
		// delegates - confirm this is acceptable for callers.
		Dg[queueSize] queue;
		shared size_t readIndex, writeIndex;

		this()
		{
			auto pair = socketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = true;
		}

		// Handles command bytes received from `pinger`: each byte consumes
		// one queued delegate and runs it.
		void onReadData(Data data)
		{
			import ae.utils.array;

			foreach (cmd; cast(Command[])data.contents)
			{
				Dg dg = queue[nextSlot(readIndex)];
				switch (cmd)
				{
					case Command.runAsync:
						dg();
						break;
					case Command.runWait:
					{
						dg();
						// Tell the thread waiting in runWait that the delegate is done.
						Command[] reply = [Command.runWaitDone];
						this.send(Data(reply));
						break;
					}
					default:
						assert(false);
				}
			}
		}
	}

	AnchorSocket socket;

	// Writes one command byte to the pinger socket.
	void sendCommand(Command command) nothrow @nogc
	{
		// Socket.send is not annotated nothrow @nogc, so the attributes are
		// forced with a cast. See:
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(Command) nothrow @nogc)&sendCommandImpl)(command);
	}

	// Unannotated implementation of sendCommand.
	void sendCommandImpl(Command command)
	{
		Command[1] data;
		data[0] = command;
		socket.pinger.send(data[]);
	}

public:
	/// Creates the anchor and its underlying socket pair.
	this()
	{
		socket = new AnchorSocket();
	}

	/// Queues `dg` to be run by the anchor socket's read handler and
	/// returns without waiting for it to run.
	void runAsync(Dg dg) nothrow @nogc
	{
		// Store the delegate before sending the command byte that consumes it.
		socket.queue[nextSlot(socket.writeIndex)] = dg;
		sendCommand(Command.runAsync);
	}

	/// Queues `dg` to be run by the anchor socket's read handler, then
	/// waits for the runWaitDone reply.
	/// Throws: Exception ("runWait error") if the reply cannot be
	/// received or is not runWaitDone.
	// NOTE(review): replies are read from the single shared pinger socket,
	// so concurrent runWait calls from several threads look like they could
	// consume each other's replies - confirm against intended usage.
	void runWait(Dg dg)
	{
		socket.queue[nextSlot(socket.writeIndex)] = dg;
		sendCommand(Command.runWait);

		Command[1] reply = Command.none;
		// receive returns a negative value on error and 0 when the
		// connection is closed; check the count before reading the buffer
		// rather than using it as a slice bound.
		auto received = socket.pinger.receive(reply[]);
		enforce(received > 0 && reply[0] == Command.runWaitDone, "runWait error");
	}

	/// Closes the pinger end of the socket pair.
	void close()
	{
		socket.pinger.close();
	}
}
148 
unittest
{
	// The anchor's sockets are daemon sockets, so a listening server is
	// created to keep socketManager's loop running until it is closed.
	auto keepAlive = new TcpServer();
	keepAlive.listen(0, "localhost");

	import ae.sys.timing;

	int step = 0;
	Thread worker;

	// Scheduled with a zero delay, so this runs as soon as the event
	// loop starts.
	setTimeout({
		auto anchor = new ThreadAnchor;
		worker = new Thread({
			// Runs in the event loop thread; the worker waits for it to finish.
			anchor.runWait({
				assert(step==0);
				step++;
			});
			// Closing the server lets the event loop exit.
			anchor.runAsync(&keepAlive.close);
			assert(step==1);
			step++;
		});
		worker.start();
	}, Duration.zero);

	socketManager.loop();
	worker.join();
	assert(step==2);
}