1 /**
2  * NNTP client supporting a small subset of the protocol.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <vladimir@thecybershadow.net>
12  */
13 
14 module ae.net.nntp.client;
15 
16 import std.conv;
17 import std.string;
18 import std.exception;
19 
20 import ae.net.asockets;
21 import ae.sys.log;
22 import ae.utils.array;
23 
24 import core.time;
25 
26 public import ae.net.asockets : DisconnectType;
27 
/// Description of one newsgroup, as produced by `NntpClient.listGroups`
/// from a single whitespace-separated line of the server's LIST reply.
/// Field order matters: instances are constructed positionally.
struct GroupInfo
{
	/// Newsgroup name (first field of the LIST line).
	string name;

	/// Second field of the LIST line (presumably the highest article number - confirm against RFC 3977).
	int high;

	/// Third field of the LIST line (presumably the lowest article number - confirm against RFC 3977).
	int low;

	/// First character of the fourth field of the LIST line (the group's status/posting mode).
	char mode;
}
29 
/// Asynchronous NNTP client supporting a small subset of the protocol.
///
/// Commands are queued and, where allowed, pipelined. Each command carries a
/// table of expected reply codes with handlers, plus an optional error
/// handler. Replies are matched to commands strictly in the order in which
/// the commands were sent.
class NntpClient
{
private:
	/// Socket connection.
	LineBufferedSocket conn;

	/// Protocol log.
	Logger log;

	/// One possible reply to an NNTP command
	static struct Reply
	{
		/// Whether the status line is followed by a dot-terminated block of lines.
		bool multiLine;

		/// Handler invoked with the complete reply; `lines[0]` is the status line.
		void delegate(string[] lines) handleReply;

		/// Multi-line reply: the handler receives the status line and all data lines.
		this(void delegate(string[] lines) handler)
		{
			multiLine = true;
			handleReply = handler;
		}

		/// Single-line reply: the handler receives just the status line.
		this(void delegate(string line) handler)
		{
			multiLine = false;
			handleReply = (string[] lines) { assert(lines.length==1); handler(lines[0]); };
		}

		/// Single-line reply whose content is of no interest to the handler.
		this(void delegate() handler)
		{
			multiLine = false;
			handleReply = (string[] lines) { assert(lines.length==1); handler(); };
		}
	}

	/// One pipelined command
	static struct Command
	{
		/// Lines to send to the server for this command.
		string[] lines;

		/// Whether further commands may be sent before this one's reply arrives.
		bool pipelineable;

		/// Expected replies, keyed by NNTP status code.
		Reply[int] replies;

		/// Called with a description when the command fails (may be null).
		void delegate(string error) handleError;
	}

	/// Commands queued to be sent to the NNTP server.
	Command[] queuedCommands;

	/// Commands that have been sent to the NNTP server, and are expecting a reply.
	Command[] sentCommands;

	/// Socket callback: the connection is established (the greeting is handled as a reply).
	void onConnect(ClientSocket sender)
	{
		log("* Connected, waiting for greeting...");
	}

	/// Socket callback: fail all outstanding commands, reset the protocol
	/// state and notify the user's disconnect handler.
	void onDisconnect(ClientSocket sender, string reason, DisconnectType type)
	{
		log("* Disconnected (" ~ reason ~ ")");
		foreach (command; queuedCommands ~ sentCommands)
			if (command.handleError)
				command.handleError("Disconnected from server (" ~ reason ~ ")");

		queuedCommands = sentCommands = null;
		replyLines = null;
		currentReply = null;

		if (handleDisconnect)
			handleDisconnect(reason, type);
	}

	/// Log and send a single protocol line.
	void sendLine(string line)
	{
		log("< " ~ line);
		conn.send(line);
	}

	/// Reply line buffer.
	string[] replyLines;

	/// Reply currently being received/processed.
	Reply* currentReply;

