1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <ae@cy.md>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.dataset : DataVec;
20 import ae.sys.timing;
21 import ae.utils.array : asSlice, asBytes, queuePush, queuePop;
22 import ae.utils.math;
23 public import ae.sys.data;
24 
25 import core.stdc.stdint : int32_t;
26 
27 import std.exception;
28 import std.parallelism : totalCPUs;
29 import std.socket;
30 import std.string : format;
31 public import std.socket : Address, AddressInfo, Socket;
32 
33 version (Windows)
34 	private import c_socks = core.sys.windows.winsock2;
35 else version (Posix)
36 	private import c_socks = core.sys.posix.sys.socket;
37 
38 debug(ASOCKETS) import std.stdio : stderr;
39 debug(PRINTDATA) import std.stdio : stderr;
40 debug(PRINTDATA) import ae.utils.text : hexDump;
41 private import std.conv : to;
42 
43 
44 // https://issues.dlang.org/show_bug.cgi?id=7016
45 static import ae.utils.array;
46 
47 version(LIBEV)
48 {
49 	import deimos.ev;
50 	pragma(lib, "ev");
51 }
52 
version(Windows)
{
	import core.sys.windows.windows : Sleep;
	// Compile-time flag consulted by the select-based event loop: when true
	// and there are no sockets to poll, the loop waits using `Sleep` instead
	// of calling `Socket.select`.
	private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
}
else
	private enum USE_SLEEP = false;
