1 /** 2 * ae.net.sync 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <vladimir@thecybershadow.net> 12 */ 13 14 module ae.net.sync; 15 16 import core.atomic; 17 import core.thread; 18 19 import std.exception; 20 import std.socket; 21 22 import ae.net.asockets; 23 24 /** 25 An object which allows calling a function in a different thread. 26 Create ThreadAnchor in the main thread (the thread in which the 27 code will run in), and then call runWait or runAsync from a 28 different thread. 29 30 The main thread must be running an unblocked ae.net.asockets 31 event loop. 32 33 Example: 34 --- 35 void onConnect(TcpConnection socket) 36 { 37 auto anchor = new ThreadAnchor; 38 new Thread({ 39 string s = readln(); 40 anchor.runAsync({ 41 socket.send(s); 42 socket.disconnect(); 43 }); 44 }).start(); 45 } 46 --- 47 **/ 48 49 final class ThreadAnchor : TcpConnection 50 { 51 private: 52 enum Command : ubyte 53 { 54 none, 55 runWait, 56 runAsync, 57 runWaitDone, 58 } 59 60 alias Dg = void delegate(); 61 62 enum queueSize = 1024; 63 64 final static class AnchorSocket : TcpConnection 65 { 66 Socket pinger; 67 Dg[queueSize] queue; 68 shared size_t readIndex, writeIndex; 69 70 this() 71 { 72 auto pair = socketPair(); 73 pair[0].blocking = false; 74 super(pair[0]); 75 pinger = pair[1]; 76 this.handleReadData = &onReadData; 77 this.daemon = true; 78 } 79 80 void onReadData(Data data) 81 { 82 import ae.utils.array; 83 84 foreach (cmd; cast(Command[])data.contents) 85 { 86 Dg dg = queue[readIndex.atomicOp!"+="(1)-1 % $]; 87 switch (cmd) 88 { 89 case Command.runAsync: 90 dg(); 91 break; 92 case Command.runWait: 93 { 94 dg(); 95 Command[] reply = [Command.runWaitDone]; 96 this.send(Data(reply)); 97 break; 98 } 99 default: 100 assert(false); 101 } 102 } 103 } 104 } 105 106 AnchorSocket socket; 107 108 void sendCommand(Command command) nothrow @nogc 109 { 110 // https://github.com/dlang/phobos/pull/4273 111 (cast(void delegate(Command) nothrow @nogc)&sendCommandImpl)(command); 112 } 113 114 void sendCommandImpl(Command command) 115 { 116 Command[1] data; 117 data[0] = command; 118 socket.pinger.send(data[]); 119 } 120 121 public: 122 this() 123 { 124 socket = new AnchorSocket(); 125 } 126 127 void runAsync(Dg dg) nothrow @nogc 128 { 129 socket.queue[socket.writeIndex.atomicOp!"+="(1)-1 % $] = dg; 130 sendCommand(Command.runAsync); 131 } 132 133 void runWait(Dg dg) 134 { 135 socket.queue[socket.writeIndex.atomicOp!"+="(1)-1 % $] = dg; 136 sendCommand(Command.runWait); 137 138 Command[] data = [Command.none]; 139 data = data[0..socket.pinger.receive(data)]; 140 enforce(data.length && data[0] == Command.runWaitDone, "runWait error"); 141 } 142 143 void close() 144 { 145 socket.pinger.close(); 146 } 147 } 148 149 unittest 150 { 151 // keep socketManager running - 152 // ThreadAnchor sockets are daemon 153 auto dummy = new TcpServer(); 154 dummy.listen(0, "localhost"); 155 156 import ae.sys.timing; 157 158 int n = 0; 159 Thread t; 160 161 // Queue to run this as soon as event loop starts 162 setTimeout({ 163 auto anchor = new ThreadAnchor; 164 t = new Thread({ 165 anchor.runWait({ 166 assert(n==0); n++; 167 }); 168 anchor.runAsync(&dummy.close); 169 assert(n==1); n++; 170 }).start(); 171 }, Duration.zero); 172 173 socketManager.loop(); 174 t.join(); 175 assert(n==2); 176 }