1 /** 2 * ae.net.sync 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <ae@cy.md> 12 */ 13 14 module ae.net.sync; 15 16 import core.atomic; 17 import core.sync.semaphore; 18 import core.thread; 19 20 import std.exception; 21 import std.socket; 22 import std.typecons : Flag, Yes; 23 24 import ae.net.asockets; 25 26 /** 27 An object which allows calling a function in a different thread. 28 Create ThreadAnchor in the main thread (the thread in which the 29 code will run in), and then call runWait or runAsync from a 30 different thread. 31 32 The main thread must be running an unblocked ae.net.asockets 33 event loop. 34 35 Example: 36 --- 37 void onConnect(TcpConnection socket) 38 { 39 auto anchor = new ThreadAnchor; 40 new Thread({ 41 string s = readln(); 42 anchor.runAsync({ 43 socket.send(s); 44 socket.disconnect(); 45 }); 46 }).start(); 47 } 48 --- 49 **/ 50 51 final class ThreadAnchor : TcpConnection 52 { 53 private: 54 alias Dg = void delegate(); 55 56 static struct Command 57 { 58 Dg dg; 59 Semaphore* semaphore; 60 } 61 62 enum queueSize = 1024; 63 64 final static class AnchorSocket : TcpConnection 65 { 66 Socket pinger; 67 68 // Ensure the GC can reach delegates 69 // Must be preallocated - can't allocate in signal handlers 70 Command[queueSize] queue; 71 shared size_t writeIndex; 72 73 this(bool daemon) 74 { 75 auto pair = socketPair(); 76 pair[0].blocking = false; 77 super(pair[0]); 78 pinger = pair[1]; 79 this.handleReadData = &onReadData; 80 this.daemonRead = daemon; 81 } 82 83 void onReadData(Data data) 84 { 85 foreach (index; cast(size_t[])data.contents) 86 { 87 auto command = queue[index]; 88 queue[index] = Command.init; 89 command.dg(); 90 if (command.semaphore) 91 command.semaphore.notify(); 92 } 93 } 94 } 95 96 AnchorSocket socket; 97 98 void sendCommand(size_t index) nothrow @nogc 99 { 100 // https://github.com/dlang/phobos/pull/4273 101 (cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index); 102 } 103 104 void sendCommandImpl(size_t index) 105 { 106 size_t[1] data; 107 data[0] = index; 108 socket.pinger.send(data[]); 109 } 110 111 void runCommand(Command command) nothrow @nogc 112 { 113 assert(command.dg); 114 auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize; 115 if (socket.queue[index].dg !is null) 116 assert(false, "ThreadAnchor queue overrun"); 117 socket.queue[index] = command; 118 sendCommand(index); 119 } 120 121 public: 122 /// Constructor. 123 /// Params: 124 /// daemon = If `Yes.daemon` (the default), don't block the event 125 /// loop from exiting until `close` is called. 126 this(Flag!"daemon" daemon = Yes.daemon) 127 { 128 socket = new AnchorSocket(daemon); 129 } 130 131 /// Run the specified delegate in the origin thread, 132 /// without waiting for it to finish. 133 void runAsync(Dg dg) nothrow @nogc 134 { 135 runCommand(Command(dg)); 136 } 137 138 /// Run the specified delegate in the origin thread, 139 /// and wait for it to finish. 140 void runWait(Dg dg) 141 { 142 scope semaphore = new Semaphore(); 143 runCommand(Command(dg, &semaphore)); 144 semaphore.wait(); 145 } 146 147 /// Close the connection to the main thread. 148 void close() 149 { 150 socket.pinger.close(); 151 } 152 } 153 154 unittest 155 { 156 // keep socketManager running - 157 // ThreadAnchor sockets are daemon 158 auto dummy = new TcpServer(); 159 dummy.listen(0, "localhost"); 160 dummy.handleAccept = (TcpConnection incoming) {}; 161 162 import ae.sys.timing; 163 164 int n = 0; 165 Thread t; 166 167 // Queue to run this as soon as event loop starts 168 setTimeout({ 169 auto anchor = new ThreadAnchor; 170 t = new Thread({ 171 anchor.runWait({ 172 assert(n==0); n++; 173 }); 174 anchor.runAsync(&dummy.close); 175 assert(n==1); n++; 176 }).start(); 177 }, Duration.zero); 178 179 socketManager.loop(); 180 t.join(); 181 assert(n==2); 182 }