/**
 * ae.net.sync
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.net.sync;

import core.atomic;
import core.sync.semaphore;
import core.thread;

import std.exception;
import std.socket;
import std.typecons : Flag, Yes;

import ae.net.asockets;

/**
	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.

	The main thread must be running an unblocked ae.net.asockets
	event loop.

	Example:
	---
	void onConnect(TcpConnection socket)
	{
		auto mainThread = thisThread;
		new Thread({
			string s = readln();
			mainThread.runAsync({
				socket.send(s);
				socket.disconnect();
			});
		}).start();
	}
	---
**/

final class ThreadAnchor : TcpConnection
{
	// NOTE(review): nothing in this class uses the TcpConnection base;
	// all socket work goes through the `socket` member below.
	// Confirm whether the inheritance is still required.
private:
	alias Dg = void delegate();

	/// One queued submission: the delegate to run and, for `runWait`,
	/// a pointer to the semaphore to notify once it has run.
	static struct Command
	{
		Dg dg;
		Semaphore* semaphore;
	}

	/// Capacity of the fixed command ring buffer.
	enum queueSize = 1024;

	/// Receiving end of the socket pair; owns the command queue.
	final static class AnchorSocket : TcpConnection
	{
		/// Sending end of the socket pair, written to by submitters.
		Socket pinger;

		// Ensure the GC can reach delegates
		// Must be preallocated - can't allocate in signal handlers
		Command[queueSize] queue;
		shared size_t writeIndex;

		// Best-effort attempt at keeping `AnchorSocket` alive
		// (non-daemon) when there are in-flight submissions,
		// even when `daemon==true`.
		// For this to work reliably, the target thread's event loop
		// must have some reason to not exit (other than this
		// `ThreadAnchor`) up until `runAsync` (in the sender thread)
		// returns.
		shared size_t numPending;
		bool daemon;

		this(bool daemon)
		{
			auto pair = tcpSocketPair();
			// Only the receiving end is made non-blocking.
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = daemon;
			this.daemonRead = daemon;
		}

		/// Runs every command whose queue index was received.
		void onReadData(Data data)
		{
			// Submitters write one `size_t` queue index per command.
			data.asDataOf!size_t.enter((scope indices) {
				foreach (index; indices)
				{
					auto command = queue[index];
					// Release the slot before running the delegate.
					queue[index] = Command.init;
					command.dg();
					if (command.semaphore)
						command.semaphore.notify();
				}
				auto remaining = numPending.atomicOp!"-="(indices.length);
				// Restore the configured daemon state only once
				// nothing is in flight.
				this.daemonRead = daemon && remaining == 0;
			});
		}
	}

	AnchorSocket socket;

	void sendCommand(size_t index) nothrow @nogc
	{
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
	}

	/// Writes the queue index of a freshly stored command to the pinger.
	void sendCommandImpl(size_t index)
	{
		size_t[1] data;
		data[0] = index;
		// NOTE(review): the result of `send` is not checked. If it
		// fails, the command stays in the queue, `numPending` is never
		// decremented, and a `runWait` caller would block forever.
		socket.pinger.send(data[]);
	}

	/// Stores `command` in the next queue slot and signals the
	/// receiving end. Asserts if that slot is still occupied.
	void runCommand(Command command) nothrow @nogc
	{
		assert(command.dg);
		// Claim a slot atomically; `atomicOp!"+="` yields the new
		// value, hence the `-1`.
		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
		if (socket.queue[index].dg !is null)
			assert(false, "ThreadAnchor queue overrun");
		socket.queue[index] = command;
		socket.daemonRead = false;
		atomicOp!"+="(socket.numPending, 1);
		sendCommand(index);
	}

public:
	/// Constructor.
	/// Params:
	///   daemon = If `Yes.daemon` (the default), don't block the event
	///            loop from exiting. Otherwise, the event loop is kept
	///            from exiting until `close` is called.
	this(Flag!"daemon" daemon = Yes.daemon)
	{
		socket = new AnchorSocket(daemon);
	}

	/// Run the specified delegate in the origin thread,
	/// without waiting for it to finish.
	void runAsync(Dg dg) nothrow @nogc
	{
		runCommand(Command(dg));
	}

	/// Run the specified delegate in the origin thread,
	/// and wait for it to finish.
	void runWait(Dg dg)
	{
		scope semaphore = new Semaphore();
		// `&semaphore` points into this stack frame. The frame stays
		// alive because we block in `wait` below until the receiving
		// side has called `notify`.
		runCommand(Command(dg, &semaphore));
		semaphore.wait();
	}

	/// Close the connection to the main thread.
	void close()
	{
		socket.pinger.close();
	}
}

unittest
{
	// keep socketManager running -
	// ThreadAnchor sockets are daemon
	auto dummy = new TcpServer();
	dummy.listen(0, "localhost");
	dummy.handleAccept = (TcpConnection incoming) {};

	import ae.sys.timing;

	int n = 0;
	Thread t;

	// Queue to run this as soon as event loop starts
	setTimeout({
		auto anchor = new ThreadAnchor;
		t = new Thread({
			anchor.runWait({
				assert(n==0); n++;
			});
			anchor.runAsync(&dummy.close);
			assert(n==1); n++;
		}).start();
	}, Duration.zero);

	socketManager.loop();
	t.join();
	assert(n==2);
}

/// Return a `ThreadAnchor` for the current thread.
/// One instance is created and reused per thread.
@property ThreadAnchor thisThread()
{
	static ThreadAnchor instance;
	if (!instance)
		instance = new ThreadAnchor();
	return instance;
}

/// A version of `std.socket.socketPair` which always creates TCP sockets, like on Windows.
/// Used to work around https://stackoverflow.com/q/10899814/21501,
/// i.e. AF_UNIX socket pairs' limit of not being able
/// to enqueue more than 278 packets without blocking.
/// Returns: the connecting socket at index 0 and the accepted socket
/// at index 1.
private Socket[2] tcpSocketPair() @trusted
{
	auto listener = new TcpSocket();
	// The listener is only needed to establish the pair. Close it on
	// every exit path, including when a later call throws.
	scope(exit) listener.close();

	listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
	listener.bind(new InternetAddress(INADDR_LOOPBACK, InternetAddress.PORT_ANY));
	auto addr = listener.localAddress;
	listener.listen(1);

	Socket[2] result;
	result[0] = new TcpSocket(addr);
	// Don't leak the connected end if accepting its peer fails.
	scope(failure) result[0].close();
	result[1] = listener.accept();

	return result;
}