/**
 * NNTP client supporting a small subset of the protocol.
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 */

module ae.net.nntp.client;

import std.conv;
import std.string;
import std.exception;

import ae.net.asockets;
import ae.sys.log;
import ae.utils.array;

import core.time;

public import ae.net.asockets : DisconnectType;

/// One entry of a LIST reply. The fields are filled, in declaration order,
/// from the four whitespace-separated columns of each reply line
/// (see NntpClient.listGroups).
struct GroupInfo { string name; int high, low; char mode; }

/// Asynchronous NNTP client.
/// Commands are queued, sent over a line-buffered socket (pipelined where
/// the command allows it), and their replies are dispatched to callbacks.
class NntpClient
{
private:
	/// Socket connection.
	LineBufferedSocket conn;

	/// Protocol log.
	Logger log;

	/// One possible reply to an NNTP command
	static struct Reply
	{
		/// Whether the status line is followed by a "."-terminated body.
		bool multiLine;

		/// Receives all reply lines; element 0 is always the status line.
		void delegate(string[] lines) handleReply;

		/// Multi-line reply: the handler gets the status line plus the body.
		this(void delegate(string[] lines) handler)
		{
			multiLine = true;
			handleReply = handler;
		}

		/// Single-line reply: the handler gets the status line.
		this(void delegate(string line) handler)
		{
			multiLine = false;
			handleReply = (string[] lines) { assert(lines.length==1); handler(lines[0]); };
		}

		/// Single-line reply whose text is of no interest to the handler.
		this(void delegate() handler)
		{
			multiLine = false;
			handleReply = (string[] lines) { assert(lines.length==1); handler(); };
		}
	}

	/// One pipelined command
	static struct Command
	{
		/// Lines to send. Null for the greeting pseudo-command queued by connect().
		string[] lines;

		/// Whether further commands may be sent before this one's reply arrives.
		bool pipelineable;

		/// Expected replies, keyed by status code.
		Reply[int] replies;

		/// Called with a description when the command fails. May be null.
		void delegate(string error) handleError;
	}

	/// Commands queued to be sent to the NNTP server.
	Command[] queuedCommands;

	/// Commands that have been sent to the NNTP server, and are expecting a reply.
	Command[] sentCommands;

	/// Socket callback: the connection is up, but the greeting is still pending.
	void onConnect(ClientSocket sender)
	{
		log("* Connected, waiting for greeting...");
	}

	/// Socket callback: fails every outstanding command, resets the reply
	/// state, then notifies handleDisconnect (if set).
	void onDisconnect(ClientSocket sender, string reason, DisconnectType type)
	{
		log("* Disconnected (" ~ reason ~ ")");
		foreach (command; queuedCommands ~ sentCommands)
			if (command.handleError)
				command.handleError("Disconnected from server (" ~ reason ~ ")");

		queuedCommands = sentCommands = null;
		replyLines = null;
		currentReply = null;

		if (handleDisconnect)
			handleDisconnect(reason, type);
	}

	/// Logs and sends a single protocol line.
	void sendLine(string line)
	{
		log("< " ~ line);
		conn.send(line);
	}

	/// Reply line buffer.
	string[] replyLines;

	/// Reply currently being received/processed.
	Reply* currentReply;

