1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.net.sync;
15 
16 import core.atomic;
17 import core.sync.semaphore;
18 import core.thread;
19 
20 import std.exception;
21 import std.socket;
22 import std.typecons : Flag, Yes;
23 
24 import ae.net.asockets;
25 
26 /**
27 	An object which allows calling a function in a different thread.
28 	Create ThreadAnchor in the main thread (the thread in which the
29 	code will run in), and then call runWait or runAsync from a
30 	different thread.
31 
32 	The main thread must be running an unblocked ae.net.asockets
33 	event loop.
34 
35 	Example:
36 	---
37 	void onConnect(TcpConnection socket)
38 	{
39 		auto anchor = new ThreadAnchor;
40 		new Thread({
41 			string s = readln();
42 			anchor.runAsync({
43 				socket.send(s);
44 				socket.disconnect();
45 			});
46 		}).start();
47 	}
48 	---
49 **/
50 
51 final class ThreadAnchor : TcpConnection
52 {
53 private:
54 	alias Dg = void delegate();
55 
56 	static struct Command
57 	{
58 		Dg dg;
59 		Semaphore* semaphore;
60 	}
61 
62 	enum queueSize = 1024;
63 
64 	final static class AnchorSocket : TcpConnection
65 	{
66 		Socket pinger;
67 
68 		// Ensure the GC can reach delegates
69 		// Must be preallocated - can't allocate in signal handlers
70 		Command[queueSize] queue;
71 		shared size_t writeIndex;
72 
73 		this(bool daemon)
74 		{
75 			auto pair = socketPair();
76 			pair[0].blocking = false;
77 			super(pair[0]);
78 			pinger = pair[1];
79 			this.handleReadData = &onReadData;
80 			this.daemonRead = daemon;
81 		}
82 
83 		void onReadData(Data data)
84 		{
85 			foreach (index; cast(size_t[])data.contents)
86 			{
87 				auto command = queue[index];
88 				queue[index] = Command.init;
89 				command.dg();
90 				if (command.semaphore)
91 					command.semaphore.notify();
92 			}
93 		}
94 	}
95 
96 	AnchorSocket socket;
97 
98 	void sendCommand(size_t index) nothrow @nogc
99 	{
100 		// https://github.com/dlang/phobos/pull/4273
101 		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
102 	}
103 
104 	void sendCommandImpl(size_t index)
105 	{
106 		size_t[1] data;
107 		data[0] = index;
108 		socket.pinger.send(data[]);
109 	}
110 
111 	void runCommand(Command command) nothrow @nogc
112 	{
113 		assert(command.dg);
114 		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
115 		if (socket.queue[index].dg !is null)
116 			assert(false, "ThreadAnchor queue overrun");
117 		socket.queue[index] = command;
118 		sendCommand(index);
119 	}
120 
121 public:
122 	/// Constructor.
123 	/// Params:
124 	///  daemon  = If `Yes.daemon` (the default), don't block the event
125 	///            loop from exiting until `close` is called.
126 	this(Flag!"daemon" daemon = Yes.daemon)
127 	{
128 		socket = new AnchorSocket(daemon);
129 	}
130 
131 	/// Run the specified delegate in the origin thread,
132 	/// without waiting for it to finish.
133 	void runAsync(Dg dg) nothrow @nogc
134 	{
135 		runCommand(Command(dg));
136 	}
137 
138 	/// Run the specified delegate in the origin thread,
139 	/// and wait for it to finish.
140 	void runWait(Dg dg)
141 	{
142 		scope semaphore = new Semaphore();
143 		runCommand(Command(dg, &semaphore));
144 		semaphore.wait();
145 	}
146 
147 	/// Close the connection to the main thread.
148 	void close()
149 	{
150 		socket.pinger.close();
151 	}
152 }
153 
154 unittest
155 {
156 	// keep socketManager running -
157 	// ThreadAnchor sockets are daemon
158 	auto dummy = new TcpServer();
159 	dummy.listen(0, "localhost");
160 	dummy.handleAccept = (TcpConnection incoming) {};
161 
162 	import ae.sys.timing;
163 
164 	int n = 0;
165 	Thread t;
166 
167 	// Queue to run this as soon as event loop starts
168 	setTimeout({
169 		auto anchor = new ThreadAnchor;
170 		t = new Thread({
171 			anchor.runWait({
172 				assert(n==0); n++;
173 			});
174 			anchor.runAsync(&dummy.close);
175 			assert(n==1); n++;
176 		}).start();
177 	}, Duration.zero);
178 
179 	socketManager.loop();
180 	t.join();
181 	assert(n==2);
182 }