1 /**
2  * NNTP listener (periodically poll server for new messages).
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.nntp.listener;
15 
16 import ae.net.nntp.client;
17 
18 import std.datetime;
19 
20 import ae.sys.timing;
21 import ae.sys.log;
22 
23 const POLL_PERIOD = 2.seconds;
24 
25 class NntpListener
26 {
27 private:
28 	NntpClient client;
29 	string server;
30 	string lastDate;
31 	bool[string] oldMessages;
32 	TimerTask pollTimer;
33 	bool connected, polling;
34 	int queued;
35 
36 	void reconnect()
37 	{
38 		assert(!connected);
39 		client.connect(server, &onConnect);
40 	}
41 
42 	void schedulePoll()
43 	{
44 		pollTimer = setTimeout(&poll, POLL_PERIOD);
45 	}
46 
47 	void poll()
48 	{
49 		pollTimer = null;
50 		client.getDate(&onDate);
51 		client.getNewNews("*", lastDate[0..8] ~ " " ~ lastDate[8..14] ~ " GMT", &onNewNews);
52 	}
53 
54 	void onConnect()
55 	{
56 		connected = true;
57 
58 		if (polling)
59 		{
60 			if (lastDate)
61 				poll();
62 			else
63 				client.getDate(&onDate);
64 		}
65 	}
66 
67 	void onDisconnect(string reason, DisconnectType type)
68 	{
69 		connected = false;
70 		if (polling)
71 		{
72 			if (pollTimer && pollTimer.isWaiting())
73 				pollTimer.cancel();
74 			if (type != DisconnectType.requested)
75 				setTimeout(&reconnect, 10.seconds);
76 		}
77 	}
78 
79 	void onDate(string date)
80 	{
81 		if (polling)
82 		{
83 			if (lastDate is null)
84 				schedulePoll();
85 			lastDate = date;
86 		}
87 	}
88 
89 	void onNewNews(string[] reply)
90 	{
91 		bool[string] messages;
92 		foreach (message; reply[1..$])
93 			messages[message] = true;
94 
95 		assert(queued == 0);
96 		foreach (message, b; messages)
97 			if (!(message in oldMessages))
98 			{
99 				client.getMessage(message, &onMessage);
100 				queued++;
101 			}
102 		oldMessages = messages;
103 		if (queued==0)
104 			schedulePoll();
105 	}
106 
107 	void onMessage(string[] lines, string num, string id)
108 	{
109 		if (handleMessage)
110 			handleMessage(lines, num, id);
111 
112 		if (polling)
113 		{
114 			queued--;
115 			if (queued==0)
116 				schedulePoll();
117 		}
118 	}
119 
120 public:
121 	this(Logger log)
122 	{
123 		client = new NntpClient(log);
124 		client.handleDisconnect = &onDisconnect;
125 	}
126 
127 	void connect(string server)
128 	{
129 		this.server = server;
130 		reconnect();
131 	}
132 
133 	void disconnect()
134 	{
135 		client.disconnect();
136 	}
137 
138 	void startPolling(string lastDate = null)
139 	{
140 		assert(!polling, "Already polling");
141 		polling = true;
142 		this.lastDate = lastDate;
143 		if (connected)
144 			poll();
145 	}
146 
147 	void delegate(string[] lines, string num, string id) handleMessage;
148 }