	/// Socket callback: accumulates reply lines and, once a reply is
	/// complete, dispatches it to the oldest sent command.
	/// Any unhandled exception disconnects the socket.
	void onReadLine(LineBufferedSocket s, string line)
	{
		try
		{
			log("> " ~ line);

			bool replyDone;
			if (replyLines.length==0)
			{
				// First line of a reply: "<code> <text>".
				enforce(sentCommands.length, "No command was queued when the server sent line: " ~ line);
				auto command = sentCommands.queuePeek();

				// Guard the index: an empty status line must become a
				// protocol error, not an out-of-bounds array access.
				auto words = line.split();
				enforce(words.length, "Server sent an empty status line");
				int code = words[0].to!int();
				currentReply = code in command.replies;

				// Assume that unknown replies are single-line error messages.
				replyDone = currentReply ? !currentReply.multiLine : true;
				replyLines = [line];
			}
			else
			{
				if (line == ".")
					replyDone = true; // terminator of a multi-line reply
				else
				{
					// Undo dot-stuffing of body lines.
					if (line.length && line[0] == '.')
						line = line[1..$];
					replyLines ~= line;
				}
			}

			if (replyDone)
			{
				auto command = sentCommands.queuePop();
				void handleReply()
				{
					// The greeting pseudo-command has no lines, so it must
					// not be indexed when building the error message.
					auto commandName = command.lines.length ? command.lines[0] : "(server greeting)";
					enforce(currentReply, `Unexpected reply "` ~ replyLines[0] ~ `" to command "` ~ commandName ~ `"`);
					currentReply.handleReply(replyLines);
				}

				if (command.handleError)
				{
					try
						handleReply();
					catch (Exception e)
						command.handleError(e.msg);
				}
				else
					handleReply(); // In the absence of an error handler, treat command handling exceptions like fatal protocol errors

				replyLines = null;
				currentReply = null;

				updateQueue();
			}
		}
		catch (Exception e)
			conn.disconnect("Unhandled " ~ e.classinfo.name ~ ": " ~ e.msg);
	}

	/// Upper bound on the number of sent commands awaiting a reply.
	enum PIPELINE_LIMIT = 64;

	/// Calls handleIdle when nothing is queued or in flight; otherwise sends
	/// as many queued commands as the pipelining rules allow.
	void updateQueue()
	{
		if (!sentCommands.length && !queuedCommands.length && handleIdle)
			handleIdle();
		else
			while (
				queuedCommands.length                                                       // Got something to send?
			 && (sentCommands.length == 0 || sentCommands.queuePeekLast().pipelineable)     // Can pipeline?
			 && sentCommands.length < PIPELINE_LIMIT)                                       // Not pipelining too much?
				send(queuedCommands.queuePop());
	}

	/// Appends a command to the send queue and sends it if possible.
	void queue(Command command)
	{
		queuedCommands.queuePush(command);
		updateQueue();
	}

	/// Sends a command's lines immediately and records it as awaiting a reply.
	void send(Command command)
	{
		foreach (line; command.lines)
			sendLine(line);
		sentCommands.queuePush(command);
	}

public:
	/// Creates a client which writes its protocol trace to `log`.
	this(Logger log)
	{
		this.log = log;
	}

	/// Connects to `server` on port 119.
	/// `handleConnect` (if not null) is called once the server's greeting
	/// has been received.
	void connect(string server, void delegate() handleConnect=null)
	{
		conn = new LineBufferedSocket(30.seconds);
		conn.handleConnect = &onConnect;
		conn.handleDisconnect = &onDisconnect;
		conn.handleReadLine = &onReadLine;

		void delegate() onGreeting = {
			if (handleConnect)
				handleConnect();
		};

		// Manually place a fake command in the queue
		// (server automatically sends a greeting when a client connects).
		sentCommands ~= Command(null, false, [
			200:Reply(onGreeting), // service available, posting allowed
			201:Reply(onGreeting), // service available, posting prohibited
		]);

		log("* Connecting to " ~ server ~ "...");
		conn.connect(server, 119);
	}

	/// Closes the connection.
	void disconnect()
	{
		conn.disconnect();
	}

	/// Sends LIST and passes the parsed groups to `handleGroups`.
	/// A reply line which does not have exactly four fields is reported
	/// as an error.
	void listGroups(void delegate(GroupInfo[] groups) handleGroups, void delegate(string) handleError=null)
	{
		queue(Command(["LIST"], true, [
			215:Reply((string[] reply) {
				GroupInfo[] groups = new GroupInfo[reply.length-1];
				foreach (i, line; reply[1..$])
				{
					auto info = split(line);
					enforce(info.length == 4, "Unrecognized LIST reply");
					groups[i] = GroupInfo(info[0], to!int(info[1]), to!int(info[2]), info[3][0]);
				}
				if (handleGroups)
					handleGroups(groups);
			}),
		], handleError));
	}

	/// Sends GROUP for the group `name`.
	void selectGroup(string name, void delegate() handleSuccess=null, void delegate(string) handleError=null)
	{
		queue(Command(["GROUP " ~ name], true, [
			211:Reply({
				if (handleSuccess)
					handleSuccess();
			}),
		], handleError));
	}

