/**
 * ae.net.sync
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 */

module ae.net.sync;

import core.atomic;
import core.thread;

import std.exception;
import std.socket;
import std.typecons : Flag, Yes;

import ae.net.asockets;

/**
	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.

	The main thread must be running an unblocked ae.net.asockets
	event loop.

	Example:
	---
	void onConnect(TcpConnection socket)
	{
		auto anchor = new ThreadAnchor;
		new Thread({
			string s = readln();
			anchor.runAsync({
				socket.send(s);
				socket.disconnect();
			});
		}).start();
	}
	---
**/

// NOTE(review): ThreadAnchor derives from TcpConnection, yet every socket
// operation visible in this file goes through the nested AnchorSocket held
// in `socket`. The base class looks unused here - confirm before changing it.
final class ThreadAnchor : TcpConnection
{
private:
	/// One-byte messages exchanged over the socket pair.
	/// runWait / runAsync travel from the calling thread to the anchor;
	/// runWaitDone travels back once a runWait delegate has returned.
	enum Command : ubyte
	{
		none,
		runWait,
		runAsync,
		runWaitDone,
	}

	alias Dg = void delegate();

	/// Number of slots in the delegate ring buffer. Indices wrap modulo
	/// this value and nothing checks for overrun, so having more than
	/// this many delegates pending overwrites the oldest unread slot.
	enum queueSize = 1024;

	/// The event-loop end of the socket pair. Receives command bytes and
	/// runs the matching queued delegates.
	final static class AnchorSocket : TcpConnection
	{
		/// The other end of the pair, used by the calling threads.
		Socket pinger;

		/// Ring buffer of pending delegates, indexed modulo its length.
		Dg[queueSize] queue;

		/// Monotonic counters; the slot used is (counter - 1) % queue.length.
		shared size_t readIndex, writeIndex;

		/// Creates the socket pair: pair[0] is made non-blocking and handed
		/// to the TcpConnection base, pair[1] is kept as `pinger`.
		/// `daemon` is forwarded to the connection's `daemon` property.
		this(bool daemon)
		{
			auto pair = socketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = daemon;
		}

		/// Treats every received byte as one Command and runs the next
		/// queued delegate for it.
		void onReadData(Data data)
		{
			import ae.utils.array;

			foreach (cmd; cast(Command[])data.contents)
			{
				// Claim the next slot and clear it, so the queue does not
				// keep a reference to the delegate after it has been run.
				Dg* pdg = &queue[(readIndex.atomicOp!"+="(1)-1) % $];
				Dg dg = *pdg;
				*pdg = null;
				switch (cmd)
				{
					case Command.runAsync:
						dg();
						break;
					case Command.runWait:
					{
						dg();
						// Wake the thread blocked in ThreadAnchor.runWait.
						// This is only reached when dg() returns normally.
						Command[] reply = [Command.runWaitDone];
						this.send(Data(reply));
						break;
					}
					default:
						assert(false);
				}
			}
		}
	}

	AnchorSocket socket;

	/// Sends one command byte to the anchor socket.
	/// Casts away the missing nothrow/@nogc attributes of the
	/// implementation so that runAsync can carry them.
	void sendCommand(Command command) nothrow @nogc
	{
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(Command) nothrow @nogc)&sendCommandImpl)(command);
	}

	/// Writes the command as a single byte to `pinger`.
	/// The return value of `send` is not inspected.
	void sendCommandImpl(Command command)
	{
		Command[1] data;
		data[0] = command;
		socket.pinger.send(data[]);
	}

public:
	/// Creates the anchor and its socket pair.
	/// Params:
	///   daemon = forwarded to the underlying AnchorSocket
	this(Flag!"daemon" daemon = Yes.daemon)
	{
		socket = new AnchorSocket(daemon);
	}

	/// Queues `dg` and notifies the anchor socket, then returns
	/// without waiting for `dg` to run.
	void runAsync(Dg dg) nothrow @nogc
	{
		socket.queue[(socket.writeIndex.atomicOp!"+="(1)-1) % $] = dg;
		sendCommand(Command.runAsync);
	}

	/// Queues `dg`, notifies the anchor socket, and blocks on `pinger`
	/// until the runWaitDone reply arrives.
	/// Throws: Exception ("runWait error") if the receive fails, the
	///   connection is closed, or an unexpected reply is read.
	void runWait(Dg dg)
	{
		socket.queue[(socket.writeIndex.atomicOp!"+="(1)-1) % $] = dg;
		sendCommand(Command.runWait);

		Command[1] reply = [Command.none];
		// Socket.receive returns Socket.ERROR (-1) on failure and 0 when
		// the peer has closed. Check the signed result directly: using a
		// negative value as a slice bound would index out of range.
		immutable received = socket.pinger.receive(reply[]);
		enforce(received > 0 && reply[0] == Command.runWaitDone, "runWait error");
	}

	/// Closes the calling-thread end of the socket pair.
	void close()
	{
		socket.pinger.close();
	}
}

unittest
{
	// keep socketManager running -
	// ThreadAnchor sockets are daemon
	auto dummy = new TcpServer();
	dummy.listen(0, "localhost");

	import ae.sys.timing;

	int n = 0;
	Thread t;

	// Queue to run this as soon as event loop starts
	setTimeout({
		auto anchor = new ThreadAnchor;
		t = new Thread({
			anchor.runWait({
				assert(n==0); n++;
			});
			anchor.runAsync(&dummy.close);
			assert(n==1); n++;
		}).start();
	}, Duration.zero);

	socketManager.loop();
	t.join();
	assert(n==2);
}