60 
/// Counter incremented by the event loop implementations in this module
/// as they process wake-ups (see `SocketManager`).
int eventCounter;
62 
63 version (LIBEV)
64 {
65 	// Watchers are a GenericSocket field (as declared in SocketMixin).
66 	// Use one watcher per read and write event.
67 	// Start/stop those as higher-level code declares interest in those events.
68 	// Use the "data" ev_io field to store the parent GenericSocket address.
69 	// Also use the "data" field as a flag to indicate whether the watcher is active
70 	// (data is null when the watcher is stopped).
71 
72 	/// `libev`-based event loop implementation.
73 	struct SocketManager
74 	{
75 	private:
76 		size_t count;
77 
78 		void delegate()[] nextTickHandlers;
79 
80 		extern(C)
81 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
82 		{
83 			auto socket = cast(GenericSocket)w.data;
84 			assert(socket, "libev event fired on stopped watcher");
85 			debug (ASOCKETS) stderr.writefln("ioCallback(%s, 0x%X)", socket, revents);
86 
87 			// TODO? Need to get proper SocketManager instance to call updateTimer on
88 			socketManager.preEvent();
89 
90 			if (revents & EV_READ)
91 				socket.onReadable();
92 			else
93 			if (revents & EV_WRITE)
94 				socket.onWritable();
95 			else
96 				assert(false, "Unknown event fired from libev");
97 
98 			socketManager.postEvent(false);
99 		}
100 
101 		ev_timer evTimer;
102 		MonoTime lastNextEvent = MonoTime.max;
103 
104 		extern(C)
105 		static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/)
106 		{
107 			debug (ASOCKETS) stderr.writefln("Timer callback called.");
108 
109 			socketManager.preEvent(); // This also updates socketManager.now
110 			mainTimer.prod(socketManager.now);
111 
112 			socketManager.postEvent(true);
113 			debug (ASOCKETS) stderr.writefln("Timer callback exiting.");
114 		}
115 
116 		/// Called upon waking up, before calling any users' event handlers.
117 		void preEvent()
118 		{
119 			eventCounter++;
120 			socketManager.now = MonoTime.currTime();
121 		}
122 
123 		/// Called before going back to sleep, after calling any users' event handlers.
124 		void postEvent(bool wokeDueToTimeout)
125 		{
126 			while (nextTickHandlers.length)
127 			{
128 				auto thisTickHandlers = nextTickHandlers;
129 				nextTickHandlers = null;
130 
131 				foreach (handler; thisTickHandlers)
132 					handler();
133 			}
134 
135 			socketManager.updateTimer(wokeDueToTimeout);
136 		}
137 
138 		void updateTimer(bool force)
139 		{
140 			auto nextEvent = mainTimer.getNextEvent();
141 			if (force || lastNextEvent != nextEvent)
142 			{
143 				debug (ASOCKETS) stderr.writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
144 				if (nextEvent == MonoTime.max) // Stopping
145 				{
146 					if (lastNextEvent != MonoTime.max)
147 						ev_timer_stop(ev_default_loop(0), &evTimer);
148 				}
149 				else
150 				{
151 					auto remaining = mainTimer.getRemainingTime(socketManager.now);
152 					while (remaining <= Duration.zero)
153 					{
154 						debug (ASOCKETS) stderr.writefln("remaining=%s, prodding timer.", remaining);
155 						mainTimer.prod(socketManager.now);
156 						remaining = mainTimer.getRemainingTime(socketManager.now);
157 					}
158 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
159 					debug (ASOCKETS) stderr.writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
160 					if (lastNextEvent == MonoTime.max) // Starting
161 					{
162 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
163 						ev_timer_start(ev_default_loop(0), &evTimer);
164 					}
165 					else // Adjusting
166 					{
167 						evTimer.repeat = tstamp;
168 						ev_timer_again(ev_default_loop(0), &evTimer);
169 					}
170 				}
171 				lastNextEvent = nextEvent;
172 			}
173 		}
174 
175 	public:
176 		MonoTime now;
177 
178 		/// Register a socket with the manager.
179 		void register(GenericSocket socket)
180 		{
181 			debug (ASOCKETS) stderr.writefln("Registering %s", socket);
182 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
183 			auto fd = socket.conn.handle;
184 			assert(fd, "Must have fd before socket registration");
185 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
186 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
187 			count++;
188 		}
189 
190 		/// Unregister a socket with the manager.
191 		void unregister(GenericSocket socket)
192 		{
193 			debug (ASOCKETS) stderr.writefln("Unregistering %s", socket);
194 			socket.notifyRead  = false;
195 			socket.notifyWrite = false;
196 			count--;
197 		}
198 
199 		/// Returns the number of registered sockets.
200 		size_t size()
201 		{
202 			return count;
203 		}
204 
205 		/// Loop continuously until no sockets are left.
206 		void loop()
207 		{
208 			auto evLoop = ev_default_loop(0);
209 			enforce(evLoop, "libev initialization failure");
210 
211 			updateTimer(true);
212 			debug (ASOCKETS) stderr.writeln("ev_run");
213 			ev_run(ev_default_loop(0), 0);
214 		}
215 	}
216 
217 	private mixin template SocketMixin()
218 	{
219 		private ev_io evRead, evWrite;
220 
221 		private final void setWatcherState(ref ev_io ev, bool newValue, int /*event*/)
222 		{
223 			if (!conn)
224 			{
225 				// Can happen when setting delegates before connecting.
226 				return;
227 			}
228 
229 			if (newValue && !ev.data)
230 			{
231 				// Start
232 				ev.data = cast(void*)this;
233 				ev_io_start(ev_default_loop(0), &ev);
234 			}
235 			else
236 			if (!newValue && ev.data)
237 			{
238 				// Stop
239 				assert(ev.data is cast(void*)this);
240 				ev.data = null;
241 				ev_io_stop(ev_default_loop(0), &ev);
242 			}
243 		}
244 
245 		private final bool getWatcherState(ref ev_io ev) { return !!ev.data; }
246 
247 		// Flags that determine socket wake-up events.
248 
249 		/// Interested in read notifications (onReadable)?
250 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
251 		@property final bool notifyRead () { return getWatcherState(evRead); } /// ditto
252 		/// Interested in write notifications (onWritable)?
253 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
254 		@property final bool notifyWrite() { return getWatcherState(evWrite); } /// ditto
255 
256 		debug ~this() @nogc
257 		{
258 			// The LIBEV SocketManager holds no references to registered sockets.
259 			// TODO: Add a doubly-linked list?
260 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
261 		}
262 	}
263 }
264 else // Use select
265 {
266 	/// `select`-based event loop implementation.
267 	struct SocketManager
268 	{
269 	private:
270 		enum FD_SETSIZE = 1024;
271 
272 		/// List of all sockets to poll.
273 		GenericSocket[] sockets;
274 
275 		/// Debug AA to check for dangling socket references.
276 		debug GenericSocket[socket_t] socketHandles;
277 
278 		void delegate()[] nextTickHandlers, idleHandlers;
279 
280 	public:
281 		MonoTime now;
282 
283 		/// Register a socket with the manager.
284 		void register(GenericSocket conn)
285 		{
286 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
287 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
288 			sockets ~= conn;
289 
290 			debug
291 			{
292 				auto handle = conn.socket.handle;
293 				assert(handle != socket_t.init, "Can't register a closed socket");
294 				assert(handle !in socketHandles, "This socket handle is already registered");
295 				socketHandles[handle] = conn;
296 			}
297 		}
298 
299 		/// Unregister a socket with the manager.
300 		void unregister(GenericSocket conn)
301 		{
302 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
303 
304 			debug
305 			{
306 				auto handle = conn.socket.handle;
307 				assert(handle != socket_t.init, "Can't unregister a closed socket");
308 				auto pconn = handle in socketHandles;
309 				assert(pconn, "This socket handle is not registered");
310 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
311 				socketHandles.remove(handle);
312 			}
313 
314 			foreach (size_t i, GenericSocket j; sockets)
315 				if (j is conn)
316 				{
317 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
318 					return;
319 				}
320 			assert(false, "Socket not registered");
321 		}
322 
323 		/// Returns the number of registered sockets.
324 		size_t size()
325 		{
326 			return sockets.length;
327 		}
328 
329 		/// Loop continuously until no sockets are left.
330 		void loop()
331 		{
332 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
333 
334 			SocketSet readset, writeset;
335 			size_t sockcount;
336 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
337 			readset  = new SocketSet(setSize);
338 			writeset = new SocketSet(setSize);
339 			while (true)
340 			{
341 				if (nextTickHandlers.length)
342 				{
343 					auto thisTickHandlers = nextTickHandlers;
344 					nextTickHandlers = null;
345 
346 					foreach (handler; thisTickHandlers)
347 						handler();
348 
349 					continue;
350 				}
351 
352 				uint minSize = 0;
353 				version(Windows)
354 					minSize = cast(uint)sockets.length;
355 				else
356 				{
357 					foreach (s; sockets)
358 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
359 							minSize = s.socket.handle;
360 				}
361 				minSize++;
362 
363 				if (setSize < minSize)
364 				{
365 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
366 					setSize = minSize * 2;
367 					readset  = new SocketSet(setSize);
368 					writeset = new SocketSet(setSize);
369 				}
370 				else
371 				{
372 					readset.reset();
373 					writeset.reset();
374 				}
375 
376 				sockcount = 0;
377 				bool haveActive;
378 				debug (ASOCKETS) stderr.writeln("Populating sets");
379 				foreach (GenericSocket conn; sockets)
380 				{
381 					if (!conn.socket)
382 						continue;
383 					sockcount++;
384 
385 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
386 					if (conn.notifyRead)
387 					{
388 						readset.add(conn.socket);
389 						if (!conn.daemonRead)
390 							haveActive = true;
391 						debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : "");
392 					}
393 					if (conn.notifyWrite)
394 					{
395 						writeset.add(conn.socket);
396 						if (!conn.daemonWrite)
397 							haveActive = true;
398 						debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : "");
399 					}
400 					debug (ASOCKETS) stderr.writeln();
401 				}
402 				debug (ASOCKETS)
403 				{
404 					stderr.writefln("Sets populated as follows:");
405 					printSets(readset, writeset);
406 				}
407 
408 				debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...",
409 					haveActive ? "" : "not ",
410 					sockcount,
411 					mainTimer.isWaiting() ? "with" : "no",
412 					idleHandlers.length,
413 				);
414 				if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length)
415 				{
416 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
417 					break;
418 				}
419 
420 				debug (ASOCKETS) stderr.flush();
421 
422 				int events;
423 				if (idleHandlers.length)
424 				{
425 					if (sockcount==0)
426 						events = 0;
427 					else
428 						events = Socket.select(readset, writeset, null, 0.seconds);
429 				}
430 				else
431 				if (USE_SLEEP && sockcount==0)
432 				{
433 					version (Windows)
434 					{
435 						now = MonoTime.currTime();
436 						auto duration = mainTimer.getRemainingTime(now).total!"msecs"();
437 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
438 						if (duration <= 0)
439 							duration = 1; // Avoid busywait
440 						else
441 						if (duration > int.max)
442 							duration = int.max;
443 						Sleep(cast(int)duration);
444 						events = 0;
445 					}
446 					else
447 						assert(0);
448 				}
449 				else
450 				if (mainTimer.isWaiting())
451 				{
452 					// Refresh time before sleeping, to ensure that a
453 					// slow event handler does not skew everything else
454 					now = MonoTime.currTime();
455 
456 					events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime(now));
457 				}
458 				else
459 					events = Socket.select(readset, writeset, null);
460 
461 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
462 
463 				// Update time after sleeping
464 				now = MonoTime.currTime();
465 
466 				if (events > 0)
467 				{
468 					// Handle just one event at a time, as the first
469 					// handler might invalidate select()'s results.
470 					handleEvent(readset, writeset);
471 				}
472 				else
473 				if (idleHandlers.length)
474 				{
475 					import ae.utils.array : shift;
476 					auto handler = idleHandlers.shift();
477 
478 					// Rotate the idle handler queue before running it,
479 					// in case the handler unregisters itself.
480 					idleHandlers ~= handler;
481 
482 					handler();
483 				}
484 
485 				// Timers may invalidate our select results, so fire them after processing the latter
486 				mainTimer.prod(now);
487 
488 				eventCounter++;
489 			}
490 		}
491 
492 		debug (ASOCKETS)
493 		private void printSets(SocketSet readset, SocketSet writeset)
494 		{
495 			foreach (GenericSocket conn; sockets)
496 			{
497 				if (!conn.socket)
498 					stderr.writefln("\t\t%s is unset", conn);
499 				else
500 				{
501 					if (readset.isSet(conn.socket))
502 						stderr.writefln("\t\t%s is readable", conn);
503 					if (writeset.isSet(conn.socket))
504 						stderr.writefln("\t\t%s is writable", conn);
505 				}
506 			}
507 		}
508 
509 		private void handleEvent(SocketSet readset, SocketSet writeset)
510 		{
511 			debug (ASOCKETS)
512 			{
513 				stderr.writefln("\tSelect results:");
514 				printSets(readset, writeset);
515 			}
516 
517 			foreach (GenericSocket conn; sockets)
518 			{
519 				if (!conn.socket)
520 					continue;
521 
522 				if (writeset.isSet(conn.socket))
523 				{
524 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
525 					return conn.onWritable();
526 				}
527 				if (readset.isSet(conn.socket))
528 				{
529 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
530 					return conn.onReadable();
531 				}
532 			}
533 
534 			assert(false, "select() reported events available, but no registered sockets are set");
535 		}
536 	}
537 
538 	// Use UFCS to allow removeIdleHandler to have a predicate with context
539 	/// Register a function to be called when the event loop is idle,
540 	/// and would otherwise sleep.
541 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
542 	{
543 		foreach (i, idleHandler; socketManager.idleHandlers)
544 			assert(handler !is idleHandler);
545 
546 		socketManager.idleHandlers ~= handler;
547 	}
548 
549 	/// Unregister a function previously registered with `addIdleHandler`.
550 	void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args)
551 	{
552 		foreach (i, idleHandler; socketManager.idleHandlers)
553 			if (pred(idleHandler, args))
554 			{
555 				import std.algorithm : remove;
556 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
557 				return;
558 			}
559 		assert(false, "No such idle handler");
560 	}
561 
562 	private mixin template SocketMixin()
563 	{
564 		// Flags that determine socket wake-up events.
565 
566 		/// Interested in read notifications (onReadable)?
567 		bool notifyRead;
568 		/// Interested in write notifications (onWritable)?
569 		bool notifyWrite;
570 	}
571 }
572 
/// The default socket manager.
/// This is the instance used by the free function `now` and by
/// `Connection` when registering and unregistering its socket.
SocketManager socketManager;
575 
/// Schedule a function to run on the next event loop iteration.
/// Can be used to queue logic to run once all current execution frames exit.
/// Similar to e.g. process.nextTick in Node.
/// Params:
///   socketManager = the manager whose next-tick queue receives the delegate
///   dg            = the delegate to queue
void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow
{
	// Only appends to the queue; the event loop drains it later.
	socketManager.nextTickHandlers ~= dg;
}
583 
/// The monotonic time as last recorded by the default socket manager.
/// The event loop refreshes it whenever it is awoken; if it has never been
/// set (event loop not yet started), it is initialized from the clock here.
@property MonoTime now()
{
	if (socketManager.now != MonoTime.init)
		return socketManager.now;

	// Event loop not yet started.
	return socketManager.now = MonoTime.currTime();
}
595 
596 // ***************************************************************************
597 
/// Base class with the functionality shared by all asynchronous sockets.
abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

