1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.net.sync;
15 
16 import core.atomic;
17 import core.sync.semaphore;
18 import core.thread;
19 
20 import std.exception;
21 import std.socket;
22 import std.typecons : Flag, Yes;
23 
24 import ae.net.asockets;
25 
26 /**
27 	An object which allows calling a function in a different thread.
28 	Create ThreadAnchor in the main thread (the thread in which the
29 	code will run in), and then call runWait or runAsync from a
30 	different thread.
31 
32 	The main thread must be running an unblocked ae.net.asockets
33 	event loop.
34 
35 	Example:
36 	---
37 	void onConnect(TcpConnection socket)
38 	{
39 		auto mainThread = thisThread;
40 		new Thread({
41 			string s = readln();
42 			mainThread.runAsync({
43 				socket.send(s);
44 				socket.disconnect();
45 			});
46 		}).start();
47 	}
48 	---
49 **/
50 
51 final class ThreadAnchor : TcpConnection
52 {
53 private:
54 	alias Dg = void delegate();
55 
56 	static struct Command
57 	{
58 		Dg dg;
59 		Semaphore* semaphore;
60 	}
61 
62 	enum queueSize = 1024;
63 
64 	final static class AnchorSocket : TcpConnection
65 	{
66 		Socket pinger;
67 
68 		// Ensure the GC can reach delegates
69 		// Must be preallocated - can't allocate in signal handlers
70 		Command[queueSize] queue;
71 		shared size_t writeIndex;
72 
73 		// Best-effort attempt at keeping `AnchorSocket` alive
74 		// (non-daemon) when there are in-flight submissions,
75 		// even when `daemon==true`.
76 		// For this to work reliably, the target thread's event loop
77 		// must have some reason to not exit (other than this
78 		// `ThreadAnchor`) up until `runAsync` (in the sender thread)
79 		// returns.
80 		shared size_t numPending;
81 		bool daemon;
82 
83 		this(bool daemon)
84 		{
85 			auto pair = tcpSocketPair();
86 			pair[0].blocking = false;
87 			super(pair[0]);
88 			pinger = pair[1];
89 			this.handleReadData = &onReadData;
90 			this.daemon = daemon;
91 			this.daemonRead = daemon;
92 		}
93 
94 		void onReadData(Data data)
95 		{
96 			auto indices = cast(size_t[])data.contents;
97 			foreach (index; indices)
98 			{
99 				auto command = queue[index];
100 				queue[index] = Command.init;
101 				command.dg();
102 				if (command.semaphore)
103 					command.semaphore.notify();
104 			}
105 			auto remaining = numPending.atomicOp!"-="(indices.length);
106 			this.daemonRead = daemon && remaining == 0;
107 		}
108 	}
109 
110 	AnchorSocket socket;
111 
112 	void sendCommand(size_t index) nothrow @nogc
113 	{
114 		// https://github.com/dlang/phobos/pull/4273
115 		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
116 	}
117 
118 	void sendCommandImpl(size_t index)
119 	{
120 		size_t[1] data;
121 		data[0] = index;
122 		socket.pinger.send(data[]);
123 	}
124 
125 	void runCommand(Command command) nothrow @nogc
126 	{
127 		assert(command.dg);
128 		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
129 		if (socket.queue[index].dg !is null)
130 			assert(false, "ThreadAnchor queue overrun");
131 		socket.queue[index] = command;
132 		socket.daemonRead = false;
133 		atomicOp!"+="(socket.numPending, 1);
134 		sendCommand(index);
135 	}
136 
137 public:
138 	/// Constructor.
139 	/// Params:
140 	///  daemon  = If `Yes.daemon` (the default), don't block the event
141 	///            loop from exiting until `close` is called.
142 	this(Flag!"daemon" daemon = Yes.daemon)
143 	{
144 		socket = new AnchorSocket(daemon);
145 	}
146 
147 	/// Run the specified delegate in the origin thread,
148 	/// without waiting for it to finish.
149 	void runAsync(Dg dg) nothrow @nogc
150 	{
151 		runCommand(Command(dg));
152 	}
153 
154 	/// Run the specified delegate in the origin thread,
155 	/// and wait for it to finish.
156 	void runWait(Dg dg)
157 	{
158 		scope semaphore = new Semaphore();
159 		runCommand(Command(dg, &semaphore));
160 		semaphore.wait();
161 	}
162 
163 	/// Close the connection to the main thread.
164 	void close()
165 	{
166 		socket.pinger.close();
167 	}
168 }
169 
170 unittest
171 {
172 	// keep socketManager running -
173 	// ThreadAnchor sockets are daemon
174 	auto dummy = new TcpServer();
175 	dummy.listen(0, "localhost");
176 	dummy.handleAccept = (TcpConnection incoming) {};
177 
178 	import ae.sys.timing;
179 
180 	int n = 0;
181 	Thread t;
182 
183 	// Queue to run this as soon as event loop starts
184 	setTimeout({
185 		auto anchor = new ThreadAnchor;
186 		t = new Thread({
187 			anchor.runWait({
188 				assert(n==0); n++;
189 			});
190 			anchor.runAsync(&dummy.close);
191 			assert(n==1); n++;
192 		}).start();
193 	}, Duration.zero);
194 
195 	socketManager.loop();
196 	t.join();
197 	assert(n==2);
198 }
199 
200 /// Return a `ThreadAnchor` for the current thread.
201 /// One instance is created and reused per thread.
202 @property ThreadAnchor thisThread()
203 {
204 	static ThreadAnchor instance;
205 	if (!instance)
206 		instance = new ThreadAnchor();
207 	return instance;
208 }
209 
210 /// A version of `std.socket.socketPair` which always creates TCP sockets, like on Windows.
211 /// Used to work around https://stackoverflow.com/q/10899814/21501,
212 /// i.e. AF_UNIX socket pairs' limit of not being able
213 /// to enqueue more than 278 packets without blocking.
214 private Socket[2] tcpSocketPair() @trusted
215 {
216 	Socket[2] result;
217 
218 	auto listener = new TcpSocket();
219 	listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
220 	listener.bind(new InternetAddress(INADDR_LOOPBACK, InternetAddress.PORT_ANY));
221 	auto addr = listener.localAddress;
222 	listener.listen(1);
223 
224 	result[0] = new TcpSocket(addr);
225 	result[1] = listener.accept();
226 
227 	listener.close();
228 	return result;
229 }