1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import std.exception;
24 import std.socket;
25 public import std.socket : Address, Socket;
26 
27 debug(ASOCKETS) import std.stdio;
28 debug(PRINTDATA) import ae.utils.text : hexDump;
29 private import std.conv : to;
30 
31 import std.random : randomShuffle;
32 
33 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
34 static import ae.utils.array;
35 
36 version(LIBEV)
37 {
38 	import deimos.ev;
39 	pragma(lib, "ev");
40 }
41 
version(Windows)
{
	import core.sys.windows.windows : Sleep;
	// USE_SLEEP is declared on every platform so that the select-based event
	// loop can test it in an ordinary run-time condition; the Sleep() call
	// itself remains guarded by version(Windows).
	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
}
else
	enum USE_SLEEP = false;
49 
/// Event counter: incremented by every libev I/O or timer callback, and once per iteration of the select-based loop.
51 int eventCounter;
52 
53 version(LIBEV)
54 {
55 	// Watchers are a GenericSocket field (as declared in SocketMixin).
56 	// Use one watcher per read and write event.
57 	// Start/stop those as higher-level code declares interest in those events.
58 	// Use the "data" ev_io field to store the parent GenericSocket address.
59 	// Also use the "data" field as a flag to indicate whether the watcher is active
60 	// (data is null when the watcher is stopped).
61 
62 	struct SocketManager
63 	{
64 	private:
65 		size_t count;
66 
67 		extern(C)
68 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
69 		{
70 			eventCounter++;
71 			auto socket = cast(GenericSocket)w.data;
72 			assert(socket, "libev event fired on stopped watcher");
73 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", cast(void*)socket, revents);
74 
75 			if (revents & EV_READ)
76 				socket.onReadable();
77 			else
78 			if (revents & EV_WRITE)
79 				socket.onWritable();
80 			else
81 				assert(false, "Unknown event fired from libev");
82 
83 			// TODO? Need to get proper SocketManager instance to call updateTimer on
84 			socketManager.updateTimer(false);
85 		}
86 
87 		ev_timer evTimer;
88 		MonoTime lastNextEvent = MonoTime.max;
89 
90 		extern(C)
91 		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
92 		{
93 			eventCounter++;
94 			debug (ASOCKETS) writefln("Timer callback called.");
95 			mainTimer.prod();
96 
97 			socketManager.updateTimer(true);
98 			debug (ASOCKETS) writefln("Timer callback exiting.");
99 		}
100 
101 		void updateTimer(bool force)
102 		{
103 			auto nextEvent = mainTimer.getNextEvent();
104 			if (force || lastNextEvent != nextEvent)
105 			{
106 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
107 				if (nextEvent == MonoTime.max) // Stopping
108 				{
109 					if (lastNextEvent != MonoTime.max)
110 						ev_timer_stop(ev_default_loop(0), &evTimer);
111 				}
112 				else
113 				{
114 					auto remaining = mainTimer.getRemainingTime();
115 					while (remaining <= Duration.zero)
116 					{
117 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
118 						mainTimer.prod();
119 						remaining = mainTimer.getRemainingTime();
120 					}
121 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
122 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
123 					if (lastNextEvent == MonoTime.max) // Starting
124 					{
125 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
126 						ev_timer_start(ev_default_loop(0), &evTimer);
127 					}
128 					else // Adjusting
129 					{
130 						evTimer.repeat = tstamp;
131 						ev_timer_again(ev_default_loop(0), &evTimer);
132 					}
133 				}
134 				lastNextEvent = nextEvent;
135 			}
136 		}
137 
138 		/// Register a socket with the manager.
139 		void register(GenericSocket socket)
140 		{
141 			debug (ASOCKETS) writefln("Registering %s", cast(void*)socket);
142 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
143 			auto fd = socket.conn.handle;
144 			assert(fd, "Must have fd before socket registration");
145 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
146 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
147 			count++;
148 		}
149 
150 		/// Unregister a socket with the manager.
151 		void unregister(GenericSocket socket)
152 		{
153 			debug (ASOCKETS) writefln("Unregistering %s", cast(void*)socket);
154 			socket.notifyRead  = false;
155 			socket.notifyWrite = false;
156 			count--;
157 		}
158 
159 	public:
		/// Returns: the number of sockets currently registered (register()
		/// calls minus unregister() calls).
		size_t size()
		{
			return count;
		}
164 
165 		/// Loop continuously until no sockets are left.
166 		void loop()
167 		{
168 			auto evLoop = ev_default_loop(0);
169 			enforce(evLoop, "libev initialization failure");
170 
171 			updateTimer(true);
172 			debug (ASOCKETS) writeln("ev_run");
173 			ev_run(ev_default_loop(0), 0);
174 		}
175 	}
176 
177 	private mixin template SocketMixin()
178 	{
179 		private ev_io evRead, evWrite;
180 
181 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
182 		{
183 			if (!conn)
184 			{
185 				// Can happen when setting delegates before connecting.
186 				return;
187 			}
188 
189 			if (newValue && !ev.data)
190 			{
191 				// Start
192 				ev.data = cast(void*)this;
193 				ev_io_start(ev_default_loop(0), &ev);
194 			}
195 			else
196 			if (!newValue && ev.data)
197 			{
198 				// Stop
199 				assert(ev.data is cast(void*)this);
200 				ev.data = null;
201 				ev_io_stop(ev_default_loop(0), &ev);
202 			}
203 		}
204 
205 		/// Interested in read notifications (onReadable)?
206 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
207 		/// Interested in write notifications (onWritable)?
208 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
209 
210 		debug ~this()
211 		{
212 			// The LIBEV SocketManager holds no references to registered sockets.
213 			// TODO: Add a doubly-linked list?
214 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
215 		}
216 	}
217 }
218 else // Use select
219 {
220 	struct SocketManager
221 	{
222 	private:
223 		enum FD_SETSIZE = 1024;
224 
225 		/// List of all sockets to poll.
226 		GenericSocket[] sockets;
227 
		/// Register a socket with the manager.
		/// Appends the socket to the list polled by loop(); no check for
		/// duplicates is made here.
		void register(GenericSocket conn)
		{
			debug (ASOCKETS) writefln("Registering %s (%d total)", cast(void*)conn, sockets.length + 1);
			sockets ~= conn;
		}
234 
235 		/// Unregister a socket with the manager.
236 		void unregister(GenericSocket conn)
237 		{
238 			debug (ASOCKETS) writefln("Unregistering %s (%d total)", cast(void*)conn, sockets.length - 1);
239 			foreach (size_t i, GenericSocket j; sockets)
240 				if (j is conn)
241 				{
242 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
243 					return;
244 				}
245 			assert(false, "Socket not registered");
246 		}
247 
248 		void delegate()[] idleHandlers;
249 
250 	public:
		/// Returns: the number of entries in the list of sockets to poll.
		size_t size()
		{
			return sockets.length;
		}
255 
		/// Loop continuously until no sockets are left.
		///
		/// Each iteration rebuilds the read/write/error sets from the registered
		/// sockets, waits in select() (or Sleep() on Windows when there is nothing
		/// to select on), dispatches onReadable/onWritable/onError, runs one idle
		/// handler when no socket event fired, and finally fires due timer tasks.
		/// The loop ends when no non-daemon socket with a live handle remains and
		/// mainTimer has nothing waiting.
		void loop()
		{
			debug (ASOCKETS) writeln("Starting event loop.");

			SocketSet readset, writeset, errorset;
			size_t sockcount;
			readset  = new SocketSet(FD_SETSIZE);
			writeset = new SocketSet(FD_SETSIZE);
			errorset = new SocketSet(FD_SETSIZE);
			while (true)
			{
				// SocketSet.add() doesn't have an overflow check, so we need to do it manually
				// this is just a debug check, the actual check is done when registering sockets
				// TODO: this is inaccurate on POSIX, "max" means maximum fd value
				// NOTE(review): register() in this struct performs no capacity check - confirm
				// where the "actual check" mentioned above is supposed to happen.
				if (sockets.length > readset.max || sockets.length > writeset.max || sockets.length > errorset.max)
				{
					// Grow all three sets to twice the current socket count.
					readset  = new SocketSet(to!uint(sockets.length*2));
					writeset = new SocketSet(to!uint(sockets.length*2));
					errorset = new SocketSet(to!uint(sockets.length*2));
				}
				else
				{
					readset.reset();
					writeset.reset();
					errorset.reset();
				}

				sockcount = 0;
				// Set when at least one non-daemon socket with a live handle exists.
				bool haveActive;
				debug (ASOCKETS) writeln("Populating sets");
				foreach (GenericSocket conn; sockets)
				{
					// Skip entries that currently have no underlying Socket.
					if (!conn.socket)
						continue;
					sockcount++;
					if (!conn.daemon)
						haveActive = true;

					debug (ASOCKETS) writef("\t%s:", cast(void*)conn);
					if (conn.notifyRead)
					{
						readset.add(conn.socket);
						debug (ASOCKETS) write(" READ");
					}
					if (conn.notifyWrite)
					{
						writeset.add(conn.socket);
						debug (ASOCKETS) write(" WRITE");
					}
					// Errors are watched for every live socket, regardless of interest flags.
					errorset.add(conn.socket);
					debug (ASOCKETS) writeln();
				}
				debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
					sockcount,
					mainTimer.isWaiting() ? "with" : "no",
					idleHandlers.length,
				);
				if (!haveActive && !mainTimer.isWaiting())
				{
					debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
					break;
				}

				int events;
				if (idleHandlers.length)
				{
					// With idle handlers pending, never block: poll with a zero timeout.
					if (sockcount==0)
						events = 0;
					else
						events = Socket.select(readset, writeset, errorset, 0.seconds);
				}
				else
				if (USE_SLEEP && sockcount==0)
				{
					// Nothing to select on: wait for the next timer event with Sleep().
					version(Windows)
					{
						auto duration = mainTimer.getRemainingTime().total!"msecs"();
						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
						if (duration <= 0)
							duration = 1; // Avoid busywait
						else
						if (duration > int.max)
							duration = int.max;
						Sleep(cast(int)duration);
						events = 0;
					}
					else
						assert(0);
				}
				else
				if (mainTimer.isWaiting())
					// Wake up no later than the next timer event.
					events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
				else
					// No timers: block until a socket event occurs.
					events = Socket.select(readset, writeset, errorset);

				debug (ASOCKETS) writefln("%d events fired.", events);

				if (events > 0)
				{
					foreach (GenericSocket conn; sockets)
					{
						// A handler may have dropped the connection, so the underlying
						// socket is re-checked before each of the three set tests.
						if (!conn.socket)
						{
							debug (ASOCKETS) writefln("\t%s is unset", cast(void*)conn);
							continue;
						}
						if (readset.isSet(conn.socket))
						{
							debug (ASOCKETS) writefln("\t%s is readable", cast(void*)conn);
							conn.onReadable();
						}

						if (!conn.socket)
						{
							debug (ASOCKETS) writefln("\t%s is unset", cast(void*)conn);
							continue;
						}
						if (writeset.isSet(conn.socket))
						{
							debug (ASOCKETS) writefln("\t%s is writable", cast(void*)conn);
							conn.onWritable();
						}

						if (!conn.socket)
						{
							debug (ASOCKETS) writefln("\t%s is unset", cast(void*)conn);
							continue;
						}
						if (errorset.isSet(conn.socket))
						{
							debug (ASOCKETS) writefln("\t%s is errored", cast(void*)conn);
							conn.onError("select() error: " ~ conn.socket.getErrorText());
						}
					}
				}
				else
				if (idleHandlers.length)
				{
					// No socket events this round: run exactly one idle handler.
					import ae.utils.array;
					auto handler = idleHandlers.shift();

					// Rotate the idle handler queue before running it,
					// in case the handler unregisters itself.
					idleHandlers ~= handler;

					handler();
				}

				// Timers may invalidate our select results, so fire them after processing the latter
				mainTimer.prod();

				eventCounter++;
			}
		}
411 	}
412 
413 	// Use UFCS to allow removeIdleHandler to have a predicate with context
414 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
415 	{
416 		foreach (i, idleHandler; socketManager.idleHandlers)
417 			assert(handler !is idleHandler);
418 
419 		socketManager.idleHandlers ~= handler;
420 	}
421 
422 	static bool isFun(T)(T a, T b) { return a is b; }
423 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
424 	{
425 		foreach (i, idleHandler; socketManager.idleHandlers)
426 			if (pred(idleHandler, args))
427 			{
428 				import std.algorithm;
429 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
430 				return;
431 			}
432 		assert(false, "No such idle handler");
433 	}
434 
	/// Per-socket state for the select-based manager: plain flags that the
	/// event loop reads when it populates its read and write sets.
	private mixin template SocketMixin()
	{
		/// Interested in read notifications (onReadable)?
		bool notifyRead;
		/// Interested in write notifications (onWritable)?
		bool notifyWrite;
	}
