1 /**
2  * NNTP listener (periodically poll server for new messages).
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.net.nntp.listener;
15 
16 import ae.net.nntp.client;
17 
18 import std.datetime;
19 import std.typecons;
20 
21 import ae.sys.timing;
22 import ae.sys.log;
23 
24 /// `NntpListener` polls with this interval.
25 const POLL_PERIOD = 2.seconds;
26 
27 /// NNTP client which polls the server for new messages.
28 class NntpListener
29 {
30 private:
31 	typeof(scoped!NntpClient(Logger.init)) client;
32 	string server;
33 	string lastDate;
34 	bool[string] oldMessages;
35 	TimerTask pollTimer;
36 	bool connected, polling;
37 	int queued;
38 
39 	void reconnect()
40 	{
41 		assert(!connected);
42 		client.connect(server, &onConnect);
43 	}
44 
45 	void schedulePoll()
46 	{
47 		pollTimer = setTimeout(&poll, POLL_PERIOD);
48 	}
49 
50 	void poll()
51 	{
52 		pollTimer = null;
53 		client.getDate(&onDate);
54 		client.getNewNews("*", lastDate[0..8] ~ " " ~ lastDate[8..14] ~ " GMT", &onNewNews);
55 	}
56 
57 	void onConnect()
58 	{
59 		connected = true;
60 		queued = 0;
61 
62 		if (polling)
63 		{
64 			if (lastDate)
65 				poll();
66 			else
67 				client.getDate(&onDate);
68 		}
69 	}
70 
71 	void onDisconnect(string reason, DisconnectType type)
72 	{
73 		connected = false;
74 		if (polling)
75 		{
76 			if (pollTimer && pollTimer.isWaiting())
77 				pollTimer.cancel();
78 			if (type != DisconnectType.requested)
79 				setTimeout(&reconnect, 10.seconds);
80 		}
81 	}
82 
83 	void onDate(string date)
84 	{
85 		if (polling)
86 		{
87 			if (lastDate is null)
88 				schedulePoll();
89 			lastDate = date;
90 		}
91 	}
92 
93 	void onNewNews(string[] reply)
94 	{
95 		auto messages = reply[1..$];
96 
97 		assert(queued == 0);
98 		foreach (message; messages)
99 			if (message !in oldMessages)
100 			{
101 				oldMessages[message] = true;
102 				client.getMessage(message, &onMessage);
103 				queued++;
104 			}
105 		if (queued==0)
106 			schedulePoll();
107 	}
108 
109 	void onMessage(string[] lines, string num, string id)
110 	{
111 		if (handleMessage)
112 			handleMessage(lines, num, id);
113 
114 		if (polling)
115 		{
116 			queued--;
117 			if (queued==0)
118 				schedulePoll();
119 		}
120 	}
121 
122 public:
123 	this(Logger log)
124 	{
125 		client = scoped!NntpClient(log);
126 		client.handleDisconnect = &onDisconnect;
127 	} ///
128 
129 	void connect(string server)
130 	{
131 		this.server = server;
132 		reconnect();
133 	} ///
134 
135 	void disconnect()
136 	{
137 		client.disconnect();
138 	} ///
139 
140 	void startPolling(string lastDate = null)
141 	{
142 		assert(!polling, "Already polling");
143 		polling = true;
144 		this.lastDate = lastDate;
145 		if (connected)
146 			poll();
147 	} ///
148 
149 	/// Called when NEWNEWS reports a new message.
150 	void delegate(string[] lines, string num, string id) handleMessage;
151 }