// protected:
	/// Invoked by the event loop when the socket is readable. No-op by default.
	void onReadable()
	{
	}

	/// Invoked by the event loop when the socket is writable. No-op by default.
	void onWritable()
	{
	}

	/// Error hook; the base implementation ignores the reason.
	void onError(string /*reason*/)
	{
	}

public:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// allow getting the address of connections that are already disconnected
	private Address[2] cachedAddress;

	/// Returns the local (`local == true`) or remote address of the socket.
	/// The result is cached, so it stays available after `conn` is cleared.
	/// Returns `null` if nothing is cached and there is no socket.
	/*private*/ final @property Address _address(bool local)()
	{
		if (auto cached = cachedAddress[local])
			return cached;
		if (conn is null)
			return null;

		Address result;
		if (conn.addressFamily != AddressFamily.UNSPEC)
		{
			static if (local)
				result = conn.localAddress();
			else
				result = conn.remoteAddress();
		}
		else
		{
			// Socket will attempt to construct an UnknownAddress,
			// which will almost certainly not match the real address length.
			static if (local)
				alias getname = c_socks.getsockname;
			else
				alias getname = c_socks.getpeername;

			// First call only queries the required length.
			c_socks.socklen_t nameLen = 0;
			if (getname(conn.handle, null, &nameLen) < 0)
				throw new SocketOSException("Unable to obtain socket address");

			// Second call fills a buffer of exactly that size.
			auto buf = new ubyte[nameLen];
			auto sa = cast(c_socks.sockaddr*)buf.ptr;
			if (getname(conn.handle, sa, &nameLen) < 0)
				throw new SocketOSException("Unable to obtain socket address");
			result = new UnknownAddressReference(sa, nameLen);
		}

		cachedAddress[local] = result;
		return result;
	}

	alias localAddress = _address!true; /// Retrieve this socket's local address.
	alias remoteAddress = _address!false; /// Retrieve this socket's remote address.

	/// Formats the local or remote address as `host:port`, never throwing.
	/// Hosts containing a colon are wrapped in square brackets; if the port
	/// cannot be obtained, only the host is returned.
	/*private*/ final @property string _addressStr(bool local)() nothrow
	{
		try
		{
			auto addr = _address!local;
			if (addr is null)
				return "[null address]";

			import std.string : indexOf;
			string host = addr.toAddrString();
			if (host.indexOf(':') >= 0)
				host = "[" ~ host ~ "]";

			string port;
			try
				port = addr.toPortString();
			catch (Exception e)
				return host;

			return host ~ ":" ~ port;
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string.
	alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string.

	/// Don't block the process from exiting, even if the socket is ready to receive data.
	/// TODO: Not implemented with libev
	bool daemonRead;

	/// Don't block the process from exiting, even if the socket is ready to send data.
	/// TODO: Not implemented with libev
	bool daemonWrite;

	deprecated alias daemon = daemonRead;

	/// Enable or disable TCP keep-alive on the socket.
	/// When enabling, the timed variant is tried first; if that throws a
	/// `SocketException`, plain `SO_KEEPALIVE` is set instead.
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");

		if (!enabled)
		{
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
			return;
		}

		try
			conn.setKeepAlive(time, interval);
		catch (SocketException)
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
	}

	/// Returns a string with the unqualified class name, the object's
	/// address, and the socket handle (-1 if there is no socket).
	override string toString() const
	{
		import std.string : format, split;

		auto className = this.classinfo.name.split(".")[$-1];
		return "%s {this=%s, fd=%s}".format(className, cast(void*)this, conn ? conn.handle : -1);
	}
}
727 
728 // ***************************************************************************
729 
/// Describes why a connection was closed.
/// Useful for deciding e.g. whether a reconnect attempt makes sense.
enum DisconnectType
{
	/// Initiated by the application.
	requested,
	/// The peer gracefully closed the connection.
	graceful,
	/// Some abnormal network condition.
	error,
}
738 
/// The stages a connection passes through during its lifecycle.
enum ConnectionState
{
	/// No connection: the initial state, and the state once a
	/// disconnect has been fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// A disconnect was requested but queued data is still being sent.
	/// No data can be sent or received at this point.
	disconnecting,
}
758 
/// Tells whether disconnecting is valid in the given connection state,
/// i.e. whether it is `resolving`, `connecting` or `connected`.
/// Applications should normally know the life cycle of their sockets, so
/// needing this check is usually a code smell. It is still handy when a
/// disconnect has to happen "out-of-bound" (outside the application's normal
/// life cycle), such as in response to a signal.
bool disconnectable(ConnectionState state)
{
	immutable tooEarly = state < ConnectionState.resolving;
	immutable tooLate  = state > ConnectionState.connected;
	return !(tooEarly || tooLate);
}
766 
/// Interface shared by connections and connection adapters.
interface IConnection
{
	/// Data passed to `send` is placed in one of five queues, selected
	/// by a numeric priority index.
	/// `MAX_PRIORITY` is the highest (least urgent) priority index.
	/// `DEFAULT_PRIORITY` is used when no priority is given.
	enum MAX_PRIORITY = 4;
	enum DEFAULT_PRIORITY = 2; /// ditto

