1 /**
2  * Read a PortForward replay log and answer to inbound connections with recorded data.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.demo.portforward.replayincoming;
15 
16 import ae.demo.portforward.replay;
17 import ae.net.asockets;
18 import ae.sys.timing;
19 import ae.sys.log;
20 
21 import std.datetime : SysTime, Duration, Clock;
22 import std.string : format;
23 import std.getopt;
24 
/// Module-level logger used by InboundReplayer for progress messages.
/// It is assigned in main() before the replayer is constructed.
Logger log;
26 
/**
 * Replays the inbound side of a recorded PortForward session.
 *
 * For every recorded "listen" event a TCP server is opened on the same port;
 * for every recorded "accept" event the replayer waits for a real client to
 * connect, and then sends the recorded outgoing data / disconnects to that
 * client with the same relative timing as in the recording.
 *
 * NOTE(review): the meaning of the `bool` returned by the `handle*` overrides
 * is defined by the `Replayer` base class, which is not visible here.
 * Judging by usage in this class, `true` appears to mean "continue with the
 * next log line immediately", and `false` "processing is deferred; this class
 * will call nextLine() itself later" - confirm against `Replayer`.
 */
class InboundReplayer : Replayer
{
	/// Params:
	///   fn = name of the replay log file; passed unchanged to the base class.
	this(string fn)
	{
		super(fn);
	}

protected:
	/// Recorded "listen" event: start listening on `port`.
	/// `time` is not used. Returns `true`.
	override bool handleListen(SysTime time, ushort port)
	{
		listeners[port] = new Listener(port);
		log(format("Listening on port %d", port));
		return true;
	}

	/// Recorded "accept" event: install an accept handler on the listener for
	/// `port` and wait for a live client. Returns `false`; nextLine() is
	/// called from onSocketAccept once a connection actually arrives.
	/// A listener for `port` must already have been created by handleListen.
	override bool handleAccept(SysTime time, uint index, ushort port)
	{
		listeners[port].s.handleAccept = (TcpConnection s) { onSocketAccept(s, time, index); };
		log(format("Waiting for connection %d on port %d", index, port));
		return false;
	}

	/// Called when a client connects: registers the connection under `index`,
	/// remembering both the recorded accept time and the current wall-clock
	/// time so that later events can be scheduled relative to them, then
	/// resumes log processing.
	private void onSocketAccept(TcpConnection s, SysTime time, uint index)
	{
		log(format("Accepted connection %d from %s", index, s.remoteAddress()));
		auto c = new Connection;
		c.s = s;
		c.recordStart = time;
		c.playStart = Clock.currTime();
		connections[index] = c;

		nextLine();
	}

	/// Recorded outgoing data: schedule `data` to be sent on connection
	/// `index` at the replay-time equivalent of `time`. Returns `false`;
	/// nextLine() is called from sendData when the timer fires.
	/// Note that the log message is written when the send is scheduled,
	/// not when the data is actually sent.
	override bool handleOutgoingData(SysTime time, uint index, void[] data)
	{
		log(format("Sending %d bytes of data to connection %d", data.length, index));
		connections[index].at(time, { sendData(index, data); });
		return false;
	}

	/// Timer callback: send `data` on connection `index`, then resume
	/// log processing.
	private void sendData(uint index, void[] data)
	{
		connections[index].s.send(Data(data));
		nextLine();
	}

	/// Recorded outgoing disconnect: schedule closing connection `index` at
	/// the replay-time equivalent of `time`. The recorded `reason` is not
	/// used. Returns `false`; nextLine() is called from sendDisconnect.
	override bool handleOutgoingDisconnect(SysTime time, uint index, string reason)
	{
		connections[index].at(time, { sendDisconnect(index); });
		return false;
	}

	/// Timer callback: disconnect connection `index` (with the fixed reason
	/// string "Record"), then resume log processing.
	private void sendDisconnect(uint index)
	{
		connections[index].s.disconnect("Record");
		nextLine();
	}

private:
	/// Active listeners, keyed by port number.
	Listener[ushort] listeners;

	/// Owns a TcpServer that starts listening on construction.
	class Listener
	{
		TcpServer s;

		this(ushort port)
		{
			s = new TcpServer();
			s.listen(port);
		}
	}

	/// Accepted connections, keyed by the connection index from the log.
	Connection[uint] connections;

	/// A live connection plus the two reference points used to translate
	/// recorded timestamps into replay timestamps.
	class Connection
	{
		TcpConnection s;
		/// recordStart: time of the accept event in the recording.
		/// playStart: wall-clock time at which the live client was accepted.
		SysTime recordStart, playStart;

		/// Schedule `fn` to run at the replay-time equivalent of `recordTime`,
		/// i.e. as long after playStart as recordTime was after recordStart.
		void at(SysTime recordTime, void delegate() fn)
		{
			SysTime playTime = playStart + (recordTime - recordStart);
			Duration delay = playTime - Clock.currTime();
			// The replay can fall behind the recorded schedule (e.g. a slow
			// client or time spent in earlier handlers), which makes the
			// remaining delay negative. A negative timeout is not meaningful,
			// so run the action as soon as possible instead.
			if (delay < Duration.zero)
				delay = Duration.zero;
			setTimeout(fn, delay);
		}
	}
}
114 
/**
 * Program entry point.
 *
 * Usage: PROGRAM [-q|--quiet] REPLAY-LOG
 *
 * With -q/--quiet, log messages go to the log file only; otherwise they are
 * also written to the console. After the replayer is created, the socket
 * manager's event loop is run.
 */
void main(string[] args)
{
	bool quiet = false;
	// getopt removes the options it recognizes from args, so afterwards
	// args[1] is the first positional argument (the replay log file name).
	getopt(args, std.getopt.config.bundling,
		"q|quiet", &quiet);

	// Fail with a clear message instead of an array bounds error when the
	// replay log file name was not given.
	if (args.length < 2)
	{
		import core.stdc.stdlib : exit;
		import std.stdio : stderr;

		stderr.writefln("Usage: %s [-q|--quiet] REPLAY-LOG", args.length ? args[0] : "replayincoming");
		exit(1);
	}

	log = quiet ? new FileLogger("PortForwardReplayIncoming") : new FileAndConsoleLogger("PortForwardReplayIncoming");
	new InboundReplayer(args[1]);
	socketManager.loop();
}