1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.net.sync;
15 
16 import core.atomic;
17 import core.sync.semaphore;
18 import core.thread;
19 
20 import std.exception;
21 import std.socket;
22 import std.typecons : Flag, Yes;
23 
24 import ae.net.asockets;
25 
26 /**
27 	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.
31 
32 	The main thread must be running an unblocked ae.net.asockets
33 	event loop.
34 
35 	Example:
36 	---
37 	void onConnect(TcpConnection socket)
38 	{
39 		auto mainThread = thisThread;
40 		new Thread({
41 			string s = readln();
42 			mainThread.runAsync({
43 				socket.send(s);
44 				socket.disconnect();
45 			});
46 		}).start();
47 	}
48 	---
49 **/
50 
final class ThreadAnchor : TcpConnection
{
	// NOTE(review): nothing in the visible code of this class uses the
	// inherited `TcpConnection` state; all socket work is delegated to
	// the `AnchorSocket` member below. The base class is kept so that
	// the public interface stays unchanged - confirm whether it is
	// still required.
private:
	// Type of the callbacks marshalled to the anchor's thread.
	alias Dg = void delegate();

	// One queued unit of work.
	static struct Command
	{
		// Delegate to invoke in the anchor's thread.
		Dg dg;
		// When non-null, notified after `dg` has returned (see `runWait`).
		Semaphore* semaphore;
	}

	// Number of slots in the preallocated command ring.
	enum queueSize = 1024;

	// Receiving end of the socket pair. Sender threads write queue
	// indices to `pinger`; this connection's read handler picks them
	// up and runs the corresponding commands.
	final static class AnchorSocket : TcpConnection
	{
		// Sending end of the socket pair (written to by sender threads).
		Socket pinger;

		// Ensure the GC can reach delegates
		// Must be preallocated - can't allocate in signal handlers
		Command[queueSize] queue;
		// Monotonically increasing ticket counter; a command's slot
		// is its ticket modulo `queueSize`.
		shared size_t writeIndex;

		// Best-effort attempt at keeping `AnchorSocket` alive
		// (non-daemon) when there are in-flight submissions,
		// even when `daemon==true`.
		// For this to work reliably, the target thread's event loop
		// must have some reason to not exit (other than this
		// `ThreadAnchor`) up until `runAsync` (in the sender thread)
		// returns.
		shared size_t numPending;
		// The daemon setting requested at construction; `daemonRead`
		// is restored to this value whenever nothing is pending.
		bool daemon;

		this(bool daemon)
		{
			auto pair = tcpSocketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = daemon;
			this.daemonRead = daemon;
		}

		// Read handler: interprets the received bytes as an array of
		// queue indices and runs each referenced command in order.
		// Note that there is no exception handling here: if a delegate
		// throws, its semaphore is not notified and the rest of this
		// handler (including the `numPending` update) is skipped.
		void onReadData(Data data)
		{
			data.asDataOf!size_t.enter((scope indices) {
				foreach (index; indices)
				{
					// Copy the command out and clear its slot before
					// running it, so the slot reads as free from then on.
					auto command = queue[index];
					queue[index] = Command.init;
					command.dg();
					if (command.semaphore)
						command.semaphore.notify();
				}
				auto remaining = numPending.atomicOp!"-="(indices.length);
				this.daemonRead = daemon && remaining == 0;
				// A sender may have registered a new submission between
				// the decrement and the assignment above, in which case
				// the assignment could have overwritten the sender's
				// `daemonRead = false`. Senders increment `numPending`
				// before clearing `daemonRead`, so re-checking the counter
				// here guarantees the flag ends up cleared whenever a
				// submission is in flight.
				if (atomicLoad(numPending) != 0)
					this.daemonRead = false;
			});
		}
	}

	// The receiving connection, registered with the anchor thread's
	// event loop.
	AnchorSocket socket;

	// `nothrow @nogc` front for `sendCommandImpl`; the attributes are
	// forced with a cast.
	void sendCommand(size_t index) nothrow @nogc
	{
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
	}

	// Writes one queue index to the pinger socket.
	// NOTE(review): the result of `send` is ignored, so a failed or
	// short write (e.g. after `close`) goes unnoticed.
	void sendCommandImpl(size_t index)
	{
		size_t[1] data;
		data[0] = index;
		socket.pinger.send(data[]);
	}