	/// This is the default value for the `disconnect` `reason` string parameter.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected()
	{
		return state == ConnectionState.connected;
	}

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting()
	{
		return state == ConnectionState.disconnecting;
	}

	/// Queue Data for sending.
	void send(scope Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Forward the single datum as a one-element slice.
		this.send(datum.asSlice, priority);
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias BufferFlushedHandler = void delegate();
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}
818 
819 // ***************************************************************************
820 
821 /// Implementation of `IConnection` using a socket.
822 /// Implements receiving data when readable and sending queued data
823 /// when writable.
824 class Connection : GenericSocket, IConnection
825 {
private:
	/// Backing storage for the `state` property.
	ConnectionState _state;

	/// Internal setter; stores and returns the new state.
	final @property ConnectionState state(ConnectionState value)
	{
		_state = value;
		return _state;
	}

public:
	/// Get connection state.
	override @property ConnectionState state()
	{
		return _state;
	}
833 
834 protected:
	/// Transport hook for sending `buffer`; implemented by subclasses.
	/// NOTE(review): the return value is presumably the number of bytes sent
	/// or `Socket.ERROR` - confirm against the implementations.
	abstract sizediff_t doSend(scope const(void)[] buffer);
	/// Value returned by `doReceive` to signal end of stream; `onReadable`
	/// responds to it with a graceful disconnect.
	enum sizediff_t doReceiveEOF = -1;
	/// Transport hook for receiving into `buffer`; implemented by subclasses.
	/// `onReadable` treats the result as the number of bytes received,
	/// unless it equals `doReceiveEOF` or `Socket.ERROR`.
	abstract sizediff_t doReceive(scope void[] buffer);

	/// The send buffers.
	DataVec[MAX_PRIORITY+1] outQueue;
	/// Index of the queue whose first item has been partially sent (and thus
	/// can't be canceled), or -1 if there is no such queue.
	int partiallySent = -1;
843 
844 	/// Constructor used by a ServerSocket for new connections
845 	this(Socket conn)
846 	{
847 		this();
848 		this.conn = conn;
849 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
850 		if (conn)
851 			socketManager.register(this);
852 		updateFlags();
853 	}
854 
855 	final void updateFlags()
856 	{
857 		if (state == ConnectionState.connecting)
858 			notifyWrite = true;
859 		else
860 			notifyWrite = writePending;
861 
862 		notifyRead = state == ConnectionState.connected && readDataHandler;
863 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
864 	}
865 
866 	// We reuse the same buffer across read calls.
867 	// It is allocated on the first read, and also
868 	// if the user code decides to keep a reference to it.
869 	static Data inBuffer;
870 
871 	/// Called when a socket is readable.
872 	override void onReadable()
873 	{
874 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
875 		if (!inBuffer)
876 			inBuffer = Data(0x10000);
877 		else
878 			inBuffer = inBuffer.ensureUnique();
879 		sizediff_t received;
880 		inBuffer.enter((scope contents) {
881 			received = doReceive(contents);
882 		});
883 
884 		if (received == doReceiveEOF)
885 			return disconnect("Connection closed", DisconnectType.graceful);
886 
887 		if (received == Socket.ERROR)
888 		{
889 		//	if (wouldHaveBlocked)
890 		//	{
891 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
892 		//		return;
893 		//	}
894 		//	else
895 				onError("recv() error: " ~ lastSocketError);
896 		}
897 		else
898 		{
899 			debug (PRINTDATA)
900 			{
901 				stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr);
902 				stderr.write(hexDump(inBuffer.unsafeContents[0 .. received]));
903 				stderr.flush();
904 			}
905 
906 			if (state == ConnectionState.disconnecting)
907 			{
908 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
909 			}
910 			else
911 			if (!readDataHandler)
912 			{
913 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
914 			}
915 			else
916 			{
917 				auto data = inBuffer[0 .. received];
918 				readDataHandler(data);
919 			}
920 		}
921 	}
922 
923 	/// Called when an error occurs on the socket.
924 	override void onError(string reason)
925 	{
926 		if (state == ConnectionState.disconnecting)
927 		{
928 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
929 			return close();
930 		}
931 
932 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
933 		disconnect("Socket error: " ~ reason, DisconnectType.error);
934 	}
935 
	/// Default constructor; performs no initialization of its own.
	/// `this(Socket)` chains to it.
	this()
	{
	}
939 
940 public:
941 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
942 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
943 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
944 	{
945 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
946 		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));
947 
948 		if (writePending)
949 		{
950 			if (type==DisconnectType.requested)
951 			{
952 				assert(conn, "Attempting to disconnect on an uninitialized socket");
953 				// queue disconnect after all data is sent
954 				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
955 				state = ConnectionState.disconnecting;
956 				//setIdleTimeout(30.seconds);
957 				if (disconnectHandler)
958 					disconnectHandler(reason, type);
959 				updateFlags();
960 				return;
961 			}
962 			else
963 				discardQueues();
964 		}
965 
966 		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
967 
968 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
969 			close();
970 		else
971 		{
972 			assert(conn is null, "Registered but %s socket".format(state));
973 			if (state == ConnectionState.resolving)
974 				state = ConnectionState.disconnected;
975 		}
976 
977 		if (disconnectHandler)
978 			disconnectHandler(reason, type);
979 		updateFlags();
980 	}
981 
982 	private final void close()
983 	{
984 		assert(conn, "Attempting to close an unregistered socket");
985 		socketManager.unregister(this);
986 		conn.close();
987 		conn = null;
988 		outQueue[] = DataVec.init;
989 		state = ConnectionState.disconnected;
990 	}
991 
992 	/// Append data to the send buffer.
993 	void send(scope Data[] data, int priority = DEFAULT_PRIORITY)
994 	{
995 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
996 		outQueue[priority] ~= data;
997 		notifyWrite = true; // Fast updateFlags()
998 
999 		debug (PRINTDATA)
1000 		{
1001 			stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr);
1002 			foreach (datum; data)
1003 				if (datum.length)
1004 					stderr.write(hexDump(datum.unsafeContents));
1005 				else
1006 					stderr.writeln("(empty Data)");
1007 			stderr.flush();
1008 		}
1009 	}
1010 
1011 	/// ditto
1012 	alias send = IConnection.send;
1013 
1014 	/// Cancel all queued `Data` packets with the given priority.
1015 	/// Does not cancel any partially-sent `Data`.
1016 	final void clearQueue(int priority)
1017 	{
1018 		if (priority == partiallySent)
1019 		{
1020 			assert(outQueue[priority].length > 0);
1021 			outQueue[priority].length = 1;
1022 		}
1023 		else
1024 			outQueue[priority] = null;
1025 		updateFlags();
1026 	}
1027 
1028 	/// Clears all queues, even partially sent content.
1029 	private final void discardQueues()
1030 	{
1031 		foreach (priority; 0..MAX_PRIORITY+1)
1032 			outQueue[priority] = null;
1033 		partiallySent = -1;
1034 		updateFlags();
1035 	}
1036 
1037 	/// Returns true if any queues have pending data.
1038 	@property
1039 	final bool writePending()
1040 	{
1041 		foreach (ref queue; outQueue)
1042 			if (queue.length)
1043 				return true;
1044 		return false;
1045 	}
1046 
1047 	/// Returns true if there are any queued `Data` which have not yet
1048 	/// begun to be sent.
1049 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
1050 	{
1051 		if (priority == partiallySent)
1052 		{
1053 			assert(outQueue[priority].length > 0);
1054 			return outQueue[priority].length > 1;
1055 		}
1056 		else
1057 			return outQueue[priority].length > 0;
1058 	}
1059 
1060 	/// Returns the number of queued `Data` at the given priority.
1061 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
1062 	{
1063 		return outQueue[priority].length;
1064 	}
1065 
1066 	/// Returns the number of queued bytes at the given priority.
1067 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
1068 	{
1069 		size_t bytes;
1070 		foreach (datum; outQueue[priority])
1071 			bytes += datum.length;
1072 		return bytes;
1073 	}
1074 
// public:
	private ConnectHandler connectHandler;
	/// Sets the callback for when a connection has been established.
	@property final void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
		updateFlags();
	}

	private ReadDataHandler readDataHandler;
	/// Sets the callback for incoming data.
	/// Data will not be received unless this handler is set.
	@property final void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		updateFlags();
	}

	private DisconnectHandler disconnectHandler;
	/// Sets the callback for when a connection was closed.
	@property final void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
		updateFlags();
	}

	private BufferFlushedHandler bufferFlushedHandler;
	/// Sets the callback for when all queued data has been sent.
	@property final void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
		updateFlags();
	}
