/**
 * ae.net.sync
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.net.sync;

import core.atomic;
import core.sync.semaphore;
import core.thread;

import std.exception;
import std.socket;
import std.typecons : Flag, Yes;

import ae.net.asockets;

/**
	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.

	The main thread must be running an unblocked ae.net.asockets
	event loop.

	Example:
	---
	void onConnect(TcpConnection socket)
	{
		auto mainThread = thisThread;
		new Thread({
			string s = readln();
			mainThread.runAsync({
				socket.send(s);
				socket.disconnect();
			});
		}).start();
	}
	---
**/

// Note: ThreadAnchor deliberately does not derive from TcpConnection.
// The connection registered with the event loop is the private
// AnchorSocket below; ThreadAnchor only owns and drives it.
final class ThreadAnchor
{
private:
	alias Dg = void delegate();

	/// One queued unit of work.
	static struct Command
	{
		/// The delegate to invoke in the anchored thread.
		Dg dg;
		/// If non-null, notified after `dg` returns (used by `runWait`).
		Semaphore* semaphore;
	}

	/// Number of slots in the command ring buffer.
	enum queueSize = 1024;

	/// The receiving end of the socket pair, serviced by the
	/// event loop of the thread which created the `ThreadAnchor`.
	final static class AnchorSocket : TcpConnection
	{
		/// The sending end of the socket pair;
		/// queue indices are written to it by `sendCommandImpl`.
		Socket pinger;

		// Ensure the GC can reach delegates
		// Must be preallocated - can't allocate in signal handlers
		Command[queueSize] queue;
		/// Monotonically increasing counter; taken modulo `queueSize`
		/// to select the next queue slot to write to.
		shared size_t writeIndex;

		// Best-effort attempt at keeping `AnchorSocket` alive
		// (non-daemon) when there are in-flight submissions,
		// even when `daemon==true`.
		// For this to work reliably, the target thread's event loop
		// must have some reason to not exit (other than this
		// `ThreadAnchor`) up until `runAsync` (in the sender thread)
		// returns.
		shared size_t numPending;
		/// The daemon setting requested at construction.
		bool daemon;

		/// Creates the socket pair, adopts one end as this connection
		/// and keeps the other end as `pinger`.
		this(bool daemon)
		{
			auto pair = tcpSocketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = daemon;
			this.daemonRead = daemon;
		}

		/// Read handler: interprets the received bytes as an array of
		/// queue indices, and runs the command stored in each slot.
		void onReadData(Data data)
		{
			auto indices = cast(size_t[])data.contents;
			foreach (index; indices)
			{
				// Free the slot before running the delegate,
				// so that the slot can be reused.
				auto command = queue[index];
				queue[index] = Command.init;
				command.dg();
				if (command.semaphore)
					command.semaphore.notify();
			}
			auto remaining = numPending.atomicOp!"-="(indices.length);
			// Restore the requested daemon state only once
			// nothing is pending any more.
			this.daemonRead = daemon && remaining == 0;
		}
	}

	AnchorSocket socket;

	/// `nothrow @nogc` wrapper around `sendCommandImpl`.
	void sendCommand(size_t index) nothrow @nogc
	{
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
	}

	/// Writes one queue index to the sending end of the socket pair.
	void sendCommandImpl(size_t index)
	{
		size_t[1] data;
		data[0] = index;
		socket.pinger.send(data[]);
	}

	/// Stores `command` in the next queue slot and sends
	/// the slot's index to the anchored thread.
	void runCommand(Command command) nothrow @nogc
	{
		assert(command.dg);
		// atomicOp returns the new value; subtract 1 to obtain
		// the value this call claimed.
		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
		if (socket.queue[index].dg !is null)
			assert(false, "ThreadAnchor queue overrun");
		socket.queue[index] = command;
		socket.daemonRead = false;
		atomicOp!"+="(socket.numPending, 1);
		sendCommand(index);
	}

public:
	/// Constructor.
	/// Params:
	///  daemon = If `Yes.daemon` (the default), don't block the event
	///           loop from exiting until `close` is called.
	this(Flag!"daemon" daemon = Yes.daemon)
	{
		socket = new AnchorSocket(daemon);
	}

	/// Run the specified delegate in the origin thread,
	/// without waiting for it to finish.
	void runAsync(Dg dg) nothrow @nogc
	{
		runCommand(Command(dg));
	}

	/// Run the specified delegate in the origin thread,
	/// and wait for it to finish.
	void runWait(Dg dg)
	{
		scope semaphore = new Semaphore();
		// The semaphore reference lives on this stack frame, which
		// stays valid because we block in `wait` until it is notified.
		runCommand(Command(dg, &semaphore));
		semaphore.wait();
	}

	/// Close the connection to the main thread.
	void close()
	{
		socket.pinger.close();
	}
}

unittest
{
	// keep socketManager running -
	// ThreadAnchor sockets are daemon
	auto dummy = new TcpServer();
	dummy.listen(0, "localhost");
	dummy.handleAccept = (TcpConnection incoming) {};

	import ae.sys.timing;

	int n = 0;
	Thread t;

	// Queue to run this as soon as event loop starts
	setTimeout({
		auto anchor = new ThreadAnchor;
		t = new Thread({
			anchor.runWait({
				assert(n==0); n++;
			});
			anchor.runAsync(&dummy.close);
			assert(n==1); n++;
		}).start();
	}, Duration.zero);

	socketManager.loop();
	t.join();
	assert(n==2);
}

/// Return a `ThreadAnchor` for the current thread.
/// One instance is created and reused per thread.
@property ThreadAnchor thisThread()
{
	static ThreadAnchor instance;
	if (!instance)
		instance = new ThreadAnchor();
	return instance;
}

/// A version of `std.socket.socketPair` which always creates TCP sockets, like on Windows.
/// Used to work around https://stackoverflow.com/q/10899814/21501,
/// i.e. AF_UNIX socket pairs' limit of not being able
/// to enqueue more than 278 packets without blocking.
private Socket[2] tcpSocketPair() @trusted
{
	Socket[2] result;

	auto listener = new TcpSocket();
	// The listener is only needed to establish the pair;
	// release it on every exit path, including when
	// connecting or accepting throws.
	scope(exit) listener.close();
	listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
	listener.bind(new InternetAddress(INADDR_LOOPBACK, InternetAddress.PORT_ANY));
	auto addr = listener.localAddress;
	listener.listen(1);

	result[0] = new TcpSocket(addr);
	result[1] = listener.accept();

	return result;
}