/**
 * ae.net.sync
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 */

module ae.net.sync;

import core.thread;

import std.exception;
import std.socket;

import ae.net.asockets;

/**
	An object which allows calling a function in a different thread.
	Create a ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.

	The main thread must be running an unblocked ae.net.asockets
	event loop.

	Example:
	---
	void onConnect(TcpConnection socket)
	{
		auto anchor = new ThreadAnchor;
		new Thread({
			string s = readln();
			anchor.runAsync({
				socket.send(s);
				socket.disconnect();
			});
		}).start();
	}
	---
**/

// NOTE(review): ThreadAnchor derives from TcpConnection, yet nothing in this
// module uses the inherited connection - all work goes through the nested
// AnchorSocket. Confirm the base class is intentional before relying on it.
final class ThreadAnchor : TcpConnection
{
private:
	/// One-byte commands exchanged over the socket pair.
	enum Command : ubyte
	{
		none,        /// Placeholder; never expected on the wire.
		runWait,     /// Run the next queued delegate, then reply with runWaitDone.
		runAsync,    /// Run the next queued delegate; no reply is sent.
		runWaitDone, /// Reply sent back after a runWait delegate has returned.
	}

	alias Dg = void delegate();

	/// Event-loop end of the socket pair. It receives command bytes
	/// written to `pinger` and runs the matching queued delegates.
	final static class AnchorSocket : TcpConnection
	{
		/// The other end of the socket pair; written to by the calling threads.
		Socket pinger;

		/// Delegates waiting to be run, in the order they were queued.
		/// Accessed under `synchronized` on this object.
		Dg[] queue;

		/// Creates the socket pair, hands the non-blocking end to the
		/// TcpConnection base and keeps the other end as `pinger`.
		this()
		{
			auto pair = socketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			// Daemon, so this connection alone is not meant to keep the
			// event loop running (see the unittest below).
			this.daemon = true;
		}

		/// Read handler: each received byte is one Command, and each
		/// command consumes one delegate from the front of the queue.
		void onReadData(Data data)
		{
			import ae.utils.array;

			foreach (cmd; cast(Command[])data.contents)
			{
				Dg dg;
				synchronized(this) dg = queue.shift();
				switch (cmd)
				{
					case Command.runAsync:
						dg();
						break;
					case Command.runWait:
					{
						dg();
						// Tell the thread waiting in runWait that the delegate is done.
						Command[] reply = [Command.runWaitDone];
						this.send(Data(reply));
						break;
					}
					default:
						assert(false);
				}
			}
		}
	}

	AnchorSocket socket;

public:
	/// Creates the anchor and its underlying AnchorSocket.
	this()
	{
		socket = new AnchorSocket();
	}

	/// Queues `dg` and signals the anchor socket to run it.
	/// Returns without waiting for `dg` to run.
	void runAsync(Dg dg)
	{
		synchronized(socket) socket.queue ~= dg;
		Command[] data = [Command.runAsync];
		socket.pinger.send(data);
	}

	/// Queues `dg`, signals the anchor socket to run it, then waits on the
	/// pinger socket for the completion reply.
	///
	/// Throws: Exception ("runWait error") if the reply could not be
	/// received (error or closed connection) or is not Command.runWaitDone.
	void runWait(Dg dg)
	{
		synchronized(socket) socket.queue ~= dg;
		Command[] data = [Command.runWait];
		socket.pinger.send(data);

		Command[1] reply = Command.none;
		// Socket.receive returns Socket.ERROR (-1) on failure and 0 when the
		// peer has closed the connection. The count must be checked before it
		// is used: slicing the buffer with -1 would trip a range violation
		// instead of raising the intended exception.
		auto received = socket.pinger.receive(reply[]);
		enforce(received > 0 && reply[0] == Command.runWaitDone, "runWait error");
	}

	/// Closes the pinger end of the socket pair.
	void close()
	{
		socket.pinger.close();
	}
}

unittest
{
	// keep socketManager running -
	// ThreadAnchor sockets are daemon
	auto dummy = new TcpServer();
	dummy.listen(0, "localhost");

	import ae.sys.timing;

	int n = 0;
	Thread t;

	// Queue to run this as soon as event loop starts
	setTimeout({
		auto anchor = new ThreadAnchor;
		t = new Thread({
			anchor.runWait({
				assert(n==0); n++;
			});
			// Closing the dummy server lets socketManager.loop() return.
			anchor.runAsync(&dummy.close);
			assert(n==1); n++;
		}).start();
	}, Duration.zero);

	socketManager.loop();
	t.join();
	assert(n==2);
}