	/// Socket callback: process one line received from the server.
	///
	/// The first line of a reply selects the handler by status code; for
	/// multi-line replies, subsequent lines are accumulated (with
	/// dot-unstuffing) until a lone "." terminator. Any exception that is not
	/// consumed by the command's error handler disconnects the client.
	void onReadLine(LineBufferedSocket s, string line)
	{
		try
		{
			log("> " ~ line);

			bool replyDone;
			if (replyLines.length==0)
			{
				enforce(sentCommands.length, "No command was queued when the server sent line: " ~ line);
				auto command = sentCommands.queuePeek();

				// Guard the index: an empty/blank line would otherwise raise a
				// RangeError, which is not an Exception and would escape the
				// catch block below instead of producing a clean disconnect.
				auto words = line.split();
				enforce(words.length, "Server sent an empty status line");
				int code = words[0].to!int();
				currentReply = code in command.replies;

				// Assume that unknown replies are single-line error messages.
				replyDone = currentReply ? !currentReply.multiLine : true;
				replyLines = [line];
			}
			else
			{
				if (line == ".")
					replyDone = true;
				else
				{
					// Undo dot-stuffing of data lines.
					if (line.length && line[0] == '.')
						line = line[1..$];
					replyLines ~= line;
				}
			}

			if (replyDone)
			{
				auto command = sentCommands.queuePop();
				void handleReply()
				{
					enforce(currentReply, `Unexpected reply "` ~ replyLines[0] ~ `" to command "` ~ command.lines[0] ~ `"`);
					currentReply.handleReply(replyLines);
				}

				if (command.handleError)
					try
						handleReply();
					catch (Exception e)
						command.handleError(e.msg);
				else
					handleReply(); // In the absence of an error handler, treat command handling exceptions like fatal protocol errors

				replyLines = null;
				currentReply = null;

				updateQueue();
			}
		}
		catch (Exception e)
			conn.disconnect("Unhandled " ~ e.classinfo.name ~ ": " ~ e.msg);
	}

	/// Maximum number of commands awaiting a reply at any one time.
	enum PIPELINE_LIMIT = 64;

	/// Report idleness if there is nothing left to do (and an idle handler is
	/// set); otherwise send as many queued commands as pipelining permits.
	void updateQueue()
	{
		if (!sentCommands.length && !queuedCommands.length && handleIdle)
			handleIdle();
		else
		while (
			queuedCommands.length                                                   // Got something to send?
		 && (sentCommands.length == 0 || sentCommands.queuePeekLast().pipelineable) // Can pipeline?
		 && sentCommands.length < PIPELINE_LIMIT)                                   // Not pipelining too much?
			send(queuedCommands.queuePop());
	}

	/// Enqueue a command, sending it immediately if possible.
	void queue(Command command)
	{
		queuedCommands.queuePush(command);
		updateQueue();
	}

	/// Send a command's lines right away and record it as awaiting a reply.
	void send(Command command)
	{
		foreach (line; command.lines)
			sendLine(line);
		sentCommands.queuePush(command);
	}

public:
	/// Create a client which writes its protocol trace to `log`.
	this(Logger log)
	{
		this.log = log;
	}

	/// Connect to `server` on port 119.
	/// `handleConnect`, if set, is called once the server's 200 greeting arrives.
	void connect(string server, void delegate() handleConnect=null)
	{
		conn = new LineBufferedSocket(30.seconds);
		conn.handleConnect = &onConnect;
		conn.handleDisconnect = &onDisconnect;
		conn.handleReadLine = &onReadLine;

		// Manually place a fake command in the queue
		// (server automatically sends a greeting when a client connects).
		sentCommands ~= Command(null, false, [
			200:Reply({
				if (handleConnect)
					handleConnect();
			}),
		]);

		log("* Connecting to " ~ server ~ "...");
		conn.connect(server, 119);
	}

	/// Close the connection.
	void disconnect()
	{
		conn.disconnect();
	}

	/// Send LIST and pass the parsed groups to `handleGroups`.
	/// A line which does not have exactly four fields fails the command.
	void listGroups(void delegate(GroupInfo[] groups) handleGroups, void delegate(string) handleError=null)
	{
		queue(Command(["LIST"], true, [
			215:Reply((string[] reply) {
				GroupInfo[] groups = new GroupInfo[reply.length-1];
				foreach (i, line; reply[1..$])
				{
					auto info = split(line);
					enforce(info.length == 4, "Unrecognized LIST reply");
					groups[i] = GroupInfo(info[0], to!int(info[1]), to!int(info[2]), info[3][0]);
				}
				if (handleGroups)
					handleGroups(groups);
			}),
		], handleError));
	}

	/// Send GROUP to select `name` as the current group.
	void selectGroup(string name, void delegate() handleSuccess=null, void delegate(string) handleError=null)
	{
		queue(Command(["GROUP " ~ name], true, [
			211:Reply({
				if (handleSuccess)
					handleSuccess();
			}),
		], handleError));
	}

