1 /**
2  * ae.net.sync
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.sync;
15 
16 import core.thread;
17 
18 import std.socket;
19 
20 import ae.net.asockets;
21 
22 /**
23 	An object which allows calling a function in a different thread.
24 	Create ThreadAnchor in the main thread (the thread in which the
25 	code will run in), and then call runWait or runAsync from a
26 	different thread.
27 
28 	The main thread must be running an unblocked ae.net.asockets
29 	event loop.
30 
31 	Example:
32 	---
33 	void onConnect(ClientSocket socket)
34 	{
35 		auto anchor = new ThreadAnchor;
36 		new Thread({
37 			string s = readln();
38 			anchor.runAsync({
39 				socket.send(s);
40 				socket.disconnect();
41 			});
42 		}).start();
43 	}
44 	---
45 **/
46 
47 final class ThreadAnchor : ClientSocket
48 {
49 private:
50 	enum Command : ubyte
51 	{
52 		none,
53 		runWait,
54 		runAsync,
55 		runWaitDone,
56 	}
57 
58 	alias Dg = void delegate();
59 
60 	final static class AnchorSocket : ClientSocket
61 	{
62 		Socket pinger;
63 		Dg[] queue;
64 
65 		this()
66 		{
67 			auto pair = socketPair();
68 			super(pair[0]);
69 			pinger = pair[1];
70 			this.handleReadData = &onReadData;
71 			this.daemon = true;
72 		}
73 
74 		void onReadData(ClientSocket socket, Data data)
75 		{
76 			import ae.utils.array;
77 
78 			foreach (cmd; cast(Command[])data.contents)
79 			{
80 				Dg dg;
81 				synchronized(this) dg = queue.shift();
82 				switch (cmd)
83 				{
84 					case Command.runAsync:
85 						dg();
86 						break;
87 					case Command.runWait:
88 					{
89 						dg();
90 						Command[] reply = [Command.runWaitDone];
91 						this.send(Data(reply));
92 						break;
93 					}
94 					default:
95 						assert(false);
96 				}
97 			}
98 		}
99 	}
100 
101 	AnchorSocket socket;
102 
103 public:
104 	this()
105 	{
106 		socket = new AnchorSocket();
107 	}
108 
109 	void runAsync(Dg dg)
110 	{
111 		synchronized(socket) socket.queue ~= dg;
112 		Command[] data = [Command.runWait];
113 		socket.pinger.send(data);
114 	}
115 
116 	void runWait(Dg dg)
117 	{
118 		synchronized(socket) socket.queue ~= dg;
119 		Command[] data = [Command.runWait];
120 		socket.pinger.send(data);
121 		data = [Command.none];
122 		data = data[0..socket.pinger.receive(data)];
123 		enforce(data.length && data[0] == Command.runWaitDone, "runWait error");
124 	}
125 
126 	void close()
127 	{
128 		socket.pinger.close();
129 	}
130 }
131 
132 unittest
133 {
134 	// keep socketManager running -
135 	// ThreadAnchor sockets are daemon
136 	auto dummy = new ServerSocket();
137 	dummy.listen(0, "localhost");
138 
139 	import ae.sys.timing;
140 
141 	int n = 0;
142 	Thread t;
143 
144 	// Queue to run this as soon as event loop starts
145 	setTimeout({
146 		auto anchor = new ThreadAnchor;
147 		t = new Thread({
148 			anchor.runWait({
149 				assert(n==0); n++;
150 			});
151 			anchor.runAsync(&dummy.close);
152 			assert(n==1); n++;
153 		}).start();
154 	}, Duration.zero);
155 
156 	socketManager.loop();
157 	t.join();
158 	assert(n==2);
159 }