/**
 * ae.net.sync
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 */

module ae.net.sync;

import core.atomic;
import core.sync.semaphore;
import core.thread;

import std.exception;
import std.socket;
import std.typecons : Flag, Yes;

import ae.net.asockets;

/**
	An object which allows calling a function in a different thread.
	Create ThreadAnchor in the main thread (the thread in which the
	code will run), and then call runWait or runAsync from a
	different thread.

	The main thread must be running an unblocked ae.net.asockets
	event loop.

	Example:
	---
	void onConnect(TcpConnection socket)
	{
		auto anchor = new ThreadAnchor;
		new Thread({
			string s = readln();
			anchor.runAsync({
				socket.send(s);
				socket.disconnect();
			});
		}).start();
	}
	---
**/

final class ThreadAnchor : TcpConnection
{
private:
	alias Dg = void delegate();

	/// One queued unit of work.
	static struct Command
	{
		/// The delegate to invoke from the read handler.
		Dg dg;
		/// Optional; when non-null, it is notified once `dg` has been
		/// invoked (whether it returned or threw).
		Semaphore* semaphore;
	}

	/// Number of slots in the command queue.
	enum queueSize = 1024;

	/// Connection wrapping one end of a socket pair. Slot indices
	/// written to the other end (`pinger`) arrive in `onReadData`,
	/// which runs the corresponding queued commands.
	final static class AnchorSocket : TcpConnection
	{
		/// The end of the socket pair that queue indices are written to.
		Socket pinger;

		// Ensure the GC can reach delegates
		// Must be preallocated - can't allocate in signal handlers
		Command[queueSize] queue;

		/// Running count of reserved slots; incremented atomically
		/// by `runCommand` and reduced modulo `queueSize` there.
		shared size_t writeIndex;

		/// Creates the socket pair, hands the first (non-blocking) end
		/// to the base class, keeps the second end as `pinger`, and
		/// installs `onReadData` as the read handler.
		this(bool daemon)
		{
			auto pair = socketPair();
			pair[0].blocking = false;
			super(pair[0]);
			pinger = pair[1];
			this.handleReadData = &onReadData;
			this.daemon = daemon;
		}

		/// Treats the received bytes as an array of queue indices and
		/// runs the command stored in each indexed slot.
		void onReadData(Data data)
		{
			foreach (index; cast(size_t[])data.contents)
			{
				// Take a copy and free the slot before running,
				// so the slot can be reused immediately.
				auto command = queue[index];
				queue[index] = Command.init;

				// Release a thread blocked in runWait even when the
				// delegate throws; otherwise that thread would wait
				// on its semaphore forever. The exception itself
				// still propagates to the caller of this handler.
				scope(exit)
				{
					if (command.semaphore)
						command.semaphore.notify();
				}

				command.dg();
			}
		}
	}

	AnchorSocket socket;

	/// Writes a queue index to the pinger socket.
	/// `sendCommandImpl` carries no `nothrow @nogc` attributes, so the
	/// call is forced through a cast - presumably because `Socket.send`
	/// lacks them; see the linked pull request.
	void sendCommand(size_t index) nothrow @nogc
	{
		// https://github.com/dlang/phobos/pull/4273
		(cast(void delegate(size_t index) nothrow @nogc)&sendCommandImpl)(index);
	}

	/// Sends `index` as a single raw `size_t` over the pinger socket.
	/// The return value of `send` is not checked.
	void sendCommandImpl(size_t index)
	{
		size_t[1] data;
		data[0] = index;
		socket.pinger.send(data[]);
	}

	/// Reserves the next queue slot, stores `command` in it, and sends
	/// the slot index over the pinger socket.
	/// Asserts if the slot is still occupied (queue overrun).
	void runCommand(Command command) nothrow @nogc
	{
		assert(command.dg);
		// atomicOp!"+=" yields the incremented value; subtract one
		// to get the slot reserved by this call.
		auto index = (socket.writeIndex.atomicOp!"+="(1)-1) % queueSize;
		if (socket.queue[index].dg !is null)
			assert(false, "ThreadAnchor queue overrun");
		socket.queue[index] = command;
		sendCommand(index);
	}

public:
	/// Creates the anchor and its underlying socket pair.
	/// Params:
	///   daemon = forwarded to the `daemon` property of the
	///            anchor's connection
	this(Flag!"daemon" daemon = Yes.daemon)
	{
		socket = new AnchorSocket(daemon);
	}

	/// Queues `dg` to be run by the anchor's read handler and
	/// returns without waiting for it.
	void runAsync(Dg dg) nothrow @nogc
	{
		runCommand(Command(dg));
	}

	/// Queues `dg` to be run by the anchor's read handler and blocks
	/// until it has been invoked.
	/// If `dg` throws, the exception propagates out of the read
	/// handler, and this call still returns normally.
	void runWait(Dg dg)
	{
		scope semaphore = new Semaphore();
		runCommand(Command(dg, &semaphore));
		semaphore.wait();
	}

	/// Closes the pinger end of the socket pair.
	void close()
	{
		socket.pinger.close();
	}
}

unittest
{
	// keep socketManager running -
	// ThreadAnchor sockets are daemon
	auto dummy = new TcpServer();
	dummy.listen(0, "localhost");

	import ae.sys.timing;

	int n = 0;
	Thread t;

	// Queue to run this as soon as event loop starts
	setTimeout({
		auto anchor = new ThreadAnchor;
		t = new Thread({
			anchor.runWait({
				assert(n==0); n++;
			});
			anchor.runAsync(&dummy.close);
			assert(n==1); n++;
		}).start();
	}, Duration.zero);

	socketManager.loop();
	t.join();
	assert(n==2);
}