1092 }
1093 
/// Implements a stream connection.
/// Queued `Data` is allowed to be fragmented.
class StreamConnection : Connection
{
protected:
	this()
	{
		super();
	}

	/// Called when a socket is writable.
	/// Runs the write logic, then refreshes the notification flags.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	//
	// While connecting, writability means the connection attempt has finished:
	// the pending socket error is checked, and on success the connection becomes
	// `connected`, keep-alive is configured and the connect handler is called.
	// Otherwise, queued data is sent. Returns early if the socket would block,
	// if a packet was only partially sent, or on error. Once all queues are
	// empty, the buffer-flushed handler is called and a delayed disconnect
	// (state `disconnecting`) is completed by closing the socket.
	final private void onWritableImpl()
	{
		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
		if (state == ConnectionState.connecting)
		{
			int32_t error;
			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
			if (error)
				return disconnect(formatSocketError(error), DisconnectType.error);

			state = ConnectionState.connected;

			//debug writefln("[%s] Connected", remoteAddressStr);
			try
				setKeepAlive();
			catch (Exception e)
				return disconnect(e.msg, DisconnectType.error);
			if (connectHandler)
				connectHandler();
			return;
		}
		//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

		// Two passes: the first one only finishes a packet which was already
		// partially sent (so that it is not interleaved with other data), the
		// second one sends everything else in queue order.
		// Kept in static storage so that no array literal needs to be
		// built every time the socket becomes writable.
		static immutable bool[2] sendPasses = [true, false];

		foreach (sendPartial; sendPasses)
			foreach (int priority, ref queue; outQueue)
				while (queue.length && (!sendPartial || priority == partiallySent))
				{
					assert(partiallySent == -1 || partiallySent == priority);

					ptrdiff_t sent = 0;
					if (!queue.front.empty)
					{
						queue.front.enter((scope contents) {
							sent = doSend(contents);
						});
						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, queue.front.length);
					}
					else
					{
						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
					}

					if (sent == Socket.ERROR)
					{
						if (wouldHaveBlocked())
							return;
						else
							return onError("send() error: " ~ lastSocketError);
					}
					else
					if (sent < queue.front.length)
					{
						if (sent > 0)
						{
							// Keep only the unsent tail, and remember which
							// queue has a packet in mid-transmission.
							queue.front = queue.front[sent..queue.front.length];
							partiallySent = priority;
						}
						return;
					}
					else
					{
						assert(sent == queue.front.length);
						//debug writefln("[%s] Sent data:", remoteAddressStr);
						//debug writefln("%s", hexDump(queue.front.contents[0..sent]));
						queue.front.clear();
						queue.popFront();
						partiallySent = -1;
						if (queue.length == 0)
							queue = null;
					}
				}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

public:
	this(Socket conn)
	{
		super(conn);
	} ///
}
1201 
1202 // ***************************************************************************
1203 
/// A POSIX file stream.
/// Makes it possible to add a file (e.g. stdin/stdout) to the socket manager.
/// The given file descriptor is not duplicated, so "disconnecting" this
/// connection will close it.
version (Posix)
class FileConnection : StreamConnection
{
	this(int fileno)
	{
		auto fileSocket = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		fileSocket.blocking = false;
		super(fileSocket);
	} ///

protected:
	import core.sys.posix.unistd : read, write;

