1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.sync;
15 
16 import core.atomic;
17 import core.sync.semaphore;
18 import core.thread;
19 
20 import std.exception;
21 import std.socket;
22 import std.typecons : Flag, Yes;
23 
24 import ae.net.asockets;
25 
/**
	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.

	The main thread must be running an unblocked ae.net.asockets
	event loop.

	Example:
	---
	void onConnect(TcpConnection socket)
	{
		auto anchor = new ThreadAnchor;
		new Thread({
			string s = readln();
			anchor.runAsync({
				socket.send(s);
				socket.disconnect();
			});
		}).start();
	}
	---
**/

final class ThreadAnchor : TcpConnection
{
private:
	alias Dg = void delegate();

	// One queued unit of work.
	static struct Command
	{
		// Delegate to invoke from the anchor socket's read handler.
		Dg dg;
		// When non-null, notified once `dg` has finished (see runWait).
		Semaphore* semaphore;
	}

	// Number of command slots; slot indices wrap around modulo this value.
	enum queueSize = 1024;

	// Receiving end of a socket pair. Queue slot indices are written to
	// `pinger` by other threads and arrive here via `onReadData`.
	final static class AnchorSocket : TcpConnection
	{
		// Sending end of the socket pair.
		Socket pinger;

		// Ensure the GC can reach delegates
		// Must be preallocated - can't allocate in signal handlers
		Command[queueSize] queue;
		// Total number of commands enqueued so far; the slot used for a
		// command is this counter's previous value modulo queueSize.
		shared size_t writeIndex;

		// Creates the socket pair, registers the non-blocking end as this
		// connection, and keeps the other end as `pinger`.
		this(bool daemon)
		{
			auto pair = socketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = daemon;
		}

		// Read handler: interprets the received bytes as an array of
		// queue slot indices and runs the command stored in each slot.
		void onReadData(Data data)
		{
			foreach (index; cast(size_t[])data.contents)
			{
				auto command = queue[index];
				// Clear the slot before running the delegate, so that
				// runCommand sees it as free again.
				queue[index] = Command.init;

				// Release a thread blocked in runWait even when the
				// delegate throws; otherwise it would wait on its
				// semaphore forever.
				scope(exit)
					if (command.semaphore)
						command.semaphore.notify();

				command.dg();
			}
		}
	}

	AnchorSocket socket;

	// nothrow @nogc entry point for sendCommandImpl. The cast adds the
	// attributes which Socket.send lacks:
	void sendCommand(size_t index) nothrow @nogc
	{
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
	}

	// Writes one queue slot index to the pinger socket.
	// The return value of send is not checked.
	void sendCommandImpl(size_t index)
	{
		size_t[1] data;
		data[0] = index;
		socket.pinger.send(data[]);
	}

	// Stores `command` in the next queue slot and sends the slot index
	// through the socket pair.
	void runCommand(Command command) nothrow @nogc
	{
		assert(command.dg);
		// atomicOp!"+=" returns the incremented value; subtract one to
		// get the value this call claimed.
		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
		// A still-occupied slot means queueSize commands are pending.
		if (socket.queue[index].dg !is null)
			assert(false, "ThreadAnchor queue overrun");
		socket.queue[index] = command;
		sendCommand(index);
	}

public:
	/// Creates the anchor and its underlying socket pair.
	/// Params:
	///   daemon = forwarded to the anchor socket's `daemon` property.
	this(Flag!"daemon" daemon = Yes.daemon)
	{
		socket = new AnchorSocket(daemon);
	}

	/// Queues `dg` to be run by the anchor socket's read handler,
	/// and returns without waiting for it.
	void runAsync(Dg dg) nothrow @nogc
	{
		runCommand(Command(dg));
	}

	/// Queues `dg` to be run by the anchor socket's read handler,
	/// and blocks the calling thread until `dg` has finished
	/// (whether it returned normally or threw).
	void runWait(Dg dg)
	{
		// The semaphore lives in this stack frame; the frame stays
		// alive until wait() returns.
		scope semaphore = new Semaphore();
		runCommand(Command(dg, &semaphore));
		semaphore.wait();
	}

	/// Closes the sending end of the socket pair.
	void close()
	{
		socket.pinger.close();
	}
}
144 
unittest
{
	import ae.sys.timing;

	// The anchor's sockets are daemon sockets, so a listening server
	// is needed to keep socketManager's loop from exiting right away.
	auto keepAlive = new TcpServer();
	keepAlive.listen(0, "localhost");

	int counter = 0;
	Thread worker;

	// Zero-delay timer: fires as soon as the event loop starts.
	setTimeout({
		auto anchor = new ThreadAnchor;

		// Body of the secondary thread.
		void workerBody()
		{
			// Runs via the anchor; blocks this thread until it is done.
			anchor.runWait({
				assert(counter == 0);
				++counter;
			});
			// Ask the event loop thread to shut the server down.
			anchor.runAsync(&keepAlive.close);
			assert(counter == 1);
			++counter;
		}

		worker = new Thread(&workerBody);
		worker.start();
	}, Duration.zero);

	socketManager.loop();
	worker.join();
	assert(counter == 2);
}