	// Stores `command` in the next ring slot and notifies the anchor
	// thread. Asserts if the slot is still occupied, i.e. more than
	// `queueSize` commands are outstanding.
	void runCommand(Command command) nothrow @nogc
	{
		assert(command.dg);
		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
		if (socket.queue[index].dg !is null)
			assert(false, "ThreadAnchor queue overrun");
		socket.queue[index] = command;
		// Register the submission *before* clearing `daemonRead`.
		// Together with the re-check at the end of `onReadData`, this
		// ensures the reader cannot leave `daemonRead` set while this
		// submission is still pending.
		atomicOp!"+="(socket.numPending, 1);
		socket.daemonRead = false;
		sendCommand(index);
	}

public:
	/// Constructor.
	/// Params:
	///  daemon  = If `Yes.daemon` (the default), don't block the event
	///            loop from exiting until `close` is called.
	this(Flag!"daemon" daemon = Yes.daemon)
	{
		socket = new AnchorSocket(daemon);
	}

	/// Run the specified delegate in the origin thread,
	/// without waiting for it to finish.
	/// Params:
	///  dg = The delegate to run; must not be null.
	void runAsync(Dg dg) nothrow @nogc
	{
		runCommand(Command(dg));
	}

	/// Run the specified delegate in the origin thread,
	/// and wait for it to finish.
	/// The calling thread blocks on a semaphore until the delegate
	/// has returned, so this is presumably not safe to call from the
	/// origin thread itself.
	/// Params:
	///  dg = The delegate to run; must not be null.
	void runWait(Dg dg)
	{
		scope semaphore = new Semaphore();
		// The command holds a pointer to the local reference; this is
		// fine because we do not return before the semaphore is notified.
		runCommand(Command(dg, &semaphore));
		semaphore.wait();
	}

	/// Close the connection to the main thread.
	void close()
	{
		socket.pinger.close();
	}
}
170 
unittest
{
	import ae.sys.timing;

	// ThreadAnchor sockets are daemon, so they do not keep
	// socketManager running on their own; a listening server
	// is used to hold the event loop open for the test.
	auto keepAlive = new TcpServer();
	keepAlive.listen(0, "localhost");
	keepAlive.handleAccept = (TcpConnection incoming) {};

	int step;
	Thread worker;

	// Scheduled with a zero delay, so this runs once the
	// event loop has started.
	setTimeout({
		auto anchor = new ThreadAnchor;

		auto job = {
			// Synchronous call: the delegate runs in the loop thread,
			// and we only continue after it has finished.
			anchor.runWait({
				assert(step == 0);
				++step;
			});
			// Asynchronously release the event loop.
			anchor.runAsync(&keepAlive.close);
			assert(step == 1);
			++step;
		};

		worker = new Thread(job);
		worker.start();
	}, Duration.zero);

	socketManager.loop();
	worker.join();
	assert(step == 2);
}
200 
/// Returns the calling thread's `ThreadAnchor`.
/// The anchor is created lazily on first access and the same
/// instance is handed out on every later call from that thread.
@property ThreadAnchor thisThread()
{
	// Non-shared static: each thread has its own copy.
	static ThreadAnchor perThread;
	if (perThread is null)
		perThread = new ThreadAnchor();
	return perThread;
}
210 
/// A version of `std.socket.socketPair` which always creates TCP sockets, like on Windows.
/// Used to work around https://stackoverflow.com/q/10899814/21501,
/// i.e. AF_UNIX socket pairs' limit of not being able
/// to enqueue more than 278 packets without blocking.
///
/// The pair is built by connecting to a temporary listener bound to
/// an ephemeral loopback port. The listener is always closed before
/// returning, and on failure any socket created so far is closed too.
///
/// Returns: Two connected sockets; element 0 is the connecting side,
///          element 1 is the accepted side.
/// Throws: `SocketException` if a socket operation fails, or
///         `Exception` if the accepted connection did not originate
///         from the socket created here.
private Socket[2] tcpSocketPair() @trusted
{
	auto listener = new TcpSocket();
	// The listener is only needed to establish the pair; release it
	// on every exit path, including when an exception is thrown.
	scope(exit) listener.close();

	listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
	listener.bind(new InternetAddress(INADDR_LOOPBACK, InternetAddress.PORT_ANY));
	auto addr = listener.localAddress;
	listener.listen(1);

	Socket[2] result;
	// Don't leave half of the pair open if something below fails.
	scope(failure)
		foreach (s; result)
			if (s !is null)
				s.close();

	result[0] = new TcpSocket(addr);
	result[1] = listener.accept();

	// The listening port is reachable by any local process, so make
	// sure the connection we accepted is really the one we initiated.
	enforce(result[1].remoteAddress.toString() == result[0].localAddress.toString(),
		"tcpSocketPair: unexpected peer connected to the loopback listener");

	return result;
}