1 /** 2 * NNTP client supporting a small subset of the protocol. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <vladimir@thecybershadow.net> 12 */ 13 14 module ae.net.nntp.client; 15 16 import std.conv; 17 import std.string; 18 import std.exception; 19 20 import ae.net.asockets; 21 import ae.sys.log; 22 import ae.utils.array; 23 24 import core.time; 25 26 public import ae.net.asockets : DisconnectType; 27 28 struct GroupInfo { string name; int high, low; char mode; } 29 30 class NntpClient 31 { 32 private: 33 /// Socket connection. 34 LineBufferedAdapter lineAdapter; 35 IConnection conn; 36 37 /// Protocol log. 38 Logger log; 39 40 /// One possible reply to an NNTP command 41 static struct Reply 42 { 43 bool multiLine; 44 void delegate(string[] lines) handleReply; 45 46 this(void delegate(string[] lines) handler) 47 { 48 multiLine = true; 49 handleReply = handler; 50 } 51 52 this(void delegate(string line) handler) 53 { 54 multiLine = false; 55 handleReply = (string[] lines) { assert(lines.length==1); handler(lines[0]); }; 56 } 57 58 this(void delegate() handler) 59 { 60 multiLine = false; 61 handleReply = (string[] lines) { assert(lines.length==1); handler(); }; 62 } 63 } 64 65 /// One pipelined command 66 static struct Command 67 { 68 string[] lines; 69 bool pipelineable; 70 Reply[int] replies; 71 void delegate(string error) handleError; 72 } 73 74 /// Commands queued to be sent to the NNTP server. 75 Command[] queuedCommands; 76 77 /// Commands that have been sent to the NNTP server, and are expecting a reply. 78 Command[] sentCommands; 79 80 void onConnect() 81 { 82 log("* Connected, waiting for greeting..."); 83 } 84 85 void onDisconnect(string reason, DisconnectType type) 86 { 87 log("* Disconnected (" ~ reason ~ ")"); 88 connected = false; 89 foreach (command; queuedCommands ~ sentCommands) 90 if (command.handleError) 91 command.handleError("Disconnected from server (" ~ reason ~ ")"); 92 93 queuedCommands = sentCommands = null; 94 replyLines = null; 95 currentReply = null; 96 97 if (handleDisconnect) 98 handleDisconnect(reason, type); 99 } 100 101 void sendLine(string line) 102 { 103 log("< " ~ line); 104 lineAdapter.send(line); 105 } 106 107 /// Reply line buffer. 108 string[] replyLines; 109 110 /// Reply currently being received/processed. 111 Reply* currentReply; 112 113 void onReadData(Data data) 114 { 115 try 116 { 117 auto line = cast(string)data.toHeap(); 118 log("> " ~ line); 119 120 bool replyDone; 121 if (replyLines.length==0) 122 { 123 enforce(sentCommands.length, "No command was queued when the server sent line: " ~ line); 124 auto command = sentCommands.queuePeek(); 125 126 int code = line.split()[0].to!int(); 127 currentReply = code in command.replies; 128 129 // Assume that unknown replies are single-line error messages. 130 replyDone = currentReply ? !currentReply.multiLine : true; 131 replyLines = [line]; 132 } 133 else 134 { 135 if (line == ".") 136 replyDone = true; 137 else 138 { 139 if (line.length && line[0] == '.') 140 line = line[1..$]; 141 replyLines ~= line; 142 } 143 } 144 145 if (replyDone) 146 { 147 auto command = sentCommands.queuePop(); 148 void handleReply() 149 { 150 enforce(currentReply, `Unexpected reply "` ~ replyLines[0] ~ `" to command "` ~ command.lines[0] ~ `"`); 151 currentReply.handleReply(replyLines); 152 } 153 154 if (command.handleError) 155 try 156 handleReply(); 157 catch (Exception e) 158 command.handleError(e.msg); 159 else 160 handleReply(); // In the absence of an error handler, treat command handling exceptions like fatal protocol errors 161 162 replyLines = null; 163 currentReply = null; 164 165 updateQueue(); 166 } 167 } 168 catch (Exception e) 169 { 170 foreach (line; e.toString().splitLines()) 171 log("* " ~ line); 172 conn.disconnect("Unhandled " ~ e.classinfo.name ~ ": " ~ e.msg); 173 } 174 } 175 176 enum PIPELINE_LIMIT = 64; 177 178 void updateQueue() 179 { 180 if (!sentCommands.length && !queuedCommands.length && handleIdle) 181 handleIdle(); 182 else 183 while ( 184 queuedCommands.length // Got something to send? 185 && (sentCommands.length == 0 || sentCommands.queuePeekLast().pipelineable) // Can pipeline? 186 && sentCommands.length < PIPELINE_LIMIT) // Not pipelining too much? 187 send(queuedCommands.queuePop()); 188 } 189 190 void queue(Command command) 191 { 192 queuedCommands.queuePush(command); 193 updateQueue(); 194 } 195 196 void send(Command command) 197 { 198 foreach (line; command.lines) 199 sendLine(line); 200 sentCommands.queuePush(command); 201 } 202 203 public: 204 this(Logger log) 205 { 206 this.log = log; 207 } 208 209 void connect(string server, void delegate() handleConnect=null) 210 { 211 auto tcp = new TcpConnection(); 212 IConnection c = tcp; 213 214 c = lineAdapter = new LineBufferedAdapter(c); 215 216 TimeoutAdapter timer; 217 c = timer = new TimeoutAdapter(c); 218 timer.setIdleTimeout(30.seconds); 219 220 conn = c; 221 222 conn.handleConnect = &onConnect; 223 conn.handleDisconnect = &onDisconnect; 224 conn.handleReadData = &onReadData; 225 226 // Manually place a fake command in the queue 227 // (server automatically sends a greeting when a client connects). 228 sentCommands ~= Command(null, false, [ 229 200:Reply({ 230 connected = true; 231 if (handleConnect) 232 handleConnect(); 233 }), 234 ]); 235 236 log("* Connecting to " ~ server ~ "..."); 237 tcp.connect(server, 119); 238 } 239 240 void disconnect(string reason = IConnection.defaultDisconnectReason) 241 { 242 conn.disconnect(reason); 243 } 244 245 bool connected; 246 247 void listGroups(void delegate(GroupInfo[] groups) handleGroups, void delegate(string) handleError=null) 248 { 249 queue(Command(["LIST"], true, [ 250 215:Reply((string[] reply) { 251 GroupInfo[] groups = new GroupInfo[reply.length-1]; 252 foreach (i, line; reply[1..$]) 253 { 254 auto info = split(line); 255 enforce(info.length == 4, "Unrecognized LIST reply"); 256 groups[i] = GroupInfo(info[0], to!int(info[1]), to!int(info[2]), info[3][0]); 257 } 258 if (handleGroups) 259 handleGroups(groups); 260 }), 261 ], handleError)); 262 } 263 264 void selectGroup(string name, void delegate() handleSuccess=null, void delegate(string) handleError=null) 265 { 266 queue(Command(["GROUP " ~ name], true, [ 267 211:Reply({ 268 if (handleSuccess) 269 handleSuccess(); 270 }), 271 ], handleError)); 272 } 273 274 void listGroup(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) 275 { 276 string line = from > 1 ? format("LISTGROUP %s %d-", name, from) : format("LISTGROUP %s", name); 277 278 queue(Command([line], true, [ 279 211:Reply((string[] reply) { 280 if (handleListGroup) 281 handleListGroup(reply[1..$]); 282 }), 283 ], handleError)); 284 } 285 286 void listGroup(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroup(name, 1, handleListGroup, handleError); } 287 288 void listGroupXover(string name, int from/* = 1*/, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) 289 { 290 // TODO: handle GROUP command failure 291 selectGroup(name); 292 queue(Command([format("XOVER %d-", from)], true, [ 293 224:Reply((string[] reply) { 294 auto messages = new string[reply.length-1]; 295 foreach (i, line; reply[1..$]) 296 messages[i] = line.split("\t")[0]; 297 if (handleListGroup) 298 handleListGroup(messages); 299 }), 300 ], handleError)); 301 } 302 303 void listGroupXover(string name, void delegate(string[] messages) handleListGroup, void delegate(string) handleError=null) { listGroupXover(name, 1, handleListGroup, handleError); } 304 305 void getMessage(string numOrID, void delegate(string[] lines, string num, string id) handleMessage, void delegate(string) handleError=null) 306 { 307 queue(Command(["ARTICLE " ~ numOrID], true, [ 308 220:Reply((string[] reply) { 309 auto message = reply[1..$]; 310 auto firstLine = reply[0].split(); 311 if (handleMessage) 312 handleMessage(message, firstLine[1], firstLine[2]); 313 }), 314 ], handleError)); 315 } 316 317 void getDate(void delegate(string date) handleDate, void delegate(string) handleError=null) 318 { 319 queue(Command(["DATE"], true, [ 320 111:Reply((string reply) { 321 auto date = reply.split()[1]; 322 enforce(date.length == 14, "Invalid DATE format"); 323 if (handleDate) 324 handleDate(date); 325 }), 326 ], handleError)); 327 } 328 329 void getNewNews(string wildmat, string dateTime, void delegate(string[] messages) handleNewNews, void delegate(string) handleError=null) 330 { 331 queue(Command(["NEWNEWS " ~ wildmat ~ " " ~ dateTime], true, [ 332 230:Reply((string[] reply) { 333 if (handleNewNews) 334 handleNewNews(reply); 335 }), 336 ], handleError)); 337 } 338 339 void postMessage(string[] lines, void delegate() handlePosted=null, void delegate(string) handleError=null) 340 { 341 queue(Command(["POST"], false, [ 342 340:Reply({ 343 string[] postLines; 344 foreach (line; lines) 345 if (line.startsWith(".")) 346 postLines ~= "." ~ line; 347 else 348 postLines ~= line; 349 postLines ~= "."; 350 351 send(Command(postLines, true, [ 352 240:Reply({ 353 if (handlePosted) 354 handlePosted(); 355 }), 356 ], handleError)); 357 }), 358 ], handleError)); 359 } 360 361 void delegate(string reason, DisconnectType type) handleDisconnect; 362 void delegate() handleIdle; 363 }