	/// Sends LISTGROUP for `name`, restricted to the range "from-" when
	/// `from` is greater than 1. `handleListGroup` receives the reply body
	/// (without the status line).
	void listGroup(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null)
	{
		string line = from > 1 ? format("LISTGROUP %s %d-", name, from) : format("LISTGROUP %s", name);

		queue(Command([line], true, [
			211:Reply((string[] reply) {
				if (handleListGroup)
					handleListGroup(reply[1..$]);
			}),
		], handleError));
	}

	/// ditto
	void listGroup(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroup(name, 1, handleListGroup, handleError); }

	/// Selects the group `name`, then sends XOVER for the range "from-".
	/// `handleListGroup` receives the first tab-separated field of every
	/// reply body line.
	void listGroupXover(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null)
	{
		// TODO: handle GROUP command failure
		selectGroup(name);
		queue(Command([format("XOVER %d-", from)], true, [
			224:Reply((string[] reply) {
				auto messages = new string[reply.length-1];
				foreach (i, line; reply[1..$])
				{
					auto fields = line.split("\t");
					// Report a line without fields as an error instead of indexing out of bounds.
					enforce(fields.length, "Unrecognized XOVER reply");
					messages[i] = fields[0];
				}
				if (handleListGroup)
					handleListGroup(messages);
			}),
		], handleError));
	}

	/// ditto
	void listGroupXover(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroupXover(name, 1, handleListGroup, handleError); }

	/// Sends ARTICLE for `numOrID`.
	/// `handleMessage` receives the reply body, plus the second and third
	/// whitespace-separated words of the status line as `num` and `id`.
	void getMessage(string numOrID, void delegate(string[] lines, string num, string id) handleMessage, void delegate(string) handleError=null)
	{
		queue(Command(["ARTICLE " ~ numOrID], true, [
			220:Reply((string[] reply) {
				auto message = reply[1..$];
				auto firstLine = reply[0].split();
				// The status line must carry both the number and the ID.
				enforce(firstLine.length >= 3, "Unrecognized ARTICLE reply");
				if (handleMessage)
					handleMessage(message, firstLine[1], firstLine[2]);
			}),
		], handleError));
	}

	/// Sends DATE. `handleDate` receives the second word of the reply,
	/// which must be exactly 14 characters long.
	void getDate(void delegate(string date) handleDate, void delegate(string) handleError=null)
	{
		queue(Command(["DATE"], true, [
			111:Reply((string reply) {
				auto fields = reply.split();
				enforce(fields.length >= 2, "Invalid DATE format");
				auto date = fields[1];
				enforce(date.length == 14, "Invalid DATE format");
				if (handleDate)
					handleDate(date);
			}),
		], handleError));
	}

	/// Sends NEWNEWS with the given wildmat and date/time arguments.
	/// `handleNewNews` receives the reply body (without the status line).
	void getNewNews(string wildmat, string dateTime, void delegate(string[] messages) handleNewNews, void delegate(string) handleError=null)
	{
		queue(Command(["NEWNEWS " ~ wildmat ~ " " ~ dateTime], true, [
			230:Reply((string[] reply) {
				if (handleNewNews)
					handleNewNews(reply[1..$]); // strip the "230 ..." status line, like the other listing commands
			}),
		], handleError));
	}

	/// Sends POST and, once the server answers 340, the dot-stuffed article
	/// `lines` followed by the terminating ".".
	/// `handlePosted` is called when the server answers 240.
	void postMessage(string[] lines, void delegate() handlePosted=null, void delegate(string) handleError=null)
	{
		queue(Command(["POST"], false, [
			340:Reply({
				string[] postLines;
				foreach (line; lines)
					if (line.startsWith("."))
						postLines ~= "." ~ line; // dot-stuffing
					else
						postLines ~= line;
				postLines ~= ".";

				// Bypass the queue: the article must directly follow the 340 reply.
				send(Command(postLines, true, [
					240:Reply({
						if (handlePosted)
							handlePosted();
					}),
				], handleError));
			}),
		], handleError));
	}

	/// Called (if set) after the connection has been closed and all
	/// outstanding commands have been failed.
	void delegate(string reason, DisconnectType type) handleDisconnect;

	/// Called (if set) when a reply has been processed and no commands
	/// remain queued or awaiting a reply.
	void delegate() handleIdle;
}