	/// Writes `buffer` to the descriptor using `write`.
	override sizediff_t doSend(scope const(void)[] buffer)
	{
		auto bytesWritten = write(socket.handle, buffer.ptr, buffer.length);
		return bytesWritten;
	}

	/// Reads into `buffer` using `read`; a zero-length read is reported
	/// as `doReceiveEOF`.
	override sizediff_t doReceive(scope void[] buffer)
	{
		auto bytesRead = read(socket.handle, buffer.ptr, buffer.length);
		if (bytesRead != 0)
			return bytesRead;
		return doReceiveEOF;
	}
}
1234 
/// Separates reading and writing, e.g. for stdin/stdout.
/// Data is read from `reader` and written to `writer`; connect and
/// disconnect events of both are funnelled into this object's handlers.
class Duplex : IConnection
{
	///
	IConnection reader, writer;

	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;
		reader.handleConnect = &onConnect;
		writer.handleConnect = &onConnect;
		reader.handleDisconnect = &onDisconnect;
		writer.handleDisconnect = &onDisconnect;
	} ///

	/// Combined state: `disconnecting` if either side is disconnecting,
	/// otherwise the lower of the two sides' states.
	@property ConnectionState state()
	{
		if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;
		else
			return reader.state < writer.state ? reader.state : writer.state;
	} ///

	/// Queue Data for sending.
	void send(scope Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Each side is only disconnected if its state lies strictly between
	/// `disconnected` and `disconnecting`.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting)
			reader.disconnect(reason, type);
		if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting)
			writer.disconnect(reason, type);
		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Fires the connect handler once both sides are connected.
	protected void onConnect()
	{
		if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
			connectHandler();
	}

	/// Called when either side disconnects. Notifies the application (once),
	/// then disconnects the other side.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
		if (disconnectHandler)
		{
			// Don't call it twice for the other connection.
			// The handler is cleared *before* it is invoked: if the handler
			// itself causes the other side to disconnect, this method is
			// re-entered, and must not see the handler still installed.
			// A handler installed from inside the callback is kept.
			auto handler = disconnectHandler;
			disconnectHandler = null;
			handler(reason, type);
		}
		// It is our responsibility to disconnect the other connection
		// Use DisconnectType.requested to ensure that any written data is flushed
		disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; }

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; }
}

debug(ae_unittest) unittest { if (false) new Duplex(null, null); }
1315 
1316 // ***************************************************************************
1317 
/// An asynchronous socket-based connection.
class SocketConnection : StreamConnection
{
protected:
	/// Candidate addresses which have not been tried yet.
	AddressInfo[] addressQueue;
	/// If set, a zero-length receive is returned as-is instead of
	/// being reported as end-of-stream.
	bool datagram;

	this(Socket conn, bool datagram = false)
	{
		super(conn);
		this.datagram = datagram;
	}

	override sizediff_t doSend(scope const(void)[] buffer)
	{
		return conn.send(buffer);
	}

	override sizediff_t doReceive(scope void[] buffer)
	{
		auto bytesReceived = conn.receive(buffer);
		if (bytesReceived == 0 && !datagram)
			return doReceiveEOF;
		return bytesReceived;
	}

	/// Takes the next candidate off `addressQueue` and starts a
	/// non-blocking connection attempt to it.
	/// Failures are reported through `onError`, which moves on to the
	/// following candidate if there is one.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);
		auto addressInfo = addressQueue[0];
		addressQueue = addressQueue[1..$];

		// Create and configure the socket before publishing it in `conn`,
		// so that `conn` is only ever non-null for a usable socket.
		Socket sock;
		try
		{
			sock = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
			sock.blocking = false;
		}
		catch (SocketException e)
		{
			// Nothing was registered; release the handle (if one was
			// created) here and leave `conn` null.
			if (sock)
				sock.close();
			return onError("Connect error: " ~ e.msg);
		}

