/**
 * ae.net.sync
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 */

module ae.net.sync;

import core.thread;

import std.socket;

import ae.net.asockets;

/**
    An object which allows calling a function in a different thread.
    Create ThreadAnchor in the main thread (the thread in which the
    code will run), and then call runWait or runAsync from a
    different thread.

    The main thread must be running an unblocked ae.net.asockets
    event loop.

    Example:
    ---
    void onConnect(ClientSocket socket)
    {
        auto anchor = new ThreadAnchor;
        new Thread({
            string s = readln();
            anchor.runAsync({
                socket.send(s);
                socket.disconnect();
            });
        }).start();
    }
    ---
**/
// NOTE(review): the class derives from ClientSocket but all traffic goes
// through the nested AnchorSocket; the base class is kept as-is so the
// public type does not change - confirm whether the inheritance is intended.
final class ThreadAnchor : ClientSocket
{
private:
    /// One-byte commands exchanged over the socket pair.
    enum Command : ubyte
    {
        none,
        runWait,     /// run the next queued delegate, then reply with runWaitDone
        runAsync,    /// run the next queued delegate, no reply
        runWaitDone, /// reply sent after a runWait delegate has returned
    }

    alias Dg = void delegate();

    /// Event-loop end of the socket pair. Every command byte received
    /// corresponds to one delegate appended to `queue` by the sender.
    final static class AnchorSocket : ClientSocket
    {
        Socket pinger; /// the other end of the pair, used by foreign threads
        Dg[] queue;    /// pending delegates; accessed under synchronized(this)

        this()
        {
            auto pair = socketPair();
            super(pair[0]);
            pinger = pair[1];
            this.handleReadData = &onReadData;
            this.daemon = true;
        }

        /// Read handler: treats each received byte as a Command, takes the
        /// next delegate off the queue and invokes it.
        void onReadData(ClientSocket socket, Data data)
        {
            import ae.utils.array;

            foreach (cmd; cast(Command[])data.contents)
            {
                Dg dg;
                synchronized(this) dg = queue.shift();
                switch (cmd)
                {
                    case Command.runAsync:
                        dg();
                        break;
                    case Command.runWait:
                    {
                        dg();
                        // Tell the waiting thread that the delegate has finished.
                        Command[] reply = [Command.runWaitDone];
                        this.send(Data(reply));
                        break;
                    }
                    default:
                        assert(false);
                }
            }
        }
    }

    AnchorSocket socket;

    /// Appends `dg` to the queue and sends the single command byte `cmd`
    /// through the pinger socket.
    void enqueue(Command cmd, Dg dg)
    {
        synchronized(socket) socket.queue ~= dg;
        Command[] data = [cmd];
        socket.pinger.send(data);
    }

public:
    /// Creates the anchor and its underlying socket pair.
    this()
    {
        socket = new AnchorSocket();
    }

    /// Queues `dg` for execution by the anchor socket's read handler and
    /// returns without waiting for it to run.
    void runAsync(Dg dg)
    {
        // Must be runAsync: sending runWait here would make the handler emit
        // a runWaitDone reply that nobody reads, which a later runWait call
        // would then mistake for its own completion.
        enqueue(Command.runAsync, dg);
    }

    /// Queues `dg` for execution by the anchor socket's read handler and
    /// blocks until the completion reply arrives.
    /// Throws: an exception with message "runWait error" if the reply could
    /// not be received or is not `runWaitDone`.
    void runWait(Dg dg)
    {
        import std.exception : enforce;

        enqueue(Command.runWait, dg);

        Command[] reply = [Command.none];
        // receive returns 0 when the peer closed the connection and
        // Socket.ERROR (negative) on failure; neither may be used as a
        // slice bound, so check the count before looking at the data.
        auto received = socket.pinger.receive(reply);
        enforce(received > 0 && reply[0] == Command.runWaitDone, "runWait error");
    }

    /// Closes the pinger end of the socket pair.
    void close()
    {
        socket.pinger.close();
    }
}

unittest
{
    // keep socketManager running -
    // ThreadAnchor sockets are daemon
    auto dummy = new ServerSocket();
    dummy.listen(0, "localhost");

    import ae.sys.timing;

    int n = 0;
    Thread t;

    // Queue to run this as soon as event loop starts
    setTimeout({
        auto anchor = new ThreadAnchor;
        t = new Thread({
            anchor.runWait({
                assert(n==0); n++;
            });
            anchor.runAsync(&dummy.close);
            assert(n==1); n++;
        }).start();
    }, Duration.zero);

    socketManager.loop();
    t.join();
    assert(n==2);
}