/**
 * NNTP listener (periodically poll server for new messages).
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 */

module ae.net.nntp.listener;

import ae.net.nntp.client;

import std.datetime;
import std.typecons;

import ae.sys.timing;
import ae.sys.log;

/// Delay between the end of one poll cycle and the start of the next.
const POLL_PERIOD = 2.seconds;

/// Keeps a connection to an NNTP server, periodically asks it for new
/// message IDs, fetches every message not seen in the previous reply and
/// passes it to `handleMessage`.
class NntpListener
{
private:
	/// Underlying NNTP client, constructed in place via `scoped`.
	typeof(scoped!NntpClient(Logger.init)) client;

	/// Server passed to `connect`; reused by `reconnect`.
	string server;

	/// Date string used as the lower bound of the next new-news query.
	/// Either supplied to `startPolling` or received in `onDate`.
	/// `poll` slices it as [0..8] and [8..14], so it must hold at least
	/// 14 characters (presumably "YYYYMMDDhhmmss" - confirm against the
	/// client's `getDate` reply format).
	string lastDate;

	/// Message IDs listed by the previous new-news reply.
	bool[string] oldMessages;

	/// Pending poll timer, or null while a poll is running.
	TimerTask pollTimer;

	/// `connected`: set in `onConnect`, cleared in `onDisconnect`.
	/// `polling`: set once `startPolling` has been called.
	bool connected, polling;

	/// Number of `getMessage` requests issued by the current poll cycle
	/// that have not yet been answered.
	int queued;

	/// (Re)opens the connection to `server`.
	void reconnect()
	{
		assert(!connected);
		client.connect(server, &onConnect);
	}

	/// Arms the timer that runs the next `poll` after `POLL_PERIOD`.
	void schedulePoll()
	{
		pollTimer = setTimeout(&poll, POLL_PERIOD);
	}

	/// Runs one poll cycle: refreshes the date and asks for everything
	/// new since `lastDate`. Requires `lastDate` to be set.
	void poll()
	{
		pollTimer = null;
		client.getDate(&onDate);
		client.getNewNews("*", lastDate[0..8] ~ " " ~ lastDate[8..14] ~ " GMT", &onNewNews);
	}

	/// Starts the poll loop on an established connection.
	/// With a known `lastDate` a poll is issued right away; otherwise only
	/// the date is requested, and `onDate` schedules the first poll once
	/// it arrives (calling `poll` with a null `lastDate` would slice a
	/// null string).
	void beginPolling()
	{
		if (lastDate)
			poll();
		else
			client.getDate(&onDate);
	}

	/// Connection established: reset per-connection state and resume
	/// polling if it was requested.
	void onConnect()
	{
		connected = true;
		queued = 0;

		if (polling)
			beginPolling();
	}

	/// Connection lost: stop any pending poll and, unless the disconnect
	/// was requested, retry the connection after 10 seconds.
	void onDisconnect(string reason, DisconnectType type)
	{
		connected = false;
		if (polling)
		{
			if (pollTimer && pollTimer.isWaiting())
				pollTimer.cancel();
			if (type != DisconnectType.requested)
				setTimeout(&reconnect, 10.seconds);
		}
	}

	/// Receives the server date. The very first date (when none was known)
	/// also kicks off the poll loop.
	void onDate(string date)
	{
		if (polling)
		{
			if (lastDate is null)
				schedulePoll();
			lastDate = date;
		}
	}

	/// Receives the new-news reply, requests every message that was not
	/// listed by the previous reply, and schedules the next poll
	/// immediately if nothing had to be fetched.
	void onNewNews(string[] reply)
	{
		// The first element is skipped (presumably the status line -
		// confirm against NntpClient). An empty reply is treated as
		// "no messages" instead of slicing out of bounds.
		auto ids = reply.length ? reply[1..$] : null;

		bool[string] messages;
		foreach (message; ids)
			messages[message] = true;

		assert(queued == 0);
		foreach (message, b; messages)
			if (!(message in oldMessages))
			{
				client.getMessage(message, &onMessage);
				queued++;
			}
		oldMessages = messages;
		if (queued == 0)
			schedulePoll();
	}

	/// Receives one fetched message, forwards it to `handleMessage` and
	/// schedules the next poll once the last outstanding one has arrived.
	void onMessage(string[] lines, string num, string id)
	{
		if (handleMessage)
			handleMessage(lines, num, id);

		if (polling)
		{
			queued--;
			if (queued == 0)
				schedulePoll();
		}
	}

public:
	/// Creates the listener and its underlying client.
	/// Params:
	///   log = logger handed to the NNTP client
	this(Logger log)
	{
		client = scoped!NntpClient(log);
		client.handleDisconnect = &onDisconnect;
	}

	/// Connects to `server`; the same server is used for reconnects.
	void connect(string server)
	{
		this.server = server;
		reconnect();
	}

	/// Asks the underlying client to disconnect.
	void disconnect()
	{
		client.disconnect();
	}

	/// Enables polling. May be called before or after the connection is
	/// established, but only once.
	/// Params:
	///   lastDate = date to poll from; when null, the server's date is
	///              requested first and polling starts from it
	void startPolling(string lastDate = null)
	{
		assert(!polling, "Already polling");
		polling = true;
		this.lastDate = lastDate;
		if (connected)
			beginPolling();
	}

	/// Called for every fetched message, if set.
	void delegate(string[] lines, string num, string id) handleMessage;
}