		try
		{
			conn = sock;

			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
			conn.connect(addressInfo.address);
		}
		catch (SocketException e)
			return onError("Connect error: " ~ e.msg);
	}

	/// Called when an error occurs on the socket.
	/// While connecting with untried addresses left, the error is not
	/// reported; the current socket is discarded and the next address is tried.
	override void onError(string reason)
	{
		if (state == ConnectionState.connecting && addressQueue.length)
		{
			// `conn` is null if the socket for the failed candidate could
			// not even be created; there is nothing to tear down then.
			if (conn)
			{
				socketManager.unregister(this);
				conn.close();
				conn = null;
			}

			return tryNextAddress();
		}

		super.onError(reason);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection.
	/// The given addresses are tried in order until one connection attempt
	/// does not fail.
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		addressQueue = addresses;
		state = ConnectionState.connecting;
		tryNextAddress();
	}
}
1399 
/// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
	this(Socket conn)
	{
		super(conn, false);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
	}

	///
	alias connect = SocketConnection.connect; // raise overload

	/// Start establishing a connection.
	/// Resolves `host`, shuffles the resulting addresses, and tries to
	/// connect to them in turn. Lookup failures (including an empty
	/// result) are reported through `onError`.
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		state = ConnectionState.resolving;

		AddressInfo[] addressInfos;
		try
		{
			auto addresses = getAddress(host, port);
			// Thrown as a SocketException (rather than via plain `enforce`,
			// which throws `Exception`) so that it is handled by the catch
			// block below instead of escaping with the state left at `resolving`.
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			if (addresses.length > 1)
			{
				import std.random : randomShuffle;
				randomShuffle(addresses);
			}

			foreach (address; addresses)
				addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
		}
		catch (SocketException e)
			return onError("Lookup error: " ~ e.msg);

		// connect(AddressInfo[]) expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		connect(addressInfos);
	}
}
1458 
1459 // ***************************************************************************
1460 
/// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one pending connection and hands it to the accept
		/// handler, or closes it right away if no handler is set.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				auto connection = createConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregisters and closes this listener's socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Wraps an accepted socket in a connection object.
	/// Subclasses override this to produce a more specific connection type.
	SocketConnection createConnection(Socket socket)
	{
		return new SocketConnection(socket, datagram);
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;
	/// Passed on to the connections created by `createConnection`.
	bool datagram;

	/// Listeners are only watched for readability while an accept handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// Addresses which cannot be bound are skipped; an exception is thrown
	/// only if there are no listeners at all afterwards.
	final void listen(AddressInfo[] addressInfos)
	{
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				{
					// If any setup step fails, this address is skipped; close
					// the socket now instead of leaving the handle open until
					// the object happens to be collected.
					scope(failure) conn.close();

					conn.blocking = false;
					if (addressInfo.family == AddressFamily.INET6)
						conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
					conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

					conn.bind(addressInfo.address);
					conn.listen(totalCPUs * 2);
				}

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	this()
	{
		this(false);
	} ///

	this(bool datagram)
	{
		this.datagram = datagram;
	} ///

	/// Creates a Server with the given sockets.
	/// The sockets must have already had `bind` and `listen` called on them.
	this(Socket[] sockets...)
	{
		foreach (socket; sockets)
			listeners ~= new Listener(socket);
	}

	/// Returns all listening addresses.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Returns `true` if the server is listening for incoming connections.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes all listeners, then calls `handleClose` if it is set.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

	/// Create a SocketServer using the handle passed on standard input,
	/// for which `listen` had already been called. Used by
	/// e.g. FastCGI and systemd sockets with "Listen = yes".
	static SocketServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0;

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new SocketServer(s);
	}

	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(SocketConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
}
1638 
/// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
	/// Accepted sockets are wrapped in `TcpConnection` objects.
	override SocketConnection createConnection(Socket socket)
	{
		return new TcpConnection(socket);
	}

public:
	this()
	{
	} ///

	this(Socket[] sockets...)
	{
		super(sockets);
	} /// Construct from the given sockets.

	///
	alias listen = SocketServer.listen; // raise overload

	/// Start listening on this socket.
	/// Params:
	///   port = Port to listen on; 0 requests a random port (IPv4 only).
	///   addr = Address to listen on, passed to `getAddressInfo`.
	/// Returns: `port`, replaced by the local port of an IPv4 listener
	///   if there is one.
	final ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Iterating in reverse keeps the indices of the not yet
			// visited entries valid while entries are being removed.
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	deprecated("Use SocketServer.fromStdin")
	static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; }

	/// Delegate to be called when a connection is accepted.
	/// Passing `null` clears the handler, so that connections are
	/// no longer accepted.
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		// A null handler must stay null: wrapping it in an adapter delegate
		// would make the server believe a handler is installed, and the
		// adapter would then call through a null delegate on the next accept.
		if (value is null)
			super.handleAccept(null);
		else
			super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c));
	}
}
1702 
1703 // ***************************************************************************
1704 
/// Base class for connection-less socket protocols, i.e. those for
/// which we must use sendto instead of connect/send.
/// These generally correspond to stateless / datagram-based
/// protocols, like UDP.
/// This module's class hierarchy is mostly oriented towards
/// stateful, stream-based protocols; to represent connectionless
/// protocols, this class encapsulates a socket with a fixed
/// destination (sendto) address, and optionally bound to a local
/// address.
/// Currently received packets' address is not exposed.
class ConnectionlessSocketConnection : Connection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	/// Runs the write logic, then refreshes the notification flags.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	//
	// Sends each queued Data as one datagram to `remoteAddress`.
	// Returns early if the socket would block; a send error or a
	// truncated datagram is reported through onError.
	// Once all queues are empty, the buffer-flushed handler is called and a
	// delayed disconnect (state `disconnecting`) is completed.
	final private void onWritableImpl()
	{
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				ptrdiff_t sent;
				queue.front.enter((scope contents) {
					sent = conn.sendTo(contents, remoteAddress);
				});

				if (sent == Socket.ERROR)
				{
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < queue.front.length)
				{
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, queue.front.length));
				}
				else
				{
					assert(sent == queue.front.length);
					//debug writefln("[%s] Sent data:", remoteAddressStr);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					queue.front.clear();
					queue.popFront();
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(scope const(void)[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	override sizediff_t doReceive(scope void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New ConnectionlessSocketConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given `AddressFamily`, without binding to an address.
	/// Calls the connect handler once the socket is set up.
	final void initialize(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Creates and registers the non-blocking socket and marks the
	/// connection as connected, without calling the connect handler.
	private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// The connect handler is called once, after the socket has been bound.
	/// Returns: The local port the socket was bound to.
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl rather than initialize: the latter already calls
		// the connect handler, which would make it fire twice - the first
		// time before the socket is even bound.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

// public:
	/// Where to send packets to.
	Address remoteAddress;
}
1832 
/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : ConnectionlessSocketConnection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given `AddressFamily`, without binding to an address.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM)
	{
		super.initialize(family, type, ProtocolType.UDP);
	}

	/// Bind to a local address in order to receive packets sent there.
	/// If `host` resolves to several addresses, one is picked at random.
	/// Returns: The local port the socket was bound to, or 0 if the
	///   lookup failed (in which case `onError` has been called).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			// Thrown as a SocketException (rather than via plain `enforce`,
			// which throws `Exception`) so that it is handled by the catch
			// block below instead of escaping with the state left at `resolving`.
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// The base class bind expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		return super.bind(addressInfo);
	}
}
1900 
1901 ///
1902 debug(ae_unittest) unittest
1903 {
1904 	auto server = new UdpConnection();
1905 	server.bind("127.0.0.1", 0);
1906 
1907 	auto client = new UdpConnection();
1908 	client.initialize(server.localAddress.addressFamily);
1909 
1910 	string[] packets = ["", "Hello", "there"];
1911 	client.remoteAddress = server.localAddress;
1912 	client.send({
1913 		DataVec data;
1914 		foreach (packet; packets)
1915 			data ~= Data(packet.asBytes);
1916 		return data;
1917 	}()[]);
1918 
1919 	server.handleReadData = (Data data)
1920 	{
1921 		assert(data.unsafeContents == packets[0]);
1922 		packets = packets[1..$];
1923 		if (!packets.length)
1924 		{
1925 			server.close();
1926 			client.close();
1927 		}
1928 	};
1929 	socketManager.loop();
1930 	assert(!packets.length);
1931 }
1932 
1933 // ***************************************************************************
1934 
/// Base class for a connection adapter.
/// By itself, it only forwards everything to and from `next`.
class ConnectionAdapter : IConnection
{
	/// The next connection in the chain (towards the raw transport).
	IConnection next;

	this(IConnection next)
	{
		this.next = next;

		// Intercept the events of the wrapped connection.
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	} ///

	@property ConnectionState state()
	{
		return next.state;
	} ///

	/// Queue Data for sending.
	void send(scope Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Forwards the connect event, if a handler is set.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	/// Forwards received data to the read handler.
	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		assert(readDataHandler, "onReadData caled with null readDataHandler");
		readDataHandler(data);
	}

	/// Forwards the disconnect event, if a handler is set.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	/// Forwards the buffer-flushed event, if a handler is set.
	protected void onBufferFlushed()
	{
		if (!bufferFlushedHandler)
			return;
		bufferFlushedHandler();
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	protected ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	/// The wrapped connection is only asked for data while a handler is set.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}
	protected ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	protected DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
	protected BufferFlushedHandler bufferFlushedHandler;
}
2011 
2012 // ***************************************************************************
2013 
/// Keeps receiving data from the next connection at all times,
/// holding it in a buffer while this object has no read handler
/// (handleReadData unset) and releasing it once a handler is set.
class BufferingAdapter : ConnectionAdapter
{
	// Data received while no read handler was set.
	// Filled with queuePush and drained with queuePop.
	Data[] buffer;

	this(IConnection next)
	{
		super(next);
		// Subscribe to the next connection's read events unconditionally,
		// regardless of whether our own read handler is set.
		next.handleReadData = &onReadData;
	}

	// Delivers incoming data directly when a handler is present;
	// otherwise stores it for later delivery.
	override void onReadData(Data data)
	{
		if (!readDataHandler)
		{
			buffer.queuePush(data);
			return;
		}

		// With a handler set, everything buffered earlier
		// is expected to have been delivered already.
		assert(buffer.length == 0);
		readDataHandler(data);
	}

	// Stores the new handler, then hands it the buffered data.
	// The handler is re-checked before every delivery, so draining
	// stops if the handler becomes unset along the way.
	override @property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		for (;;)
		{
			if (!readDataHandler || !buffer.length)
				break;
			readDataHandler(buffer.queuePop);
		}
	}
}
2044 
2045 // ***************************************************************************
2046 
/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
/// Each complete line is passed to the read-data handler
/// with its delimiter removed.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	/// Must not be empty.
	string delimiter = "\r\n";

	/// Maximum line length (0 means unlimited).
	/// When exceeded, the connection is disconnected with an error.
	size_t maxLength = 0;

	this(IConnection next)
	{
		super(next);
	} ///

	/// Append a line to the send buffer.
	/// The delimiter is appended to `line` automatically.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter ca = this;
		ca.send(Data(line.asBytes ~ delimiter.asBytes));
	}

