1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.sync;
15 
16 import core.thread;
17 
18 import std.exception;
19 import std.socket;
20 
21 import ae.net.asockets;
22 
23 /**
24 	An object which allows calling a function in a different thread.
	Create a ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.
28 
29 	The main thread must be running an unblocked ae.net.asockets
30 	event loop.
31 
32 	Example:
33 	---
34 	void onConnect(TcpConnection socket)
35 	{
36 		auto anchor = new ThreadAnchor;
37 		new Thread({
38 			string s = readln();
39 			anchor.runAsync({
40 				socket.send(s);
41 				socket.disconnect();
42 			});
43 		}).start();
44 	}
45 	---
46 **/
47 
final class ThreadAnchor : TcpConnection
{
private:
	/// Single-byte messages exchanged over the socket pair.
	enum Command : ubyte
	{
		none,        /// Placeholder used to initialize the reply buffer.
		runWait,     /// Run the next queued delegate, then reply with runWaitDone.
		runAsync,    /// Run the next queued delegate; no reply is sent.
		runWaitDone, /// Reply written after a runWait delegate has returned.
	}

	alias Dg = void delegate();

	/// Event-loop side of the socket pair. Receives command bytes and
	/// runs the delegates that were queued alongside them.
	final static class AnchorSocket : TcpConnection
	{
		/// The other end of the pair; used by the calling threads.
		Socket pinger;

		/// Pending delegates, in posting order. Guarded by this object's monitor.
		Dg[] queue;

		this()
		{
			auto pair = socketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = true;
		}

		/// Handles a chunk of received bytes: each byte is one Command,
		/// and each command consumes one delegate from the queue.
		void onReadData(Data data)
		{
			import ae.utils.array;

			foreach (cmd; cast(Command[])data.contents)
			{
				Dg dg;
				// Hold the lock only while dequeuing, not while running dg.
				synchronized(this) dg = queue.shift();
				switch (cmd)
				{
					case Command.runAsync:
						dg();
						break;
					case Command.runWait:
					{
						// NOTE(review): if dg throws, no reply is sent and the
						// runWait caller stays blocked in receive.
						dg();
						Command[] reply = [Command.runWaitDone];
						this.send(Data(reply));
						break;
					}
					default:
						assert(false);
				}
			}
		}
	}

	AnchorSocket socket;

	/// Queues dg and writes the matching command byte to the pinger socket.
	/// Both steps happen under the socket's monitor so that the n-th command
	/// byte always corresponds to the n-th queued delegate, even when several
	/// threads post at once. If the send fails, the delegate is removed again
	/// so later commands are not paired with a stale entry.
	/// Throws: Exception if the command byte could not be sent.
	void postCommand(Command command, Dg dg)
	{
		synchronized(socket)
		{
			socket.queue ~= dg;
			Command[1] message = [command];
			if (socket.pinger.send(message[]) != message.length)
			{
				// We still hold the lock, so our delegate is the last entry.
				socket.queue = socket.queue[0 .. $ - 1];
				throw new Exception("ThreadAnchor: failed to send command");
			}
		}
	}

public:
	/// Creates the underlying socket pair.
	this()
	{
		socket = new AnchorSocket();
	}

	/// Queues dg for execution by the anchor's read handler and returns
	/// without waiting for it to run.
	/// Throws: Exception if the command could not be sent.
	void runAsync(Dg dg)
	{
		postCommand(Command.runAsync, dg);
	}

	/// Queues dg for execution by the anchor's read handler and blocks
	/// until the completion reply is received.
	/// NOTE(review): all callers read replies from the same socket, so
	/// concurrent runWait calls from several threads may consume each
	/// other's replies - confirm callers serialize runWait.
	/// Throws: Exception if the command could not be sent, or if the reply
	/// could not be received or was not the expected one.
	void runWait(Dg dg)
	{
		postCommand(Command.runWait, dg);

		Command[1] reply = [Command.none];
		// receive returns Socket.ERROR (negative) on failure and 0 when the
		// peer closed the connection; never use it as a slice bound unchecked.
		auto received = socket.pinger.receive(reply[]);
		enforce(received > 0 && reply[0] == Command.runWaitDone, "runWait error");
	}

	/// Closes the calling-thread end of the socket pair.
	void close()
	{
		socket.pinger.close();
	}
}
133 
unittest
{
	import ae.sys.timing;

	// ThreadAnchor sockets are daemon sockets, so something else
	// has to keep socketManager's loop running: use a listening server.
	auto keepAlive = new TcpServer();
	keepAlive.listen(0, "localhost");

	int step = 0;
	Thread worker;

	// Scheduled with a zero delay so that it runs as soon as
	// the event loop starts.
	setTimeout({
		auto anchor = new ThreadAnchor;

		void workerBody()
		{
			// Blocks until the delegate has been run via the anchor.
			anchor.runWait({
				assert(step == 0);
				step++;
			});
			// Closing the server lets the event loop finish.
			anchor.runAsync(&keepAlive.close);
			assert(step == 1);
			step++;
		}

		worker = new Thread(&workerBody);
		worker.start();
	}, Duration.zero);

	socketManager.loop();
	worker.join();
	assert(step == 2);
}