1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import std.exception;
24 import std.socket;
25 import std.string : format;
26 public import std.socket : Address, Socket;
27 
28 debug(ASOCKETS) import std.stdio;
29 debug(PRINTDATA) import ae.utils.text : hexDump;
30 private import std.conv : to;
31 
32 
33 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
34 static import ae.utils.array;
35 
36 version(LIBEV)
37 {
38 	import deimos.ev;
39 	pragma(lib, "ev");
40 }
41 
42 version(Windows)
43 {
44 	import core.sys.windows.windows : Sleep;
45 	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
46 }
47 else
48 	enum USE_SLEEP = false;
49 
50 /// Flags that determine socket wake-up events.
51 int eventCounter;
52 
53 version(LIBEV)
54 {
55 	// Watchers are a GenericSocket field (as declared in SocketMixin).
56 	// Use one watcher per read and write event.
57 	// Start/stop those as higher-level code declares interest in those events.
58 	// Use the "data" ev_io field to store the parent GenericSocket address.
59 	// Also use the "data" field as a flag to indicate whether the watcher is active
60 	// (data is null when the watcher is stopped).
61 
62 	struct SocketManager
63 	{
64 	private:
65 		size_t count;
66 
67 		extern(C)
68 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
69 		{
70 			eventCounter++;
71 			auto socket = cast(GenericSocket)w.data;
72 			assert(socket, "libev event fired on stopped watcher");
73 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
74 
75 			if (revents & EV_READ)
76 				socket.onReadable();
77 			else
78 			if (revents & EV_WRITE)
79 				socket.onWritable();
80 			else
81 				assert(false, "Unknown event fired from libev");
82 
83 			// TODO? Need to get proper SocketManager instance to call updateTimer on
84 			socketManager.updateTimer(false);
85 		}
86 
87 		ev_timer evTimer;
88 		MonoTime lastNextEvent = MonoTime.max;
89 
90 		extern(C)
91 		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
92 		{
93 			eventCounter++;
94 			debug (ASOCKETS) writefln("Timer callback called.");
95 			mainTimer.prod();
96 
97 			socketManager.updateTimer(true);
98 			debug (ASOCKETS) writefln("Timer callback exiting.");
99 		}
100 
101 		void updateTimer(bool force)
102 		{
103 			auto nextEvent = mainTimer.getNextEvent();
104 			if (force || lastNextEvent != nextEvent)
105 			{
106 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
107 				if (nextEvent == MonoTime.max) // Stopping
108 				{
109 					if (lastNextEvent != MonoTime.max)
110 						ev_timer_stop(ev_default_loop(0), &evTimer);
111 				}
112 				else
113 				{
114 					auto remaining = mainTimer.getRemainingTime();
115 					while (remaining <= Duration.zero)
116 					{
117 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
118 						mainTimer.prod();
119 						remaining = mainTimer.getRemainingTime();
120 					}
121 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
122 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
123 					if (lastNextEvent == MonoTime.max) // Starting
124 					{
125 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
126 						ev_timer_start(ev_default_loop(0), &evTimer);
127 					}
128 					else // Adjusting
129 					{
130 						evTimer.repeat = tstamp;
131 						ev_timer_again(ev_default_loop(0), &evTimer);
132 					}
133 				}
134 				lastNextEvent = nextEvent;
135 			}
136 		}
137 
138 		/// Register a socket with the manager.
139 		void register(GenericSocket socket)
140 		{
141 			debug (ASOCKETS) writefln("Registering %s", socket);
142 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
143 			auto fd = socket.conn.handle;
144 			assert(fd, "Must have fd before socket registration");
145 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
146 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
147 			count++;
148 		}
149 
150 		/// Unregister a socket with the manager.
151 		void unregister(GenericSocket socket)
152 		{
153 			debug (ASOCKETS) writefln("Unregistering %s", socket);
154 			socket.notifyRead  = false;
155 			socket.notifyWrite = false;
156 			count--;
157 		}
158 
159 	public:
160 		size_t size()
161 		{
162 			return count;
163 		}
164 
165 		/// Loop continuously until no sockets are left.
166 		void loop()
167 		{
168 			auto evLoop = ev_default_loop(0);
169 			enforce(evLoop, "libev initialization failure");
170 
171 			updateTimer(true);
172 			debug (ASOCKETS) writeln("ev_run");
173 			ev_run(ev_default_loop(0), 0);
174 		}
175 	}
176 
177 	private mixin template SocketMixin()
178 	{
179 		private ev_io evRead, evWrite;
180 
181 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
182 		{
183 			if (!conn)
184 			{
185 				// Can happen when setting delegates before connecting.
186 				return;
187 			}
188 
189 			if (newValue && !ev.data)
190 			{
191 				// Start
192 				ev.data = cast(void*)this;
193 				ev_io_start(ev_default_loop(0), &ev);
194 			}
195 			else
196 			if (!newValue && ev.data)
197 			{
198 				// Stop
199 				assert(ev.data is cast(void*)this);
200 				ev.data = null;
201 				ev_io_stop(ev_default_loop(0), &ev);
202 			}
203 		}
204 
205 		/// Interested in read notifications (onReadable)?
206 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
207 		/// Interested in write notifications (onWritable)?
208 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
209 
210 		debug ~this()
211 		{
212 			// The LIBEV SocketManager holds no references to registered sockets.
213 			// TODO: Add a doubly-linked list?
214 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
215 		}
216 	}
217 }
218 else // Use select
219 {
220 	struct SocketManager
221 	{
222 	private:
223 		enum FD_SETSIZE = 1024;
224 
225 		/// List of all sockets to poll.
226 		GenericSocket[] sockets;
227 
228 		/// Debug AA to check for dangling socket references.
229 		debug GenericSocket[socket_t] socketHandles;
230 
231 		/// Register a socket with the manager.
232 		void register(GenericSocket conn)
233 		{
234 			debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1);
235 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
236 			sockets ~= conn;
237 
238 			debug
239 			{
240 				auto handle = conn.socket.handle;
241 				assert(handle != socket_t.init, "Can't register a closed socket");
242 				assert(handle !in socketHandles, "This socket handle is already registered");
243 				socketHandles[handle] = conn;
244 			}
245 		}
246 
247 		/// Unregister a socket with the manager.
248 		void unregister(GenericSocket conn)
249 		{
250 			debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
251 
252 			debug
253 			{
254 				auto handle = conn.socket.handle;
255 				assert(handle != socket_t.init, "Can't unregister a closed socket");
256 				auto pconn = handle in socketHandles;
257 				assert(pconn, "This socket handle is not registered");
258 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
259 				socketHandles.remove(handle);
260 			}
261 
262 			foreach (size_t i, GenericSocket j; sockets)
263 				if (j is conn)
264 				{
265 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
266 					return;
267 				}
268 			assert(false, "Socket not registered");
269 		}
270 
271 		void delegate()[] idleHandlers;
272 
273 	public:
274 		size_t size()
275 		{
276 			return sockets.length;
277 		}
278 
279 		/// Loop continuously until no sockets are left.
280 		void loop()
281 		{
282 			debug (ASOCKETS) writeln("Starting event loop.");
283 
284 			SocketSet readset, writeset, errorset;
285 			size_t sockcount;
286 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
287 			readset  = new SocketSet(setSize);
288 			writeset = new SocketSet(setSize);
289 			errorset = new SocketSet(setSize);
290 			while (true)
291 			{
292 				uint minSize = 0;
293 				version(Windows)
294 					minSize = cast(uint)sockets.length;
295 				else
296 				{
297 					foreach (s; sockets)
298 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
299 							minSize = s.socket.handle;
300 				}
301 				minSize++;
302 
303 				if (setSize < minSize)
304 				{
305 					debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
306 					setSize = minSize * 2;
307 					readset  = new SocketSet(setSize);
308 					writeset = new SocketSet(setSize);
309 					errorset = new SocketSet(setSize);
310 				}
311 				else
312 				{
313 					readset.reset();
314 					writeset.reset();
315 					errorset.reset();
316 				}
317 
318 				sockcount = 0;
319 				bool haveActive;
320 				debug (ASOCKETS) writeln("Populating sets");
321 				foreach (GenericSocket conn; sockets)
322 				{
323 					if (!conn.socket)
324 						continue;
325 					sockcount++;
326 					if (!conn.daemon)
327 						haveActive = true;
328 
329 					debug (ASOCKETS) writef("\t%s:", conn);
330 					if (conn.notifyRead)
331 					{
332 						readset.add(conn.socket);
333 						debug (ASOCKETS) write(" READ");
334 					}
335 					if (conn.notifyWrite)
336 					{
337 						writeset.add(conn.socket);
338 						debug (ASOCKETS) write(" WRITE");
339 					}
340 					errorset.add(conn.socket);
341 					debug (ASOCKETS) writeln();
342 				}
343 				debug (ASOCKETS)
344 				{
345 					writefln("Sets populated as follows:");
346 					printSets(readset, writeset, errorset);
347 				}
348 
349 				debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
350 					sockcount,
351 					mainTimer.isWaiting() ? "with" : "no",
352 					idleHandlers.length,
353 				);
354 				if (!haveActive && !mainTimer.isWaiting())
355 				{
356 					debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
357 					break;
358 				}
359 
360 				debug (ASOCKETS) { stdout.flush(); stderr.flush(); }
361 
362 				int events;
363 				if (idleHandlers.length)
364 				{
365 					if (sockcount==0)
366 						events = 0;
367 					else
368 						events = Socket.select(readset, writeset, errorset, 0.seconds);
369 				}
370 				else
371 				if (USE_SLEEP && sockcount==0)
372 				{
373 					version(Windows)
374 					{
375 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
376 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
377 						if (duration <= 0)
378 							duration = 1; // Avoid busywait
379 						else
380 						if (duration > int.max)
381 							duration = int.max;
382 						Sleep(cast(int)duration);
383 						events = 0;
384 					}
385 					else
386 						assert(0);
387 				}
388 				else
389 				if (mainTimer.isWaiting())
390 					events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
391 				else
392 					events = Socket.select(readset, writeset, errorset);
393 
394 				debug (ASOCKETS) writefln("%d events fired.", events);
395 
396 				if (events > 0)
397 				{
398 					// Handle just one event at a time, as the first
399 					// handler might invalidate select()'s results.
400 					handleEvent(readset, writeset, errorset);
401 				}
402 				else
403 				if (idleHandlers.length)
404 				{
405 					import ae.utils.array;
406 					auto handler = idleHandlers.shift();
407 
408 					// Rotate the idle handler queue before running it,
409 					// in case the handler unregisters itself.
410 					idleHandlers ~= handler;
411 
412 					handler();
413 				}
414 
415 				// Timers may invalidate our select results, so fire them after processing the latter
416 				mainTimer.prod();
417 
418 				eventCounter++;
419 			}
420 		}
421 
422 		debug (ASOCKETS)
423 		void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
424 		{
425 			foreach (GenericSocket conn; sockets)
426 			{
427 				if (!conn.socket)
428 					writefln("\t\t%s is unset", conn);
429 				else
430 				if (readset.isSet(conn.socket))
431 					writefln("\t\t%s is readable", conn);
432 				else
433 				if (writeset.isSet(conn.socket))
434 					writefln("\t\t%s is writable", conn);
435 				else
436 				if (errorset.isSet(conn.socket))
437 					writefln("\t\t%s is errored", conn);
438 			}
439 		}
440 
441 		void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
442 		{
443 			debug (ASOCKETS)
444 			{
445 				writefln("\tSelect results:");
446 				printSets(readset, writeset, errorset);
447 			}
448 
449 			foreach (GenericSocket conn; sockets)
450 			{
451 				if (!conn.socket)
452 					continue;
453 
454 				if (readset.isSet(conn.socket))
455 				{
456 					debug (ASOCKETS) writefln("\t%s - calling onReadable", conn);
457 					return conn.onReadable();
458 				}
459 				else
460 				if (writeset.isSet(conn.socket))
461 				{
462 					debug (ASOCKETS) writefln("\t%s - calling onWritable", conn);
463 					return conn.onWritable();
464 				}
465 				else
466 				if (errorset.isSet(conn.socket))
467 				{
468 					debug (ASOCKETS) writefln("\t%s - calling onError", conn);
469 					return conn.onError("select() error: " ~ conn.socket.getErrorText());
470 				}
471 			}
472 
473 			assert(false, "select() reported events available, but no registered sockets are set");
474 		}
475 	}
476 
477 	// Use UFCS to allow removeIdleHandler to have a predicate with context
478 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
479 	{
480 		foreach (i, idleHandler; socketManager.idleHandlers)
481 			assert(handler !is idleHandler);
482 
483 		socketManager.idleHandlers ~= handler;
484 	}
485 
486 	static bool isFun(T)(T a, T b) { return a is b; }
487 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
488 	{
489 		foreach (i, idleHandler; socketManager.idleHandlers)
490 			if (pred(idleHandler, args))
491 			{
492 				import std.algorithm;
493 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
494 				return;
495 			}
496 		assert(false, "No such idle handler");
497 	}
498 
499 	private mixin template SocketMixin()
500 	{
501 		/// Interested in read notifications (onReadable)?
502 		bool notifyRead;
503 		/// Interested in write notifications (onWritable)?
504 		bool notifyWrite;
505 	}
506 }
507 
508 /// The default socket manager.
509 SocketManager socketManager;
510 
511 // ***************************************************************************
512 
513 /// General methods for an asynchronous socket.
514 private abstract class GenericSocket
515 {
516 	/// Declares notifyRead and notifyWrite.
517 	mixin SocketMixin;
518 
519 protected:
520 	/// The socket this class wraps.
521 	Socket conn;
522 
523 protected:
524 	/// Retrieve the socket class this class wraps.
525 	@property final Socket socket()
526 	{
527 		return conn;
528 	}
529 
530 	void onReadable()
531 	{
532 	}
533 
534 	void onWritable()
535 	{
536 	}
537 
538 	void onError(string reason)
539 	{
540 	}
541 
542 public:
543 	/// allow getting the address of connections that are already disconnected
544 	private Address cachedLocalAddress, cachedRemoteAddress;
545 
546 	/// Don't block the process from exiting.
547 	/// TODO: Not implemented with libev
548 	bool daemon;
549 
550 	final @property Address localAddress()
551 	{
552 		if (cachedLocalAddress !is null)
553 			return cachedLocalAddress;
554 		else
555 		if (conn is null)
556 			return null;
557 		else
558 			return cachedLocalAddress = conn.localAddress();
559 	}
560 
561 	final @property string localAddressStr() nothrow
562 	{
563 		try
564 		{
565 			auto a = localAddress;
566 			return a is null ? "[null address]" : a.toString();
567 		}
568 		catch (Exception e)
569 			return "[error: " ~ e.msg ~ "]";
570 	}
571 
572 	final @property Address remoteAddress()
573 	{
574 		if (cachedRemoteAddress !is null)
575 			return cachedRemoteAddress;
576 		else
577 		if (conn is null)
578 			return null;
579 		else
580 			return cachedRemoteAddress = conn.remoteAddress();
581 	}
582 
583 	final @property string remoteAddressStr() nothrow
584 	{
585 		try
586 		{
587 			auto a = remoteAddress;
588 			return a is null ? "[null address]" : a.toString();
589 		}
590 		catch (Exception e)
591 			return "[error: " ~ e.msg ~ "]";
592 	}
593 
594 	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
595 	{
596 		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
597 		if (enabled)
598 		{
599 			try
600 				conn.setKeepAlive(time, interval);
601 			catch (SocketFeatureException)
602 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
603 		}
604 		else
605 			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
606 	}
607 
608 	override string toString() const
609 	{
610 		import std.string;
611 		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
612 	}
613 }
614 
615 // ***************************************************************************
616 
617 enum DisconnectType
618 {
619 	requested, // initiated by the application
620 	graceful,  // peer gracefully closed the connection
621 	error      // abnormal network condition
622 }
623 
624 enum ConnectionState
625 {
626 	/// The initial state, or the state after a disconnect was fully processed.
627 	disconnected,
628 
629 	/// Name resolution. Currently done synchronously.
630 	resolving,
631 
632 	/// A connection attempt is in progress.
633 	connecting,
634 
635 	/// A connection is established.
636 	connected,
637 
638 	/// Disconnecting in progress. No data can be sent or received at this point.
639 	/// We are waiting for queued data to be actually sent before disconnecting.
640 	disconnecting,
641 }
642 
643 /// Common interface for connections and adapters.
644 interface IConnection
645 {
646 	enum MAX_PRIORITY = 4;
647 	enum DEFAULT_PRIORITY = 2;
648 
649 	static const defaultDisconnectReason = "Software closed the connection";
650 
651 	/// Get connection state.
652 	@property ConnectionState state();
653 
654 	/// Has a connection been established?
655 	deprecated final @property bool connected() { return state == ConnectionState.connected; }
656 
657 	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
658 	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }
659 
660 	/// Queue Data for sending.
661 	void send(Data[] data, int priority = DEFAULT_PRIORITY);
662 
663 	/// ditto
664 	final void send(Data datum, int priority = DEFAULT_PRIORITY)
665 	{
666 		Data[1] data;
667 		data[0] = datum;
668 		this.send(data);
669 		data[] = Data.init;
670 	}
671 
672 	/// Terminate the connection.
673 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);
674 
675 	/// Callback setter for when a connection has been established
676 	/// (if applicable).
677 	alias void delegate() ConnectHandler;
678 	@property void handleConnect(ConnectHandler value); /// ditto
679 
680 	/// Callback setter for when new data is read.
681 	alias void delegate(Data data) ReadDataHandler;
682 	@property void handleReadData(ReadDataHandler value); /// ditto
683 
684 	/// Callback setter for when a connection was closed.
685 	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
686 	@property void handleDisconnect(DisconnectHandler value); /// ditto
687 }
688 
689 // ***************************************************************************
690 
691 class StreamConnection : GenericSocket, IConnection
692 {
693 private:
694 	/// Blocks of data larger than this value are passed as unmanaged memory
695 	/// (in Data objects). Blocks smaller than this value will be reallocated
696 	/// on the managed heap. The disadvantage of placing large objects on the
697 	/// managed heap is false pointers; the disadvantage of using Data for
698 	/// small objects is wasted slack space due to the page size alignment
699 	/// requirement.
700 	enum UNMANAGED_THRESHOLD = 256;
701 
702 	/// Queue of addresses to try connecting to.
703 	Address[] addressQueue;
704 
705 	ConnectionState _state;
706 	final @property ConnectionState state(ConnectionState value) { return _state = value; }
707 
708 public:
709 	/// Get connection state.
710 	override @property ConnectionState state() { return _state; }
711 
712 protected:
713 	abstract sizediff_t doSend(in void[] buffer);
714 	abstract sizediff_t doReceive(void[] buffer);
715 
716 	/// The send buffers.
717 	Data[][MAX_PRIORITY+1] outQueue;
718 	/// Whether the first item from each queue has been partially sent (and thus can't be cancelled).
719 	bool[MAX_PRIORITY+1] partiallySent;
720 
721 	/// Constructor used by a ServerSocket for new connections
722 	this(Socket conn)
723 	{
724 		this();
725 		this.conn = conn;
726 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
727 		if (conn)
728 			socketManager.register(this);
729 		updateFlags();
730 	}
731 
732 	final void updateFlags()
733 	{
734 		if (state == ConnectionState.connecting)
735 			notifyWrite = true;
736 		else
737 			notifyWrite = writePending;
738 
739 		notifyRead = state == ConnectionState.connected && readDataHandler;
740 	}
741 
742 	/// Called when a socket is readable.
743 	override void onReadable()
744 	{
745 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
746 		static ubyte[0x10000] inBuffer;
747 		auto received = doReceive(inBuffer);
748 
749 		if (received == 0)
750 			return disconnect("Connection closed", DisconnectType.graceful);
751 
752 		if (received == Socket.ERROR)
753 		{
754 		//	if (wouldHaveBlocked)
755 		//	{
756 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
757 		//		return;
758 		//	}
759 		//	else
760 				onError("recv() error: " ~ lastSocketError);
761 		}
762 		else
763 		{
764 			debug (PRINTDATA)
765 			{
766 				std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
767 				std.stdio.write(hexDump(inBuffer[0 .. received]));
768 				std.stdio.stdout.flush();
769 			}
770 
771 			if (state == ConnectionState.disconnecting)
772 			{
773 				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this);
774 			}
775 			else
776 			if (!readDataHandler)
777 			{
778 				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this);
779 			}
780 			else
781 			{
782 				// Currently, unlike the D1 version of this module,
783 				// we will always reallocate read network data.
784 				// This disfavours code which doesn't need to store
785 				// read data after processing it, but otherwise
786 				// makes things simpler and safer all around.
787 
788 				if (received < UNMANAGED_THRESHOLD)
789 				{
790 					// Copy to the managed heap
791 					readDataHandler(Data(inBuffer[0 .. received].dup));
792 				}
793 				else
794 				{
795 					// Copy to unmanaged memory
796 					readDataHandler(Data(inBuffer[0 .. received], true));
797 				}
798 			}
799 		}
800 	}
801 
802 	/// Called when a socket is writable.
803 	override void onWritable()
804 	{
805 		//scope(success) updateFlags();
806 		onWritableImpl();
807 		updateFlags();
808 	}
809 
810 	// Work around scope(success) breaking debugger stack traces
811 	final private void onWritableImpl()
812 	{
813 		if (state == ConnectionState.connecting)
814 		{
815 			state = ConnectionState.connected;
816 
817 			//debug writefln("[%s] Connected", remoteAddress);
818 			try
819 				setKeepAlive();
820 			catch (Exception e)
821 				return disconnect(e.msg, DisconnectType.error);
822 			if (connectHandler)
823 				connectHandler();
824 			return;
825 		}
826 		//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);
827 
828 		foreach (priority, ref queue; outQueue)
829 			while (queue.length)
830 			{
831 				auto pdata = queue.ptr; // pointer to first data
832 
833 				ptrdiff_t sent = 0;
834 				if (pdata.length)
835 				{
836 					sent = doSend(pdata.contents);
837 					debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
838 				}
839 				else
840 				{
841 					debug (ASOCKETS) writefln("\t\t%s: empty Data object", this);
842 				}
843 
844 				if (sent == Socket.ERROR)
845 				{
846 					if (wouldHaveBlocked())
847 						return;
848 					else
849 						return onError("send() error: " ~ lastSocketError);
850 				}
851 				else
852 				if (sent < pdata.length)
853 				{
854 					if (sent > 0)
855 					{
856 						*pdata = (*pdata)[sent..pdata.length];
857 						partiallySent[priority] = true;
858 					}
859 					return;
860 				}
861 				else
862 				{
863 					assert(sent == pdata.length);
864 					//debug writefln("[%s] Sent data:", remoteAddress);
865 					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
866 					pdata.clear();
867 					queue = queue[1..$];
868 					partiallySent[priority] = false;
869 					if (queue.length == 0)
870 						queue = null;
871 				}
872 			}
873 
874 		// outQueue is now empty
875 		if (handleBufferFlushed)
876 			handleBufferFlushed();
877 		if (state == ConnectionState.disconnecting)
878 		{
879 			debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
880 			close();
881 		}
882 	}
883 
884 	/// Called when an error occurs on the socket.
885 	override void onError(string reason)
886 	{
887 		if (state == ConnectionState.disconnecting)
888 		{
889 			debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
890 			return close();
891 		}
892 
893 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
894 		disconnect("Socket error: " ~ reason, DisconnectType.error);
895 	}
896 
897 	this()
898 	{
899 	}
900 
901 public:
902 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
903 	/// The disconnect handler will be called when all data has been flushed.
904 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
905 	{
906 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
907 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));
908 
909 		if (writePending)
910 		{
911 			if (type==DisconnectType.requested)
912 			{
913 				assert(conn, "Attempting to disconnect on an uninitialized socket");
914 				// queue disconnect after all data is sent
915 				debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
916 				state = ConnectionState.disconnecting;
917 				//setIdleTimeout(30.seconds);
918 				if (disconnectHandler)
919 					disconnectHandler(reason, type);
920 				updateFlags();
921 				return;
922 			}
923 			else
924 				discardQueues();
925 		}
926 
927 		debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
928 
929 		if (state == ConnectionState.connecting || state == ConnectionState.connected)
930 			close();
931 		else
932 			assert(conn is null, "Registered but %s socket".format(state));
933 
934 		if (disconnectHandler)
935 			disconnectHandler(reason, type);
936 		updateFlags();
937 	}
938 
939 	private final void close()
940 	{
941 		assert(conn, "Attempting to close an unregistered socket");
942 		socketManager.unregister(this);
943 		conn.close();
944 		conn = null;
945 		outQueue[] = null;
946 		state = ConnectionState.disconnected;
947 	}
948 
949 	/// Append data to the send buffer.
950 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
951 	{
952 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
953 		outQueue[priority] ~= data;
954 		notifyWrite = true; // Fast updateFlags()
955 
956 		debug (PRINTDATA)
957 		{
958 			std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
959 			foreach (datum; data)
960 				if (datum.length)
961 					std.stdio.write(hexDump(datum.contents));
962 				else
963 					std.stdio.writeln("(empty Data)");
964 			std.stdio.stdout.flush();
965 		}
966 	}
967 
968 	/// ditto
969 	alias send = IConnection.send;
970 
971 	final void clearQueue(int priority)
972 	{
973 		if (partiallySent[priority])
974 		{
975 			assert(outQueue[priority].length > 0);
976 			outQueue[priority] = outQueue[priority][0..1];
977 		}
978 		else
979 			outQueue[priority] = null;
980 		updateFlags();
981 	}
982 
983 	/// Clears all queues, even partially sent content.
984 	private final void discardQueues()
985 	{
986 		foreach (priority; 0..MAX_PRIORITY+1)
987 		{
988 			outQueue[priority] = null;
989 			partiallySent[priority] = false;
990 		}
991 		updateFlags();
992 	}
993 
994 	@property
995 	final bool writePending()
996 	{
997 		foreach (queue; outQueue)
998 			if (queue.length)
999 				return true;
1000 		return false;
1001 	}
1002 
1003 	final bool queuePresent(int priority)
1004 	{
1005 		if (partiallySent[priority])
1006 		{
1007 			assert(outQueue[priority].length > 0);
1008 			return outQueue[priority].length > 1;
1009 		}
1010 		else
1011 			return outQueue[priority].length > 0;
1012 	}
1013 
1014 public:
1015 	private ConnectHandler connectHandler;
1016 	/// Callback for when a connection has been established.
1017 	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }
1018 
1019 	/// Callback for when the send buffer has been flushed.
1020 	void delegate() handleBufferFlushed;
1021 
1022 	private ReadDataHandler readDataHandler;
1023 	/// Callback for incoming data.
1024 	/// Data will not be received unless this handler is set.
1025 	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }
1026 
1027 	private DisconnectHandler disconnectHandler;
1028 	/// Callback for when a connection was closed.
1029 	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
1030 }
1031 
1032 // ***************************************************************************
1033 
1034 /// A POSIX file stream.
1035 /// Allows adding a file (e.g. stdin/stdout) to the socket manager.
1036 /// Does not dup the given file descriptor, so "disconnecting" this connection
1037 /// will close it.
1038 version (Posix)
1039 class FileConnection : StreamConnection
1040 {
1041 	this(int fileno)
1042 	{
1043 		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
1044 		conn.blocking = false;
1045 		super(conn);
1046 	}
1047 
1048 protected:
1049 	import core.sys.posix.unistd : read, write;
1050 
1051 	override sizediff_t doSend(in void[] buffer)
1052 	{
1053 		return write(socket.handle, buffer.ptr, buffer.length);
1054 	}
1055 
1056 	override sizediff_t doReceive(void[] buffer)
1057 	{
1058 		return read(socket.handle, buffer.ptr, buffer.length);
1059 	}
1060 }
1061 
1062 // ***************************************************************************
1063 
1064 /// An asynchronous TCP connection.
1065 class TcpConnection : StreamConnection
1066 {
1067 protected:
1068 	this(Socket conn)
1069 	{
1070 		super(conn);
1071 	}
1072 
1073 	override sizediff_t doSend(in void[] buffer)
1074 	{
1075 		return conn.send(buffer);
1076 	}
1077 
1078 	override sizediff_t doReceive(void[] buffer)
1079 	{
1080 		return conn.receive(buffer);
1081 	}
1082 
1083 	final void tryNextAddress()
1084 	{
1085 		assert(state == ConnectionState.connecting);
1086 		auto address = addressQueue[0];
1087 		addressQueue = addressQueue[1..$];
1088 
1089 		try
1090 		{
1091 			conn = new Socket(address.addressFamily(), SocketType.STREAM, ProtocolType.TCP);
1092 			conn.blocking = false;
1093 
1094 			socketManager.register(this);
1095 			updateFlags();
1096 			debug (ASOCKETS) writefln("Attempting connection to %s", address.toString());
1097 			conn.connect(address);
1098 		}
1099 		catch (SocketException e)
1100 			return onError("Connect error: " ~ e.msg);
1101 	}
1102 
1103 	/// Called when an error occurs on the socket.
1104 	override void onError(string reason)
1105 	{
1106 		if (state == ConnectionState.connecting && addressQueue.length)
1107 		{
1108 			socketManager.unregister(this);
1109 			conn.close();
1110 			conn = null;
1111 
1112 			return tryNextAddress();
1113 		}
1114 
1115 		super.onError(reason);
1116 	}
1117 
1118 public:
1119 	/// Default constructor
1120 	this()
1121 	{
1122 		debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this);
1123 	}
1124 
1125 	/// Start establishing a connection.
1126 	final void connect(string host, ushort port)
1127 	{
1128 		assert(host.length, "Empty host");
1129 		assert(port, "No port specified");
1130 
1131 		debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
1132 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1133 		assert(!conn);
1134 
1135 		state = ConnectionState.resolving;
1136 
1137 		try
1138 		{
1139 			addressQueue = getAddress(host, port);
1140 			enforce(addressQueue.length, "No addresses found");
1141 			debug (ASOCKETS)
1142 			{
1143 				writefln("Resolved to %s addresses:", addressQueue.length);
1144 				foreach (address; addressQueue)
1145 					writefln("- %s", address.toString());
1146 			}
1147 
1148 			state = ConnectionState.connecting;
1149 			if (addressQueue.length > 1)
1150 			{
1151 				import std.random : randomShuffle;
1152 				randomShuffle(addressQueue);
1153 			}
1154 		}
1155 		catch (SocketException e)
1156 			return onError("Lookup error: " ~ e.msg);
1157 
1158 		tryNextAddress();
1159 	}
1160 
1161 }
1162 
1163 // ***************************************************************************
1164 
1165 /// An asynchronous TCP connection server.
1166 final class TcpServer
1167 {
1168 private:
1169 	/// Class that actually performs listening on a certain address family
1170 	final class Listener : GenericSocket
1171 	{
1172 		this(Socket conn)
1173 		{
1174 			debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
1175 			this.conn = conn;
1176 			socketManager.register(this);
1177 		}
1178 
1179 		/// Called when a socket is readable.
1180 		override void onReadable()
1181 		{
1182 			debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this);
1183 			Socket acceptSocket = conn.accept();
1184 			acceptSocket.blocking = false;
1185 			if (handleAccept)
1186 			{
1187 				TcpConnection connection = new TcpConnection(acceptSocket);
1188 				debug (ASOCKETS) writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
1189 				connection.setKeepAlive();
1190 				//assert(connection.connected);
1191 				//connection.connected = true;
1192 				acceptHandler(connection);
1193 			}
1194 			else
1195 				acceptSocket.close();
1196 		}
1197 
1198 		/// Called when a socket is writable.
1199 		override void onWritable()
1200 		{
1201 		}
1202 
1203 		/// Called when an error occurs on the socket.
1204 		override void onError(string reason)
1205 		{
1206 			close(); // call parent
1207 		}
1208 
1209 		void closeListener()
1210 		{
1211 			assert(conn);
1212 			socketManager.unregister(this);
1213 			conn.close();
1214 			conn = null;
1215 		}
1216 	}
1217 
1218 	/// Whether the socket is listening.
1219 	bool listening;
1220 	/// Listener instances
1221 	Listener[] listeners;
1222 
1223 	final void updateFlags()
1224 	{
1225 		foreach (listener; listeners)
1226 			listener.notifyRead = handleAccept !is null;
1227 	}
1228 
1229 public:
1230 	/// Start listening on this socket.
1231 	ushort listen(ushort port, string addr = null)
1232 	{
1233 		debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port);
1234 		//assert(!listening, "Attempting to listen on a listening socket");
1235 
1236 		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
1237 
1238 		foreach (ref addressInfo; addressInfos)
1239 		{
1240 			if (addressInfo.family != AddressFamily.INET && port == 0)
1241 				continue;  // listen on random ports only on IPv4 for now
1242 
1243 			try
1244 			{
1245 				Socket conn = new Socket(addressInfo);
1246 				conn.blocking = false;
1247 				if (addressInfo.family == AddressFamily.INET6)
1248 					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
1249 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
1250 
1251 				conn.bind(addressInfo.address);
1252 				conn.listen(8);
1253 
1254 				if (addressInfo.family == AddressFamily.INET)
1255 					port = to!ushort(conn.localAddress().toPortString());
1256 
1257 				listeners ~= new Listener(conn);
1258 			}
1259 			catch (SocketException e)
1260 			{
1261 				debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
1262 				debug(ASOCKETS) writeln(e.msg);
1263 			}
1264 		}
1265 
1266 		if (listeners.length==0)
1267 			throw new Exception("Unable to bind service");
1268 
1269 		listening = true;
1270 
1271 		updateFlags();
1272 
1273 		return port;
1274 	}
1275 
1276 	@property Address[] localAddresses()
1277 	{
1278 		Address[] result;
1279 		foreach (listener; listeners)
1280 			result ~= listener.localAddress;
1281 		return result;
1282 	}
1283 
1284 	@property bool isListening()
1285 	{
1286 		return listening;
1287 	}
1288 
1289 	/// Stop listening on this socket.
1290 	void close()
1291 	{
1292 		foreach (listener;listeners)
1293 			listener.closeListener();
1294 		listeners = null;
1295 		listening = false;
1296 		if (handleClose)
1297 			handleClose();
1298 	}
1299 
1300 public:
1301 	/// Callback for when the socket was closed.
1302 	void delegate() handleClose;
1303 
1304 	private void delegate(TcpConnection incoming) acceptHandler;
1305 	/// Callback for an incoming connection.
1306 	/// Connections will not be accepted unless this handler is set.
1307 	@property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
1308 	/// ditto
1309 	@property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
1310 }
1311 
1312 // ***************************************************************************
1313 
1314 /// Base class for a connection adapter.
1315 /// By itself, does nothing.
1316 class ConnectionAdapter : IConnection
1317 {
1318 	IConnection next;
1319 
1320 	this(IConnection next)
1321 	{
1322 		this.next = next;
1323 		next.handleConnect = &onConnect;
1324 		next.handleDisconnect = &onDisconnect;
1325 	}
1326 
1327 	@property ConnectionState state() { return next.state; }
1328 
1329 	/// Queue Data for sending.
1330 	void send(Data[] data, int priority)
1331 	{
1332 		next.send(data, priority);
1333 	}
1334 
1335 	alias send = IConnection.send; /// ditto
1336 
1337 	/// Terminate the connection.
1338 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
1339 	{
1340 		next.disconnect(reason, type);
1341 	}
1342 
1343 	protected void onConnect()
1344 	{
1345 		if (connectHandler)
1346 			connectHandler();
1347 	}
1348 
1349 	protected void onReadData(Data data)
1350 	{
1351 		// onReadData should be fired only if readDataHandler is set
1352 		readDataHandler(data);
1353 	}
1354 
1355 	protected void onDisconnect(string reason, DisconnectType type)
1356 	{
1357 		if (disconnectHandler)
1358 			disconnectHandler(reason, type);
1359 	}
1360 
1361 	/// Callback for when a connection has been established.
1362 	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
1363 	private ConnectHandler connectHandler;
1364 
1365 	/// Callback setter for when new data is read.
1366 	@property void handleReadData(ReadDataHandler value)
1367 	{
1368 		readDataHandler = value;
1369 		next.handleReadData = value ? &onReadData : null ;
1370 	}
1371 	private ReadDataHandler readDataHandler;
1372 
1373 	/// Callback setter for when a connection was closed.
1374 	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
1375 	private DisconnectHandler disconnectHandler;
1376 }
1377 
1378 // ***************************************************************************
1379 
1380 /// Adapter for connections with a line-based protocol.
1381 /// Splits data stream into delimiter-separated lines.
1382 class LineBufferedAdapter : ConnectionAdapter
1383 {
1384 	/// The protocol's line delimiter.
1385 	string delimiter = "\r\n";
1386 
1387 	this(IConnection next)
1388 	{
1389 		super(next);
1390 	}
1391 
1392 	/// Append a line to the send buffer.
1393 	void send(string line)
1394 	{
1395 		//super.send(Data(line ~ delimiter));
1396 		// https://issues.dlang.org/show_bug.cgi?id=13985
1397 		ConnectionAdapter ca = this;
1398 		ca.send(Data(line ~ delimiter));
1399 	}
1400 
1401 protected:
1402 	/// The receive buffer.
1403 	Data inBuffer;
1404 
1405 	/// Called when data has been received.
1406 	final override void onReadData(Data data)
1407 	{
1408 		import std.string;
1409 		auto oldBufferLength = inBuffer.length;
1410 		if (oldBufferLength)
1411 			inBuffer ~= data;
1412 		else
1413 			inBuffer = data;
1414 
1415 		if (delimiter.length == 1)
1416 		{
1417 			import core.stdc.string; // memchr
1418 
1419 			char c = delimiter[0];
1420 			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
1421 			while (p)
1422 			{
1423 				sizediff_t index = p - inBuffer.ptr;
1424 				processLine(index);
1425 
1426 				p = memchr(inBuffer.ptr, c, inBuffer.length);
1427 			}
1428 		}
1429 		else
1430 		{
1431 			sizediff_t index;
1432 			// TODO: we can start the search at oldBufferLength-delimiter.length+1
1433 			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
1434 				processLine(index);
1435 		}
1436 	}
1437 
1438 	final void processLine(size_t index)
1439 	{
1440 		auto line = inBuffer[0..index];
1441 		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
1442 		super.onReadData(line);
1443 	}
1444 
1445 	override void onDisconnect(string reason, DisconnectType type)
1446 	{
1447 		super.onDisconnect(reason, type);
1448 		inBuffer.clear();
1449 	}
1450 }
1451 
1452 // ***************************************************************************
1453 
1454 /// Fires an event handler or disconnects connections
1455 /// after a period of inactivity.
1456 class TimeoutAdapter : ConnectionAdapter
1457 {
1458 	this(IConnection next)
1459 	{
1460 		debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this);
1461 		super(next);
1462 	}
1463 
1464 	void cancelIdleTimeout()
1465 	{
1466 		debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
1467 		assert(idleTask !is null);
1468 		assert(idleTask.isWaiting());
1469 		idleTask.cancel();
1470 	}
1471 
1472 	void resumeIdleTimeout()
1473 	{
1474 		debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
1475 		assert(state == ConnectionState.connected);
1476 		assert(idleTask !is null);
1477 		assert(!idleTask.isWaiting());
1478 		mainTimer.add(idleTask);
1479 	}
1480 
1481 	final void setIdleTimeout(Duration duration)
1482 	{
1483 		debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
1484 		assert(duration > Duration.zero);
1485 		if (idleTask is null)
1486 		{
1487 			idleTask = new TimerTask(duration);
1488 			idleTask.handleTask = &onTask_Idle;
1489 		}
1490 		else
1491 		{
1492 			if (idleTask.isWaiting())
1493 				idleTask.cancel();
1494 			idleTask.delay = duration;
1495 		}
1496 		if (state == ConnectionState.connected)
1497 			mainTimer.add(idleTask);
1498 	}
1499 
1500 	void markNonIdle()
1501 	{
1502 		debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
1503 		assert(idleTask !is null);
1504 		if (handleNonIdle)
1505 			handleNonIdle();
1506 		if (idleTask.isWaiting())
1507 			idleTask.restart();
1508 	}
1509 
1510 	/// Callback for when a connection has stopped responding.
1511 	/// If unset, the connection will be disconnected.
1512 	void delegate() handleIdleTimeout;
1513 
1514 	/// Callback for when a connection is marked as non-idle
1515 	/// (when data is received).
1516 	void delegate() handleNonIdle;
1517 
1518 protected:
1519 	override void onConnect()
1520 	{
1521 		debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
1522 		super.onConnect();
1523 		if (idleTask)
1524 			resumeIdleTimeout();
1525 	}
1526 
1527 	override void onReadData(Data data)
1528 	{
1529 		debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
1530 		markNonIdle();
1531 		super.onReadData(data);
1532 	}
1533 
1534 	override void onDisconnect(string reason, DisconnectType type)
1535 	{
1536 		debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
1537 		super.onDisconnect(reason, type);
1538 		if (idleTask && idleTask.isWaiting())
1539 			idleTask.cancel();
1540 	}
1541 
1542 private:
1543 	TimerTask idleTask;
1544 
1545 	final void onTask_Idle(Timer timer, TimerTask task)
1546 	{
1547 		if (state == ConnectionState.disconnecting)
1548 			return disconnect("Delayed disconnect - time-out", DisconnectType.error);
1549 
1550 		if (state != ConnectionState.connected)
1551 			return;
1552 
1553 		if (handleIdleTimeout)
1554 		{
1555 			handleIdleTimeout();
1556 			if (state == ConnectionState.connected)
1557 			{
1558 				assert(!idleTask.isWaiting());
1559 				mainTimer.add(idleTask);
1560 			}
1561 		}
1562 		else
1563 			disconnect("Time-out", DisconnectType.error);
1564 	}
1565 }
1566 
1567 // ***************************************************************************
1568 
1569 unittest
1570 {
1571 	void testTimer()
1572 	{
1573 		bool fired;
1574 		setTimeout({fired = true;}, 10.msecs);
1575 		socketManager.loop();
1576 		assert(fired);
1577 	}
1578 
1579 	testTimer();
1580 }