protected:
	/// The receive buffer.
	/// Holds received data which has not yet been split off as a line.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends `data` to the receive buffer, then dispatches
	/// every complete line now present in it.
	final override void onReadData(Data data)
	{
		import std.string : indexOf;
		auto startIndex = inBuffer.length;
		if (inBuffer.length)
			inBuffer ~= data;
		else
			inBuffer = data;

		// The old buffer contents held no complete delimiter, so the search
		// only needs to begin far enough back to catch a delimiter which
		// straddles the boundary between the old and the new data.
		assert(delimiter.length >= 1);
		if (startIndex >= delimiter.length)
			startIndex -= delimiter.length - 1;
		else
			startIndex = 0;

		auto index = inBuffer[startIndex .. $].indexOf(delimiter.asBytes);
		while (index >= 0)
		{
			// processLine has already called disconnect when it fails.
			// Return right away, so that the length check at the end
			// cannot issue a second disconnect for the same data.
			if (!processLine(startIndex + index))
				return;

			// The buffer now begins with unexamined data.
			startIndex = 0;
			index = inBuffer.indexOf(delimiter.asBytes);
		}

		// No delimiter left in the buffer: reject an unterminated
		// line which has already grown past the limit.
		if (maxLength && inBuffer.length > maxLength)
			disconnect("Line too long", DisconnectType.error);
	}

	// `index` is the index of the delimiter's first character.
	// Removes one line and its delimiter from the front of the receive
	// buffer and passes the line (without delimiter) to the read handler.
	// Returns false, after disconnecting, if the line exceeds maxLength.
	final bool processLine(size_t index)
	{
		if (maxLength && index > maxLength)
		{
			disconnect("Line too long", DisconnectType.error);
			return false;
		}
		auto line = inBuffer[0..index];
		// Update the buffer before invoking the handler.
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
		return true;
	}

	// Notifies the disconnect handler, then clears the receive buffer.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
2125 
2126 // ***************************************************************************
2127 
/// Fires an event handler or disconnects connections
/// after a period of inactivity.
/// No timer is scheduled until `setIdleTimeout` is called.
class TimeoutAdapter : ConnectionAdapter
{
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	} ///

	/// Set the `Duration` indicating the period of inactivity after which to take action.
	/// `timeout` must be positive. Any idle timer which is already
	/// scheduled is cancelled and replaced with one using the new period.
	final void setIdleTimeout(Duration timeout)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(timeout > Duration.zero);
		this.timeout = timeout;

		// Configure idleTask
		if (idleTask is null)
		{
			// First call: create the task
			idleTask = new TimerTask();
			idleTask.handleTask = &onTask_Idle;
		}
		else if (idleTask.isWaiting())
			idleTask.cancel(); // remove it before adding it again below

		mainTimer.add(idleTask, now + timeout);
	}

	/// Manually mark this connection as non-idle, restarting the idle timer
	/// (if it is currently scheduled).
	/// `handleNonIdle` will be called, if set.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();
		if (idleTask && idleTask.isWaiting())
			idleTask.restart(now + timeout);
	}

	/// Stop the idle timer.
	/// An idle timeout must have been set, and the timer must be scheduled.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Restart the idle timer.
	/// An idle timeout must have been set, and the timer must not be scheduled.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask, now + timeout);
	}

	/// Returns the point in time when the idle handler is scheduled
	/// to be called, or null if it is not scheduled
	/// (including when no idle timeout has been set yet).
	/*Nullable!MonoTime*/auto when()()
	{
		import std.typecons : Nullable;
		// idleTask remains null until setIdleTimeout is first called,
		// so it must be checked before it is dereferenced.
		return (idleTask !is null && idleTask.isWaiting())
			? Nullable!MonoTime(idleTask.when)
			: Nullable!MonoTime.init;
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	/// If set, the idle timer is rescheduled before the callback is called.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	// Establishing a connection counts as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	// Receiving data counts as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	// Cancels the idle timer, if scheduled, before notifying the disconnect handler.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		if (idleTask && idleTask.isWaiting())
			idleTask.cancel();
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set
	Duration timeout;   // the period most recently passed to setIdleTimeout

	// Timer callback, called when the idle period has elapsed.
	final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);
		// A disconnect is already in progress but has not completed
		// within the idle period: disconnect again, as an error.
		if (state == ConnectionState.disconnecting)
			return disconnect("Delayed disconnect - time-out", DisconnectType.error);

		// Already disconnected - nothing left to do.
		if (state == ConnectionState.disconnected)
			return;

		if (handleIdleTimeout)
		{
			resumeIdleTimeout(); // reschedule (by default)
			handleIdleTimeout();
		}
		else
			disconnect("Time-out", DisconnectType.error);
	}
}
2249 
2250 // ***************************************************************************
2251 
debug(ae_unittest) unittest
{
	// A callback scheduled with setTimeout must have been called
	// by the time the socket manager's event loop returns.
	bool timerFired;
	setTimeout({ timerFired = true; }, 10.msecs);
	socketManager.loop();
	assert(timerFired);
}