1 /**
2  * NNTP client supporting a small subset of the protocol.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.net.nntp.client;
15 
16 import std.conv;
17 import std.string;
18 import std.exception;
19 
20 import ae.net.asockets;
21 import ae.sys.log;
22 import ae.utils.array;
23 
24 import core.time;
25 
26 public import ae.net.asockets : DisconnectType;
27 
/// One group entry parsed from the multi-line reply to a LIST command.
struct GroupInfo
{
	/// Name of the newsgroup.
	string name;

	/// High water mark for the group.
	int high;

	/// Low water mark for the group.
	int low;

	/// Status character of the group on this server.
	char mode;
}
40 
/// Implements an NNTP client connection.
///
/// Commands are placed in a queue, sent to the server (pipelined where the
/// command allows it, up to `PIPELINE_LIMIT` outstanding commands), and
/// matched in order against the replies the server sends back. Results are
/// delivered through the delegates passed to each command method.
class NntpClient
{
private:
	/// Line-splitting adapter; outgoing lines are written through it.
	LineBufferedAdapter lineAdapter;

	/// Outermost adapter of the connection stack assembled in `connect`.
	IConnection conn;

	/// Protocol log.
	Logger log;

	/// One possible reply to an NNTP command
	static struct Reply
	{
		/// True when the reply consists of a status line followed by
		/// data lines terminated by a lone ".".
		bool multiLine;

		/// Invoked with the status line followed by any data lines.
		void delegate(string[] lines) handleReply;

		/// Multi-line reply: the handler receives the status line and all data lines.
		this(void delegate(string[] lines) handler)
		{
			multiLine = true;
			handleReply = handler;
		}

		/// Single-line reply: the handler receives the status line.
		this(void delegate(string line) handler)
		{
			multiLine = false;
			handleReply = (string[] lines) { assert(lines.length==1); handler(lines[0]); };
		}

		/// Single-line reply whose contents are of no interest to the handler.
		this(void delegate() handler)
		{
			multiLine = false;
			handleReply = (string[] lines) { assert(lines.length==1); handler(); };
		}
	}

	/// One pipelined command
	static struct Command
	{
		/// Lines to send. Empty for the greeting pseudo-command placed by `connect`.
		string[] lines;

		/// Whether further commands may be sent before this one is answered.
		bool pipelineable;

		/// Expected replies, keyed by numeric status code.
		Reply[int] replies;

		/// Called with a description when the command fails. May be null.
		void delegate(string error) handleError;
	}

	/// Commands queued to be sent to the NNTP server.
	Command[] queuedCommands;

	/// Commands that have been sent to the NNTP server, and are expecting a reply.
	Command[] sentCommands;

	/// Connection callback: the socket is open, but the greeting has not arrived yet.
	void onConnect()
	{
		log("* Connected, waiting for greeting...");
	}

	/// Disconnection callback: fails all pending commands, resets the
	/// reply state, and notifies `handleDisconnect`.
	void onDisconnect(string reason, DisconnectType type)
	{
		log("* Disconnected (" ~ reason ~ ")");
		connected = false;
		foreach (command; queuedCommands ~ sentCommands)
			if (command.handleError)
				command.handleError("Disconnected from server (" ~ reason ~ ")");

		queuedCommands = sentCommands = null;
		replyLines = null;
		currentReply = null;

		if (handleDisconnect)
			handleDisconnect(reason, type);
	}

	/// Logs and sends one line to the server.
	void sendLine(string line)
	{
		log("< " ~ line);
		lineAdapter.send(line);
	}

	/// Reply line buffer.
	string[] replyLines;

	/// Reply currently being received/processed.
	/// Null when the status code is not among the command's expected replies.
	Reply* currentReply;

	/// Data callback: consumes one line received from the server.
	///
	/// The first line of a reply selects the `Reply` by status code; further
	/// lines are collected until the terminating "." of a multi-line reply.
	/// A completed reply is dispatched to the oldest sent command. Any
	/// exception escaping here is logged and causes a disconnect.
	void onReadData(Data data)
	{
		try
		{
			auto line = cast(string)data.toHeap();
			log("> " ~ line);

			bool replyDone;
			if (replyLines.length==0)
			{
				enforce(sentCommands.length, "No command was queued when the server sent line: " ~ line);
				auto command = sentCommands.queuePeek();

				// Indexing an empty split result would raise a RangeError, which
				// is not an Exception and would bypass the handler below.
				auto words = line.split();
				enforce(words.length, "Empty status line received from server");
				int code = words[0].to!int();
				currentReply = code in command.replies;

				// Assume that unknown replies are single-line error messages.
				replyDone = currentReply ? !currentReply.multiLine : true;
				replyLines = [line];
			}
			else
			{
				if (line == ".")
					replyDone = true;
				else
				{
					// Undo dot-stuffing of data lines.
					if (line.length && line[0] == '.')
						line = line[1..$];
					replyLines ~= line;
				}
			}

			if (replyDone)
			{
				auto command = sentCommands.queuePop();

				// The greeting pseudo-command has no lines, so it cannot be
				// quoted by indexing command.lines.
				auto commandText = command.lines.length ? command.lines[0] : "(connection greeting)";

				void handleReply()
				{
					enforce(currentReply, `Unexpected reply "` ~ replyLines[0] ~ `" to command "` ~ commandText ~ `"`);
					currentReply.handleReply(replyLines);
				}

				if (command.handleError)
				{
					try
						handleReply();
					catch (Exception e)
						command.handleError(e.msg);
				}
				else
					handleReply(); // In the absence of an error handler, treat command handling exceptions like fatal protocol errors

				replyLines = null;
				currentReply = null;

				updateQueue();
			}
		}
		catch (Exception e)
		{
			foreach (line; e.toString().splitLines())
				log("* " ~ line);
			conn.disconnect("Unhandled " ~ e.classinfo.name ~ ": " ~ e.msg);
		}
	}

	/// Maximum number of commands that may be awaiting a reply at once.
	enum PIPELINE_LIMIT = 64;

	/// Calls `handleIdle` when nothing is queued or in flight; otherwise
	/// sends as many queued commands as pipelining rules permit.
	void updateQueue()
	{
		if (!sentCommands.length && !queuedCommands.length && handleIdle)
			handleIdle();
		else
		while (
			queuedCommands.length                                                   // Got something to send?
		 && (sentCommands.length == 0 || sentCommands.queuePeekLast().pipelineable) // Can pipeline?
		 && sentCommands.length < PIPELINE_LIMIT)                                   // Not pipelining too much?
			send(queuedCommands.queuePop());
	}

	/// Appends a command to the queue and sends it if possible.
	void queue(Command command)
	{
		queuedCommands.queuePush(command);
		updateQueue();
	}

	/// Sends a command's lines immediately and records it as awaiting a reply.
	void send(Command command)
	{
		foreach (line; command.lines)
			sendLine(line);
		sentCommands.queuePush(command);
	}

public:
	/// Creates a client which writes its protocol trace to `log`.
	this(Logger log)
	{
		this.log = log;
	}

	/// Connect to the given server.
	/// Params:
	///   server        = Host name of the NNTP server.
	///   handleConnect = Called once the server's greeting has been received.
	///   port          = TCP port to connect to (119 by default).
	void connect(string server, void delegate() handleConnect=null, ushort port = 119)
	{
		auto tcp = new TcpConnection();
		IConnection c = tcp;

		c = lineAdapter = new LineBufferedAdapter(c);

		TimeoutAdapter timer;
		c = timer = new TimeoutAdapter(c);
		timer.setIdleTimeout(30.seconds);

		conn = c;

		conn.handleConnect = &onConnect;
		conn.handleDisconnect = &onDisconnect;
		conn.handleReadData = &onReadData;

		void onGreeting()
		{
			connected = true;
			if (handleConnect)
				handleConnect();
		}

		// Manually place a fake command in the queue
		// (server automatically sends a greeting when a client connects).
		// It is not pipelineable, so real commands wait until the greeting arrives.
		sentCommands ~= Command(null, false, [
			200:Reply(&onGreeting), // Service available, posting allowed
			201:Reply(&onGreeting), // Service available, posting prohibited
		]);

		log("* Connecting to " ~ server ~ "...");
		tcp.connect(server, port);
	}

	/// Disconnect.
	void disconnect(string reason = IConnection.defaultDisconnectReason)
	{
		conn.disconnect(reason);
	}

	/// True when a connection is fully established and a greeting is received.
	bool connected;

	/// Send a LIST command.
	/// Each data line must consist of exactly four whitespace-separated
	/// fields, which are stored in a `GroupInfo` in the order received.
	void listGroups(void delegate(GroupInfo[] groups) handleGroups, void delegate(string) handleError=null)
	{
		queue(Command(["LIST"], true, [
			215:Reply((string[] reply) {
				GroupInfo[] groups = new GroupInfo[reply.length-1];
				foreach (i, line; reply[1..$])
				{
					auto info = split(line);
					enforce(info.length == 4, "Unrecognized LIST reply");
					groups[i] = GroupInfo(info[0], to!int(info[1]), to!int(info[2]), info[3][0]);
				}
				if (handleGroups)
					handleGroups(groups);
			}),
		], handleError));
	}

	/// Send a GROUP command.
	void selectGroup(string name, void delegate() handleSuccess=null, void delegate(string) handleError=null)
	{
		queue(Command(["GROUP " ~ name], true, [
			211:Reply({
				if (handleSuccess)
					handleSuccess();
			}),
		], handleError));
	}

	/// Send a LISTGROUP command.
	/// A range ("from-") is only included in the command when `from` is greater than 1.
	/// The callback receives the data lines without the status line.
	void listGroup(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null)
	{
		string line = from > 1 ? format("LISTGROUP %s %d-", name, from) : format("LISTGROUP %s", name);

		queue(Command([line], true, [
			211:Reply((string[] reply) {
				if (handleListGroup)
					handleListGroup(reply[1..$]);
			}),
		], handleError));
	}

	/// ditto
	void listGroup(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroup(name, 1, handleListGroup, handleError); }

	/// Send a XOVER command.
	/// The group is selected first; the callback receives the first
	/// tab-separated field of every overview line.
	void listGroupXover(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null)
	{
		// TODO: handle GROUP command failure
		selectGroup(name);
		queue(Command([format("XOVER %d-", from)], true, [
			224:Reply((string[] reply) {
				auto messages = new string[reply.length-1];
				foreach (i, line; reply[1..$])
				{
					// Report a line without any field as an error instead of indexing out of bounds.
					auto fields = line.split("\t");
					enforce(fields.length, "Unrecognized XOVER reply");
					messages[i] = fields[0];
				}
				if (handleListGroup)
					handleListGroup(messages);
			}),
		], handleError));
	}

	/// ditto
	void listGroupXover(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroupXover(name, 1, handleListGroup, handleError); }

	/// Send an ARTICLE command.
	/// The callback receives the article lines, followed by the second and
	/// third whitespace-separated words of the status line.
	void getMessage(string numOrID, void delegate(string[] lines, string num, string id) handleMessage, void delegate(string) handleError=null)
	{
		queue(Command(["ARTICLE " ~ numOrID], true, [
			220:Reply((string[] reply) {
				auto message = reply[1..$];
				auto firstLine = reply[0].split();
				// A short status line must surface as an error, not as a RangeError.
				enforce(firstLine.length >= 3, "Unrecognized ARTICLE reply");
				if (handleMessage)
					handleMessage(message, firstLine[1], firstLine[2]);
			}),
		], handleError));
	}

	/// Send a DATE command.
	/// The callback receives the 14-character date field of the reply.
	void getDate(void delegate(string date) handleDate, void delegate(string) handleError=null)
	{
		queue(Command(["DATE"], true, [
			111:Reply((string reply) {
				auto words = reply.split();
				enforce(words.length >= 2, "Invalid DATE format");
				auto date = words[1];
				enforce(date.length == 14, "Invalid DATE format");
				if (handleDate)
					handleDate(date);
			}),
		], handleError));
	}

	/// Send a NEWNEWS command.
	/// Note: the callback receives the complete reply, i.e. the status line
	/// followed by the data lines.
	void getNewNews(string wildmat, string dateTime, void delegate(string[] messages) handleNewNews, void delegate(string) handleError=null)
	{
		queue(Command(["NEWNEWS " ~ wildmat ~ " " ~ dateTime], true, [
			230:Reply((string[] reply) {
				if (handleNewNews)
					handleNewNews(reply);
			}),
		], handleError));
	}

	/// Send a POST command.
	/// Once the server answers 340, the article is sent with dot-stuffing
	/// applied and a terminating "." line appended.
	void postMessage(string[] lines, void delegate() handlePosted=null, void delegate(string) handleError=null)
	{
		queue(Command(["POST"], false, [
			340:Reply({
				string[] postLines;
				foreach (line; lines)
					if (line.startsWith("."))
						postLines ~= "." ~ line;
					else
						postLines ~= line;
				postLines ~= ".";

				// Sent directly rather than queued: POST is not pipelineable,
				// so no later command has been sent yet.
				send(Command(postLines, true, [
					240:Reply({
						if (handlePosted)
							handlePosted();
					}),
				], handleError));
			}),
		], handleError));
	}

	/// Called when the connection is disconnected.
	void delegate(string reason, DisconnectType type) handleDisconnect;

	/// Called when the command queue is empty.
	void delegate() handleIdle;
}