	/// Send LISTGROUP for `name` and pass the data lines of the reply to
	/// `handleListGroup`. A range ("from-") is only requested when `from > 1`.
	void listGroup(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null)
	{
		string line = from > 1 ? format("LISTGROUP %s %d-", name, from) : format("LISTGROUP %s", name);

		queue(Command([line], true, [
			211:Reply((string[] reply) {
				if (handleListGroup)
					handleListGroup(reply[1..$]);
			}),
		], handleError));
	}

	/// ditto
	void listGroup(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroup(name, 1, handleListGroup, handleError); }

	/// Select `name` and send XOVER starting at `from`, passing the first
	/// tab-separated field of every overview line to `handleListGroup`.
	void listGroupXover(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null)
	{
		// TODO: handle GROUP command failure
		selectGroup(name);
		queue(Command([format("XOVER %d-", from)], true, [
			224:Reply((string[] reply) {
				auto messages = new string[reply.length-1];
				foreach (i, line; reply[1..$])
				{
					// Guard the index so a blank overview line fails the
					// command instead of raising a RangeError.
					auto fields = line.split("\t");
					enforce(fields.length, "Unrecognized XOVER reply");
					messages[i] = fields[0];
				}
				if (handleListGroup)
					handleListGroup(messages);
			}),
		], handleError));
	}

	/// ditto
	void listGroupXover(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroupXover(name, 1, handleListGroup, handleError); }

	/// Send ARTICLE for the given article number or message ID.
	/// `handleMessage` receives the article's lines, followed by the second
	/// and third words of the status line (passed as `num` and `id`).
	void getMessage(string numOrID, void delegate(string[] lines, string num, string id) handleMessage, void delegate(string) handleError=null)
	{
		queue(Command(["ARTICLE " ~ numOrID], true, [
			220:Reply((string[] reply) {
				auto message = reply[1..$];
				auto firstLine = reply[0].split();
				if (handleMessage)
				{
					// Report a short status line through the error path
					// rather than indexing out of bounds.
					enforce(firstLine.length >= 3, "Unrecognized ARTICLE reply");
					handleMessage(message, firstLine[1], firstLine[2]);
				}
			}),
		], handleError));
	}

	/// Send DATE and pass the 14-character timestamp from the reply to `handleDate`.
	void getDate(void delegate(string date) handleDate, void delegate(string) handleError=null)
	{
		queue(Command(["DATE"], true, [
			111:Reply((string reply) {
				auto words = reply.split();
				// A reply without a timestamp is a format error, not a crash.
				enforce(words.length >= 2, "Invalid DATE format");
				auto date = words[1];
				enforce(date.length == 14, "Invalid DATE format");
				if (handleDate)
					handleDate(date);
			}),
		], handleError));
	}

	/// Send NEWNEWS with the given wildmat and date/time string, and pass the
	/// data lines of the reply (without the status line) to `handleNewNews`.
	void getNewNews(string wildmat, string dateTime, void delegate(string[] messages) handleNewNews, void delegate(string) handleError=null)
	{
		queue(Command(["NEWNEWS " ~ wildmat ~ " " ~ dateTime], true, [
			230:Reply((string[] reply) {
				// reply[0] is the "230 ..." status line, not a message;
				// strip it like the other multi-line handlers do.
				if (handleNewNews)
					handleNewNews(reply[1..$]);
			}),
		], handleError));
	}

	/// Post an article consisting of `lines`.
	/// POST is sent first; the (dot-stuffed) article is transmitted only after
	/// the server answers 340. `handlePosted` is called on the final 240 reply.
	void postMessage(string[] lines, void delegate() handlePosted=null, void delegate(string) handleError=null)
	{
		queue(Command(["POST"], false, [
			340:Reply({
				string[] postLines;
				foreach (line; lines)
					if (line.startsWith("."))
						postLines ~= "." ~ line; // dot-stuffing
					else
						postLines ~= line;
				postLines ~= "."; // end-of-article terminator

				// Bypass the queue: the article must directly follow the 340 reply.
				send(Command(postLines, true, [
					240:Reply({
						if (handlePosted)
							handlePosted();
					}),
				], handleError));
			}),
		], handleError));
	}

	/// Called when the connection is closed, after outstanding commands have been failed.
	void delegate(string reason, DisconnectType type) handleDisconnect;

	/// Called when a reply has been processed and no commands remain queued or awaiting a reply.
	void delegate() handleIdle;
}