1 /** 2 * Read a PortForward replay log and answer to inbound connections with recorded data. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <vladimir@thecybershadow.net> 12 */ 13 14 module ae.demo.portforward.replayincoming; 15 16 import ae.demo.portforward.replay; 17 import ae.net.asockets; 18 import ae.sys.timing; 19 import ae.sys.log; 20 21 import std.datetime : SysTime, Duration, Clock; 22 import std.string : format; 23 import std.getopt; 24 25 Logger log; 26 27 class InboundReplayer : Replayer 28 { 29 this(string fn) 30 { 31 super(fn); 32 } 33 34 protected: 35 override bool handleListen(SysTime time, ushort port) 36 { 37 listeners[port] = new Listener(port); 38 log(format("Listening on port %d", port)); 39 return true; 40 } 41 42 override bool handleAccept(SysTime time, uint index, ushort port) 43 { 44 listeners[port].s.handleAccept = (TcpConnection s) { onSocketAccept(s, time, index); }; 45 log(format("Waiting for connection %d on port %d", index, port)); 46 return false; 47 } 48 49 private void onSocketAccept(TcpConnection s, SysTime time, uint index) 50 { 51 log(format("Accepted connection %d from %s", index, s.remoteAddress())); 52 auto c = new Connection; 53 c.s = s; 54 c.recordStart = time; 55 c.playStart = Clock.currTime(); 56 connections[index] = c; 57 58 nextLine(); 59 } 60 61 override bool handleOutgoingData(SysTime time, uint index, void[] data) 62 { 63 log(format("Sending %d bytes of data to connection %d", data.length, index)); 64 connections[index].at(time, { sendData(index, data); }); 65 return false; 66 } 67 68 private void sendData(uint index, void[] data) 69 { 70 connections[index].s.send(Data(data)); 71 nextLine(); 72 } 73 74 override bool handleOutgoingDisconnect(SysTime time, uint index, string reason) 75 { 76 connections[index].at(time, { sendDisconnect(index); }); 77 return false; 78 } 79 80 private void sendDisconnect(uint index) 81 { 82 connections[index].s.disconnect("Record"); 83 nextLine(); 84 } 85 86 private: 87 Listener[ushort] listeners; 88 89 class Listener 90 { 91 TcpServer s; 92 93 this(ushort port) 94 { 95 s = new TcpServer(); 96 s.listen(port); 97 } 98 } 99 100 Connection[uint] connections; 101 102 class Connection 103 { 104 TcpConnection s; 105 SysTime recordStart, playStart; 106 107 void at(SysTime recordTime, void delegate() fn) 108 { 109 SysTime playTime = playStart + (recordTime - recordStart); 110 setTimeout(fn, playTime - Clock.currTime()); 111 } 112 } 113 } 114 115 void main(string[] args) 116 { 117 bool quiet = false; 118 getopt(args, std.getopt.config.bundling, 119 "q|quiet", &quiet); 120 log = quiet ? new FileLogger("PortForwardReplayIncoming") : new FileAndConsoleLogger("PortForwardReplayIncoming"); 121 new InboundReplayer(args[1]); 122 socketManager.loop(); 123 }