442 }
443 
/// Reason category passed to disconnect handlers.
enum DisconnectType
{
	Requested, /// initiated by the application
	Graceful,  /// peer gracefully closed the connection
	Error      /// abnormal network condition
}
450 
/// General methods for an asynchronous socket
private abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Hook invoked by the socket manager when the socket is readable.
	/// Does nothing by default.
	void onReadable()
	{
	}

	/// Hook invoked by the socket manager when the socket is writable.
	/// Does nothing by default.
	void onWritable()
	{
	}

	/// Hook invoked when an error is detected on the socket.
	/// Does nothing by default.
	void onError(string reason)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	private Address cachedLocalAddress, cachedRemoteAddress;

	/// Don't block the process from exiting.
	/// TODO: Not implemented with libev
	bool daemon;

	/// The local address of the wrapped socket, cached after the first
	/// successful query. Returns null if nothing is cached and there is
	/// no socket.
	final @property Address localAddress()
	{
		if (cachedLocalAddress is null && conn !is null)
			cachedLocalAddress = conn.localAddress();
		return cachedLocalAddress;
	}

	/// The remote address of the wrapped socket, cached after the first
	/// successful query. Returns null if nothing is cached and there is
	/// no socket.
	final @property Address remoteAddress()
	{
		if (cachedRemoteAddress is null && conn !is null)
			cachedRemoteAddress = conn.remoteAddress();
		return cachedRemoteAddress;
	}

	/// Enable or disable TCP keep-alive on the wrapped socket.
	/// Params:
	///   enabled  = whether keep-alive should be on
	///   time     = passed to Socket.setKeepAlive when enabling
	///   interval = passed to Socket.setKeepAlive when enabling
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");

		if (!enabled)
		{
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
			return;
		}

		// If time/interval cannot be set, fall back to the plain socket option.
		try
			conn.setKeepAlive(time, interval);
		catch (SocketFeatureException)
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
	}
}
524 
525 
526 /// An asynchronous client socket.
527 class ClientSocket : GenericSocket
528 {
529 private:
530 	TimerTask idleTask;
531 
532 	/// Blocks of data larger than this value are passed as unmanaged memory
533 	/// (in Data objects). Blocks smaller than this value will be reallocated
534 	/// on the managed heap. The disadvantage of placing large objects on the
535 	/// managed heap is false pointers; the disadvantage of using Data for
536 	/// small objects is wasted slack space due to the page size alignment
537 	/// requirement.
538 	enum UNMANAGED_THRESHOLD = 256;
539 
540 	/// Queue of addresses to try connecting to.
541 	Address[] addressQueue;
542 
543 public:
544 	/// Whether the socket is connected.
545 	bool connected;
546 
547 	enum MAX_PRIORITY = 4;
548 	enum DEFAULT_PRIORITY = 2;
549 
550 protected:
551 	/// The send buffers.
552 	Data[][MAX_PRIORITY+1] outQueue;
553 	/// Whether the first item from each queue has been partially sent (and thus can't be cancelled).
554 	bool[MAX_PRIORITY+1] partiallySent;
555 	/// Whether a disconnect is pending after all data is sent
556 	bool disconnecting;
557 
558 	/// Constructor used by a ServerSocket for new connections
559 	this(Socket conn)
560 	{
561 		this();
562 		this.conn = conn;
563 		connected = !(conn is null);
564 		if (connected)
565 			socketManager.register(this);
566 		updateFlags();
567 	}
568 
569 	final void updateFlags()
570 	{
571 		if (!connected)
572 			notifyWrite = true;
573 		else
574 			notifyWrite = writePending;
575 
576 		notifyRead = connected && handleReadData;
577 	}
578 
579 	/// Called when a socket is readable.
580 	override void onReadable()
581 	{
582 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
583 		static ubyte[0x10000] inBuffer;
584 		auto received = conn.receive(inBuffer);
585 
586 		if (received == 0)
587 			return disconnect("Connection closed", DisconnectType.Graceful);
588 
589 		if (received == Socket.ERROR)
590 		{
591 			if (wouldHaveBlocked)
592 			{
593 				debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", cast(void*)this);
594 				return;
595 			}
596 			else
597 				onError("recv() error: " ~ lastSocketError);
598 		}
599 		else
600 		{
601 			debug (PRINTDATA)
602 			{
603 				std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
604 				std.stdio.write(hexDump(inBuffer[0 .. received]));
605 				std.stdio.stdout.flush();
606 			}
607 
608 			if (disconnecting)
609 			{
610 				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", cast(void*)this);
611 			}
612 			else
613 			if (!handleReadData)
614 			{
615 				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", cast(void*)this);
616 			}
617 			else
618 			{
619 				// Currently, unlike the D1 version of this module,
620 				// we will always reallocate read network data.
621 				// This disfavours code which doesn't need to store
622 				// read data after processing it, but otherwise
623 				// makes things simpler and safer all around.
624 
625 				if (received < UNMANAGED_THRESHOLD)
626 				{
627 					// Copy to the managed heap
628 					_handleReadData(this, Data(inBuffer[0 .. received].dup));
629 				}
630 				else
631 				{
632 					// Copy to unmanaged memory
633 					_handleReadData(this, Data(inBuffer[0 .. received], true));
634 				}
635 			}
636 		}
637 	}
638 
	/// Called when a socket is writable.
	///
	/// On the first call after a connection attempt this marks the socket as
	/// connected, enables keep-alive, starts the idle task (if any) and invokes
	/// handleConnect. On later calls it sends queued data, walking the queues in
	/// index order, until a queue entry cannot be sent completely. When all
	/// queues are empty it invokes handleBufferFlushed and completes a pending
	/// delayed disconnect.
	override void onWritable()
	{
		// Re-evaluate the interest flags on every non-throwing exit path.
		scope(success) updateFlags();

		if (!connected)
		{
			connected = true;
			//debug writefln("[%s] Connected", remoteAddress);
			try
				setKeepAlive();
			catch (Exception e)
				return disconnect(e.msg, DisconnectType.Error);
			if (idleTask !is null)
				mainTimer.add(idleTask);
			if (handleConnect)
				handleConnect(this);
			return;
		}
		//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				ptrdiff_t sent = 0;
				if (pdata.length)
				{
					sent = conn.send(pdata.contents);
					debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", cast(void*)this, sent, pdata.length);
				}
				else
				{
					// Empty entries are not passed to send(); with sent == 0 they
					// fall through to the "fully sent" branch below and are dequeued.
					debug (ASOCKETS) writefln("\t\t%s: empty Data object", cast(void*)this);
				}

				if (sent == Socket.ERROR)
				{
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// Partial send: keep the unsent tail at the head of the queue
					// and stop until the socket is writable again.
					if (sent > 0)
					{
						*pdata = (*pdata)[sent..pdata.length];
						partiallySent[priority] = true;
					}
					return;
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					partiallySent[priority] = false;
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (handleBufferFlushed)
			handleBufferFlushed(this);
		if (disconnecting)
			disconnect("Delayed disconnect - buffer flushed", DisconnectType.Requested);
	}
712 
713 	/// Called when an error occurs on the socket.
714 	override void onError(string reason)
715 	{
716 		if (!connected && addressQueue.length)
717 			return tryNextAddress();
718 		disconnect("Socket error: " ~ reason, DisconnectType.Error);
719 	}
720 
721 	final void onTask_Idle(Timer timer, TimerTask task)
722 	{
723 		if (!connected)
724 			return;
725 
726 		if (disconnecting)
727 			return disconnect("Delayed disconnect - time-out", DisconnectType.Error);
728 
729 		if (handleIdleTimeout)
730 		{
731 			handleIdleTimeout(this);
732 			if (connected && !disconnecting)
733 			{
734 				assert(!idleTask.isWaiting());
735 				mainTimer.add(idleTask);
736 			}
737 		}
738 		else
739 			disconnect("Time-out", DisconnectType.Error);
740 	}
741 
742 	final void tryNextAddress()
743 	{
744 		auto address = addressQueue[0];
745 		addressQueue = addressQueue[1..$];
746 
747 		try
748 		{
749 			conn = new Socket(address.addressFamily(), SocketType.STREAM, ProtocolType.TCP);
750 			conn.blocking = false;
751 
752 			socketManager.register(this);
753 			updateFlags();
754 			debug (ASOCKETS) writefln("Attempting connection to %s", address.toString());
755 			conn.connect(address);
756 		}
757 		catch (SocketException e)
758 			return onError("Connect error: " ~ e.msg);
759 	}
760 
761 public:
	/// Default constructor
	/// Creates an unconnected socket object; use connect() to start a connection.
	this()
	{
		debug (ASOCKETS) writefln("New ClientSocket @ %s", cast(void*)this);
	}
767 
768 	/// Start establishing a connection.
769 	final void connect(string host, ushort port)
770 	{
771 		if (conn || connected)
772 			throw new Exception("Socket object is already connected");
773 
774 		try
775 		{
776 			addressQueue = getAddress(host, port);
777 			enforce(addressQueue.length, "No addresses found");
778 			if (addressQueue.length > 1)
779 				randomShuffle(addressQueue);
780 		}
781 		catch (SocketException e)
782 			return onError("Lookup error: " ~ e.msg);
783 
784 		tryNextAddress();
785 	}
786 
787 	static const DefaultDisconnectReason = "Software closed the connection";
788 
789 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
790 	void disconnect(string reason = DefaultDisconnectReason, DisconnectType type = DisconnectType.Requested)
791 	{
792 		scope(success) updateFlags();
793 
794 		if (writePending)
795 		{
796 			if (type==DisconnectType.Requested)
797 			{
798 				assert(conn, "Attempting to disconnect on an uninitialized socket");
799 				// queue disconnect after all data is sent
800 				debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddress, reason);
801 				assert(!disconnecting, "Attempting to disconnect on a disconnecting socket");
802 				disconnecting = true;
803 				setIdleTimeout(30.seconds);
804 				return;
805 			}
806 			else
807 				discardQueues();
808 		}
809 
810 		debug (ASOCKETS) writefln("[%s] Disconnecting: %s", remoteAddress, reason);
811 		if (conn)
812 		{
813 			socketManager.unregister(this);
814 			conn.close();
815 			conn = null;
816 			outQueue[] = null;
817 			connected = disconnecting = false;
818 		}
819 		else
820 		{
821 			assert(!connected);
822 		}
823 		if (idleTask && idleTask.isWaiting())
824 			idleTask.cancel();
825 		if (handleDisconnect)
826 			handleDisconnect(this, reason, type);
827 	}
828 
829 	/// Append data to the send buffer.
830 	final void send(Data datum, int priority = DEFAULT_PRIORITY)
831 	{
832 		Data[1] data;
833 		data[0] = datum;
834 		send(data);
835 		data[] = Data.init;
836 	}
837 
838 	/// ditto
839 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
840 	{
841 		assert(connected, "Attempting to send on a disconnected socket");
842 		assert(!disconnecting, "Attempting to send on a disconnecting socket");
843 		outQueue[priority] ~= data;
844 		notifyWrite = true; // Fast updateFlags()
845 
846 		debug (PRINTDATA)
847 		{
848 			std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
849 			foreach (datum; data)
850 				if (datum.length)
851 					std.stdio.write(hexDump(datum.contents));
852 				else
853 					std.stdio.writeln("(empty Data)");
854 			std.stdio.stdout.flush();
855 		}
856 	}
857 
858 	final void clearQueue(int priority)
859 	{
860 		if (partiallySent[priority])
861 		{
862 			assert(outQueue[priority].length > 0);
863 			outQueue[priority] = outQueue[priority][0..1];
864 		}
865 		else
866 			outQueue[priority] = null;
867 		updateFlags();
868 	}
869 
870 	/// Clears all queues, even partially sent content.
871 	private final void discardQueues()
872 	{
873 		foreach (priority; 0..MAX_PRIORITY+1)
874 		{
875 			outQueue[priority] = null;
876 			partiallySent[priority] = false;
877 		}
878 		updateFlags();
879 	}
880 
881 	@property
882 	final bool writePending()
883 	{
884 		foreach (queue; outQueue)
885 			if (queue.length)
886 				return true;
887 		return false;
888 	}
889 
890 	final bool queuePresent(int priority)
891 	{
892 		if (partiallySent[priority])
893 		{
894 			assert(outQueue[priority].length > 0);
895 			return outQueue[priority].length > 1;
896 		}
897 		else
898 			return outQueue[priority].length > 0;
899 	}
900 
	/// Cancel the currently scheduled idle task.
	/// An idle task must have been created and must be waiting.
	void cancelIdleTimeout()
	{
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}
907 
	/// Schedule the idle task again after it was cancelled.
	/// The socket must be connected, and the idle task must exist and must
	/// not already be waiting.
	void resumeIdleTimeout()
	{
		assert(connected);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}
915 
916 	final void setIdleTimeout(Duration duration)
917 	{
918 		assert(duration > Duration.zero);
919 		if (idleTask is null)
920 		{
921 			idleTask = new TimerTask(duration);
922 			idleTask.handleTask = &onTask_Idle;
923 		}
924 		else
925 		{
926 			if (idleTask.isWaiting())
927 				idleTask.cancel();
928 			idleTask.delay = duration;
929 		}
930 		if (connected)
931 			mainTimer.add(idleTask);
932 	}
933 
934 	void markNonIdle()
935 	{
936 		assert(idleTask !is null);
937 		if (idleTask.isWaiting())
938 			idleTask.restart();
939 	}
940 
941 	final bool isConnected()
942 	{
943 		return connected && !disconnecting;
944 	}
945 
946 public:
947 	/// Callback for when a connection has been established.
948 	void delegate(ClientSocket sender) handleConnect;
949 	/// Callback for when a connection was closed.
950 	void delegate(ClientSocket sender, string reason, DisconnectType type) handleDisconnect;
951 	/// Callback for when a connection has stopped responding.
952 	void delegate(ClientSocket sender) handleIdleTimeout;
953 	/// Callback for when the send buffer has been flushed.
954 	void delegate(ClientSocket sender) handleBufferFlushed;
955 
956 	alias void delegate(ClientSocket sender, Data data) ReadDataHandler;
957 
958 	private ReadDataHandler _handleReadData;
959 	/// Callback for incoming data.
960 	/// Data will not be received unless this handler is set.
961 	@property final ReadDataHandler handleReadData() { return _handleReadData; }
962 	/// ditto
963 	@property final void handleReadData(ReadDataHandler value) { _handleReadData = value; updateFlags(); }
964 }
965 
966 /// An asynchronous server socket.
967 final class GenericServerSocket(T : ClientSocket)
968 {
969 private:
970 	/// Class that actually performs listening on a certain address family
971 	final class Listener : GenericSocket
972 	{
973 		this(Socket conn)
974 		{
975 			debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
976 			this.conn = conn;
977 			socketManager.register(this);
978 		}
979 
980 		/// Called when a socket is readable.
981 		override void onReadable()
982 		{
983 			Socket acceptSocket = conn.accept();
984 			acceptSocket.blocking = false;
985 			if (handleAccept)
986 			{
987 				T connection = new T(acceptSocket);
988 				connection.setKeepAlive();
989 				//assert(connection.connected);
990 				//connection.connected = true;
991 				_handleAccept(connection);
992 			}
993 			else
994 				acceptSocket.close();
995 		}
996 
997 		/// Called when a socket is writable.
998 		override void onWritable()
999 		{
1000 		}
1001 
1002 		/// Called when an error occurs on the socket.
1003 		override void onError(string reason)
1004 		{
1005 			close(); // call parent
1006 		}
1007 
1008 		void closeListener()
1009 		{
1010 			assert(conn);
1011 			socketManager.unregister(this);
1012 			conn.close();
1013 			conn = null;
1014 		}
1015 	}
1016 
1017 	/// Whether the socket is listening.
1018 	bool listening;
1019 	/// Listener instances
1020 	Listener[] listeners;
1021 
1022 	final void updateFlags()
1023 	{
1024 		foreach (listener; listeners)
1025 			listener.notifyRead = handleAccept !is null;
1026 	}
1027 
1028 public:
1029 	/// Debugging aids
1030 	ushort port;
1031 	string addr;
1032 
1033 	/// Start listening on this socket.
1034 	ushort listen(ushort port, string addr = null)
1035 	{
1036 		//debug writefln("Listening on %s:%d", addr, port);
1037 		assert(!listening, "Attempting to listen on a listening socket");
1038 
1039 		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
1040 
1041 		foreach (ref addressInfo; addressInfos)
1042 		{
1043 			if (addressInfo.family != AddressFamily.INET && port == 0)
1044 				continue;  // listen on random ports only on IPv4 for now
1045 
1046 			try
1047 			{
1048 				Socket conn = new Socket(addressInfo);
1049 				conn.blocking = false;
1050 				if (addressInfo.family == AddressFamily.INET6)
1051 					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
1052 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
1053 
1054 				conn.bind(addressInfo.address);
1055 				conn.listen(8);
1056 
1057 				if (addressInfo.family == AddressFamily.INET)
1058 					port = to!ushort(conn.localAddress().toPortString());
1059 
1060 				listeners ~= new Listener(conn);
1061 			}
1062 			catch (SocketException e)
1063 			{
1064 				debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
1065 				debug(ASOCKETS) writeln(e.msg);
1066 			}
1067 		}
1068 
1069 		if (listeners.length==0)
1070 			throw new Exception("Unable to bind service");
1071 
1072 		this.port = port;
1073 		this.addr = addr;
1074 		listening = true;
1075 
1076 		updateFlags();
1077 
1078 		return port;
1079 	}
1080 
1081 	@property Address[] localAddresses()
1082 	{
1083 		Address[] result;
1084 		foreach (listener; listeners)
1085 			result ~= listener.localAddress;
1086 		return result;
1087 	}
1088 
	/// Whether listen() has succeeded and close() has not been called since.
	@property bool isListening()
	{
		return listening;
	}
1093 
1094 	/// Stop listening on this socket.
1095 	void close()
1096 	{
1097 		foreach (listener;listeners)
1098 			listener.closeListener();
1099 		listeners = null;
1100 		listening = false;
1101 		if (handleClose)
1102 			handleClose();
1103 	}
1104 
1105 public:
1106 	/// Callback for when the socket was closed.
1107 	void delegate() handleClose;
1108 
1109 	private void delegate(T incoming) _handleAccept;
1110 	/// Callback for an incoming connection.
1111 	/// Connections will not be accepted unless this handler is set.
1112 	@property final void delegate(T incoming) handleAccept() { return _handleAccept; }
1113 	/// ditto
1114 	@property final void handleAccept(void delegate(T incoming) value) { _handleAccept = value; updateFlags(); }
1115 }
1116 
1117 /// Server socket type for ordinary sockets.
1118 alias GenericServerSocket!(ClientSocket) ServerSocket;
1119 
1120 /// Asynchronous class for client sockets with a line-based protocol.
1121 class LineBufferedSocket : ClientSocket
1122 {
1123 private:
1124 	/// The receive buffer.
1125 	Data inBuffer;
1126 
1127 public:
1128 	/// The protocol's line delimiter.
1129 	string delimiter = "\r\n";
1130 
1131 private:
	/// Called when data has been received.
	/// Appends the data to the receive buffer and calls processLine for
	/// every complete line now in the buffer. If at least one line was found,
	/// the connection is marked as non-idle.
	final void onReadData(ClientSocket sender, Data data)
	{
		import std.string;
		auto oldBufferLength = inBuffer.length;
		// Adopt the incoming block directly when the buffer is empty.
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		bool gotLines;

		if (delimiter.length == 1)
		{
			import core.stdc.string; // memchr

			char c = delimiter[0];
			// The first search covers only the newly appended bytes.
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				sizediff_t index = p - inBuffer.ptr;
				processLine(index);
				gotLines = true;

				// processLine re-slices inBuffer, so later searches cover
				// the whole remaining buffer.
				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			sizediff_t index;
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
			{
				processLine(index);
				gotLines = true;
			}
		}

		if (gotLines)
			markNonIdle();
	}
1173 
1174 	final void processLine(size_t index)
1175 	{
1176 		auto line = inBuffer[0..index];
1177 		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
1178 
1179 		if (_handleReadLine)
1180 			_handleReadLine(this, cast(string)line.toHeap());
1181 	}
1182 
1183 	final void updateHandler()
1184 	{
1185 		handleReadData = _handleReadLine ? &onReadData : null;
1186 	}
1187 
1188 public:
1189 	override void cancelIdleTimeout() { assert(false); }
1190 	override void resumeIdleTimeout() { assert(false); }
1191 	//override void setIdleTimeout(d_time duration) { assert(false); }
1192 	//override void markNonIdle() { assert(false); }
1193 
	/// Create an unconnected line-buffered socket.
	/// Params:
	///   idleTimeout = idle timeout passed to setIdleTimeout
	this(Duration idleTimeout)
	{
		super.setIdleTimeout(idleTimeout);
	}
1198 
	/// Wrap an existing socket (the constructor signature used by
	/// GenericServerSocket for accepted connections), with a 60-second idle
	/// timeout.
	this(Socket conn)
	{
		// NOTE(review): setIdleTimeout only schedules the task when `connected`
		// is already true, and it runs here before super(conn) sets that flag.
		// The idle task is therefore created but not scheduled by this
		// constructor - confirm that this is intended.
		super.setIdleTimeout(60.seconds);
		super(conn);
	}
1204 
	/// Cancel a connection.
	/// Delegates to ClientSocket.disconnect, then clears the receive buffer,
	/// so any incomplete line still buffered is dropped.
	override final void disconnect(string reason = DefaultDisconnectReason, DisconnectType type = DisconnectType.Requested)
	{
		super.disconnect(reason, type);
		inBuffer.clear();
	}
1211 
1212 	/// Append a line to the send buffer.
1213 	void send(string line)
1214 	{
1215 		super.send(Data(line ~ delimiter));
1216 	}
1217 
1218 public:
1219 	alias void delegate(LineBufferedSocket sender, string line) ReadLineHandler;
1220 
1221 	private ReadLineHandler _handleReadLine;
1222 	/// Callback for an incoming line.
1223 	/// Data will not be received unless this handler is set.
1224 	@property final ReadLineHandler handleReadLine() { return _handleReadLine; }
1225 	/// ditto
1226 	@property final void handleReadLine(ReadLineHandler value) { _handleReadLine = value; updateHandler(); }
1227 }
1228 
1229 /// The default socket manager.
1230 // __gshared for ae.sys.shutdown
1231 //__gshared
1232 SocketManager socketManager;
1233 
1234 // ***************************************************************************
1235 
// A timer task alone must keep the event loop running until it has fired.
unittest
{
	bool fired;
	setTimeout({fired = true;}, 10.msecs);

	// With no sockets registered, loop() returns once the timer is done.
	socketManager.loop();

	assert(fired);
}