1 /** 2 * NNTP client supporting a small subset of the protocol. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <ae@cy.md> 12 */ 13 14 module ae.net.nntp.client; 15 16 import std.conv; 17 import std.string; 18 import std.exception; 19 20 import ae.net.asockets; 21 import ae.sys.log; 22 import ae.utils.array; 23 24 import core.time; 25 26 public import ae.net.asockets : DisconnectType; 27 28 /// Encodes a parsed entry of a LIST reply. 29 struct GroupInfo 30 { 31 /// Group name. 32 string name; 33 34 /// High and low water mark for the group. 35 int high, low; 36 37 /// The group's status on this server. 38 char mode; 39 } 40 41 /// Implements an NNTP client connection. 42 class NntpClient 43 { 44 private: 45 /// Socket connection. 46 LineBufferedAdapter lineAdapter; 47 IConnection conn; 48 49 /// Protocol log. 50 Logger log; 51 52 /// One possible reply to an NNTP command 53 static struct Reply 54 { 55 bool multiLine; 56 void delegate(string[] lines) handleReply; 57 58 this(void delegate(string[] lines) handler) 59 { 60 multiLine = true; 61 handleReply = handler; 62 } 63 64 this(void delegate(string line) handler) 65 { 66 multiLine = false; 67 handleReply = (string[] lines) { assert(lines.length==1); handler(lines[0]); }; 68 } 69 70 this(void delegate() handler) 71 { 72 multiLine = false; 73 handleReply = (string[] lines) { assert(lines.length==1); handler(); }; 74 } 75 } 76 77 /// One pipelined command 78 static struct Command 79 { 80 string[] lines; 81 bool pipelineable; 82 Reply[int] replies; 83 void delegate(string error) handleError; 84 } 85 86 /// Commands queued to be sent to the NNTP server. 87 Command[] queuedCommands; 88 89 /// Commands that have been sent to the NNTP server, and are expecting a reply. 90 Command[] sentCommands; 91 92 void onConnect() 93 { 94 log("* Connected, waiting for greeting..."); 95 } 96 97 void onDisconnect(string reason, DisconnectType type) 98 { 99 log("* Disconnected (" ~ reason ~ ")"); 100 connected = false; 101 foreach (command; queuedCommands ~ sentCommands) 102 if (command.handleError) 103 command.handleError("Disconnected from server (" ~ reason ~ ")"); 104 105 queuedCommands = sentCommands = null; 106 replyLines = null; 107 currentReply = null; 108 109 if (handleDisconnect) 110 handleDisconnect(reason, type); 111 } 112 113 void sendLine(string line) 114 { 115 log("< " ~ line); 116 lineAdapter.send(line); 117 } 118 119 /// Reply line buffer. 120 string[] replyLines; 121 122 /// Reply currently being received/processed. 123 Reply* currentReply; 124 125 void onReadData(Data data) 126 { 127 try 128 { 129 auto line = cast(string)data.toHeap(); 130 log("> " ~ line); 131 132 bool replyDone; 133 if (replyLines.length==0) 134 { 135 enforce(sentCommands.length, "No command was queued when the server sent line: " ~ line); 136 auto command = sentCommands.queuePeek(); 137 138 int code = line.split()[0].to!int(); 139 currentReply = code in command.replies; 140 141 // Assume that unknown replies are single-line error messages. 142 replyDone = currentReply ? !currentReply.multiLine : true; 143 replyLines = [line]; 144 } 145 else 146 { 147 if (line == ".") 148 replyDone = true; 149 else 150 { 151 if (line.length && line[0] == '.') 152 line = line[1..$]; 153 replyLines ~= line; 154 } 155 } 156 157 if (replyDone) 158 { 159 auto command = sentCommands.queuePop(); 160 void handleReply() 161 { 162 enforce(currentReply, `Unexpected reply "` ~ replyLines[0] ~ `" to command "` ~ command.lines[0] ~ `"`); 163 currentReply.handleReply(replyLines); 164 } 165 166 if (command.handleError) 167 try 168 handleReply(); 169 catch (Exception e) 170 command.handleError(e.msg); 171 else 172 handleReply(); // In the absence of an error handler, treat command handling exceptions like fatal protocol errors 173 174 replyLines = null; 175 currentReply = null; 176 177 updateQueue(); 178 } 179 } 180 catch (Exception e) 181 { 182 foreach (line; e.toString().splitLines()) 183 log("* " ~ line); 184 conn.disconnect("Unhandled " ~ e.classinfo.name ~ ": " ~ e.msg); 185 } 186 } 187 188 enum PIPELINE_LIMIT = 64; 189 190 void updateQueue() 191 { 192 if (!sentCommands.length && !queuedCommands.length && handleIdle) 193 handleIdle(); 194 else 195 while ( 196 queuedCommands.length // Got something to send? 197 && (sentCommands.length == 0 || sentCommands.queuePeekLast().pipelineable) // Can pipeline? 198 && sentCommands.length < PIPELINE_LIMIT) // Not pipelining too much? 199 send(queuedCommands.queuePop()); 200 } 201 202 void queue(Command command) 203 { 204 queuedCommands.queuePush(command); 205 updateQueue(); 206 } 207 208 void send(Command command) 209 { 210 foreach (line; command.lines) 211 sendLine(line); 212 sentCommands.queuePush(command); 213 } 214 215 public: 216 this(Logger log) 217 { 218 this.log = log; 219 } /// 220 221 /// Connect to the given server. 222 void connect(string server, void delegate() handleConnect=null) 223 { 224 auto tcp = new TcpConnection(); 225 IConnection c = tcp; 226 227 c = lineAdapter = new LineBufferedAdapter(c); 228 229 TimeoutAdapter timer; 230 c = timer = new TimeoutAdapter(c); 231 timer.setIdleTimeout(30.seconds); 232 233 conn = c; 234 235 conn.handleConnect = &onConnect; 236 conn.handleDisconnect = &onDisconnect; 237 conn.handleReadData = &onReadData; 238 239 // Manually place a fake command in the queue 240 // (server automatically sends a greeting when a client connects). 241 sentCommands ~= Command(null, false, [ 242 200:Reply({ 243 connected = true; 244 if (handleConnect) 245 handleConnect(); 246 }), 247 ]); 248 249 log("* Connecting to " ~ server ~ "..."); 250 tcp.connect(server, 119); 251 } 252 253 /// Disconnect. 254 void disconnect(string reason = IConnection.defaultDisconnectReason) 255 { 256 conn.disconnect(reason); 257 } 258 259 /// True when a connection is fully established and a greeting is received. 260 bool connected; 261 262 /// Send a LIST command. 263 void listGroups(void delegate(GroupInfo[] groups) handleGroups, void delegate(string) handleError=null) 264 { 265 queue(Command(["LIST"], true, [ 266 215:Reply((string[] reply) { 267 GroupInfo[] groups = new GroupInfo[reply.length-1]; 268 foreach (i, line; reply[1..$]) 269 { 270 auto info = split(line); 271 enforce(info.length == 4, "Unrecognized LIST reply"); 272 groups[i] = GroupInfo(info[0], to!int(info[1]), to!int(info[2]), info[3][0]); 273 } 274 if (handleGroups) 275 handleGroups(groups); 276 }), 277 ], handleError)); 278 } 279 280 /// Send a GROUP command. 281 void selectGroup(string name, void delegate() handleSuccess=null, void delegate(string) handleError=null) 282 { 283 queue(Command(["GROUP " ~ name], true, [ 284 211:Reply({ 285 if (handleSuccess) 286 handleSuccess(); 287 }), 288 ], handleError)); 289 } 290 291 /// Send a LISTGROUP command. 292 void listGroup(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) 293 { 294 string line = from > 1 ? format("LISTGROUP %s %d-", name, from) : format("LISTGROUP %s", name); 295 296 queue(Command([line], true, [ 297 211:Reply((string[] reply) { 298 if (handleListGroup) 299 handleListGroup(reply[1..$]); 300 }), 301 ], handleError)); 302 } 303 304 /// ditto 305 void listGroup(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroup(name, 1, handleListGroup, handleError); } 306 307 /// Send a XOVER command. 308 void listGroupXover(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) 309 { 310 // TODO: handle GROUP command failure 311 selectGroup(name); 312 queue(Command([format("XOVER %d-", from)], true, [ 313 224:Reply((string[] reply) { 314 auto messages = new string[reply.length-1]; 315 foreach (i, line; reply[1..$]) 316 messages[i] = line.split("\t")[0]; 317 if (handleListGroup) 318 handleListGroup(messages); 319 }), 320 ], handleError)); 321 } 322 323 /// ditto 324 void listGroupXover(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroupXover(name, 1, handleListGroup, handleError); } 325 326 /// Send an ARTICLE command. 327 void getMessage(string numOrID, void delegate(string[] lines, string num, string id) handleMessage, void delegate(string) handleError=null) 328 { 329 queue(Command(["ARTICLE " ~ numOrID], true, [ 330 220:Reply((string[] reply) { 331 auto message = reply[1..$]; 332 auto firstLine = reply[0].split(); 333 if (handleMessage) 334 handleMessage(message, firstLine[1], firstLine[2]); 335 }), 336 ], handleError)); 337 } 338 339 /// Send a DATE command. 340 void getDate(void delegate(string date) handleDate, void delegate(string) handleError=null) 341 { 342 queue(Command(["DATE"], true, [ 343 111:Reply((string reply) { 344 auto date = reply.split()[1]; 345 enforce(date.length == 14, "Invalid DATE format"); 346 if (handleDate) 347 handleDate(date); 348 }), 349 ], handleError)); 350 } 351 352 /// Send a NEWNEWS command. 353 void getNewNews(string wildmat, string dateTime, void delegate(string[] messages) handleNewNews, void delegate(string) handleError=null) 354 { 355 queue(Command(["NEWNEWS " ~ wildmat ~ " " ~ dateTime], true, [ 356 230:Reply((string[] reply) { 357 if (handleNewNews) 358 handleNewNews(reply); 359 }), 360 ], handleError)); 361 } 362 363 /// Send a POST command. 364 void postMessage(string[] lines, void delegate() handlePosted=null, void delegate(string) handleError=null) 365 { 366 queue(Command(["POST"], false, [ 367 340:Reply({ 368 string[] postLines; 369 foreach (line; lines) 370 if (line.startsWith(".")) 371 postLines ~= "." ~ line; 372 else 373 postLines ~= line; 374 postLines ~= "."; 375 376 send(Command(postLines, true, [ 377 240:Reply({ 378 if (handlePosted) 379 handlePosted(); 380 }), 381 ], handleError)); 382 }), 383 ], handleError)); 384 } 385 386 /// Called when the connection is disconnected. 387 void delegate(string reason, DisconnectType type) handleDisconnect; 388 389 /// Called when the command queue is empty. 390 void delegate() handleIdle; 391 }