/**
 * NNTP listener (periodically poll server for new messages).
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.net.nntp.listener;

import ae.net.nntp.client;

import std.datetime;
import std.typecons;

import ae.sys.timing;
import ae.sys.log;

/// `NntpListener` polls with this interval.
const POLL_PERIOD = 2.seconds;

/// NNTP client which polls the server for new messages.
///
/// Typical use: construct, assign `handleMessage`, call `connect`,
/// then call `startPolling`.
class NntpListener
{
private:
	/// The underlying NNTP client, stored in place via `scoped`.
	typeof(scoped!NntpClient(Logger.init)) client;
	/// Server passed to `connect`; reused when reconnecting.
	string server;
	/// Most recent server timestamp, as reported by `getDate` or
	/// supplied to `startPolling`. `null` until one is known.
	string lastDate;
	/// Set of message identifiers already requested.
	bool[string] oldMessages;
	/// Pending poll timer, or `null` when a poll is not scheduled.
	TimerTask pollTimer;
	/// `connected` tracks the connection state seen through
	/// `onConnect`/`onDisconnect`; `polling` is set by `startPolling`.
	bool connected, polling;
	/// Number of message fetches issued by the last `NEWNEWS` round
	/// that have not completed yet.
	int queued;

	/// (Re-)establish the connection to the remembered server.
	void reconnect()
	{
		assert(!connected);
		client.connect(server, &onConnect);
	}

	/// Arrange for `poll` to run once `POLL_PERIOD` has elapsed.
	void schedulePoll()
	{
		pollTimer = setTimeout(&poll, POLL_PERIOD);
	}

	/// Ask the server for its current date and for the messages
	/// which appeared since `lastDate`.
	void poll()
	{
		// The timestamp is split below into an 8-character date part and
		// a 6-character time part, so anything shorter cannot be used.
		assert(lastDate.length >= 14,
			"lastDate must be at least 14 characters long (date + time)");

		pollTimer = null;
		// Request the new server date first; the NEWNEWS argument below is
		// built right away from the *previous* value of lastDate.
		client.getDate(&onDate);
		client.getNewNews("*", lastDate[0..8] ~ " " ~ lastDate[8..14] ~ " GMT", &onNewNews);
	}

	/// Kick off polling on a live connection.
	///
	/// With a known `lastDate` a poll is issued immediately. Otherwise only
	/// the server date is requested; `onDate` then schedules the first poll.
	void beginPolling()
	{
		if (lastDate !is null)
			poll();
		else
			client.getDate(&onDate);
	}

	/// Connection established: reset per-connection state and resume
	/// polling if it was requested.
	void onConnect()
	{
		connected = true;
		queued = 0;

		if (polling)
			beginPolling();
	}

	/// Connection lost: stop any pending poll and, unless the disconnect
	/// was requested, retry the connection after 10 seconds.
	/// Nothing beyond clearing `connected` happens when not polling.
	void onDisconnect(string reason, DisconnectType type)
	{
		connected = false;
		if (polling)
		{
			if (pollTimer && pollTimer.isWaiting())
				pollTimer.cancel();
			if (type != DisconnectType.requested)
				setTimeout(&reconnect, 10.seconds);
		}
	}

	/// Server date received.
	void onDate(string date)
	{
		if (polling)
		{
			// No date was known before, so no poll cycle is running yet -
			// this reply is what starts it.
			if (lastDate is null)
				schedulePoll();
			lastDate = date;
		}
	}

	/// `NEWNEWS` reply received: fetch every message not requested before.
	void onNewNews(string[] reply)
	{
		// The first element is skipped - presumably a status/header line;
		// confirm against NntpClient.getNewNews. An empty reply is treated
		// as "no messages" instead of triggering a range violation.
		auto messages = reply.length ? reply[1..$] : null;

		assert(queued == 0);
		foreach (message; messages)
			if (message !in oldMessages)
			{
				// NOTE(review): the ID is recorded before the fetch completes,
				// so a disconnect mid-fetch leaves it marked as seen.
				oldMessages[message] = true;
				client.getMessage(message, &onMessage);
				queued++;
			}
		// Nothing to wait for - go straight to the next poll interval.
		if (queued == 0)
			schedulePoll();
	}

	/// A requested message arrived: hand it to the user and schedule the
	/// next poll once the whole batch has been received.
	void onMessage(string[] lines, string num, string id)
	{
		if (handleMessage)
			handleMessage(lines, num, id);

		if (polling)
		{
			queued--;
			if (queued == 0)
				schedulePoll();
		}
	}

public:
	/// Create a listener whose NNTP client logs to `log`.
	this(Logger log)
	{
		client = scoped!NntpClient(log);
		client.handleDisconnect = &onDisconnect;
	}

	/// Connect to `server`. The server is remembered and reused for
	/// automatic reconnection.
	void connect(string server)
	{
		this.server = server;
		reconnect();
	}

	/// Disconnect from the server.
	void disconnect()
	{
		client.disconnect();
	}

	/// Start polling for new messages.
	///
	/// Params:
	///   lastDate = server timestamp from which to look for new messages
	///              (at least 14 characters: 8 for the date, 6 for the time).
	///              When `null`, the server's current date is queried first
	///              and polling starts from there.
	///
	/// May be called before or after the connection is established; must
	/// not be called twice.
	void startPolling(string lastDate = null)
	{
		assert(!polling, "Already polling");
		polling = true;
		this.lastDate = lastDate;
		// Previously this called poll() unconditionally, which sliced a
		// null lastDate when no start date was given.
		if (connected)
			beginPolling();
	}

	/// Called when NEWNEWS reports a new message.
	void delegate(string[] lines, string num, string id) handleMessage;
}