1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import core.stdc.stdint : int32_t;
24 
25 import std.exception;
26 import std.socket;
27 import std.string : format;
28 public import std.socket : Address, AddressInfo, Socket;
29 
30 debug(ASOCKETS) import std.stdio : stderr;
31 debug(PRINTDATA) static import std.stdio;
32 debug(PRINTDATA) import ae.utils.text : hexDump;
33 private import std.conv : to;
34 
35 
36 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
37 static import ae.utils.array;
38 
39 version(LIBEV)
40 {
41 	import deimos.ev;
42 	pragma(lib, "ev");
43 }
44 
45 version(Windows)
46 {
47 	import core.sys.windows.windows : Sleep;
48 	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
49 }
50 else
51 	enum USE_SLEEP = false;
52 
53 /// Flags that determine socket wake-up events.
54 int eventCounter;
55 
56 version(LIBEV)
57 {
58 	// Watchers are a GenericSocket field (as declared in SocketMixin).
59 	// Use one watcher per read and write event.
60 	// Start/stop those as higher-level code declares interest in those events.
61 	// Use the "data" ev_io field to store the parent GenericSocket address.
62 	// Also use the "data" field as a flag to indicate whether the watcher is active
63 	// (data is null when the watcher is stopped).
64 
65 	struct SocketManager
66 	{
67 	private:
68 		size_t count;
69 
70 		extern(C)
71 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
72 		{
73 			eventCounter++;
74 			auto socket = cast(GenericSocket)w.data;
75 			assert(socket, "libev event fired on stopped watcher");
76 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
77 
78 			if (revents & EV_READ)
79 				socket.onReadable();
80 			else
81 			if (revents & EV_WRITE)
82 				socket.onWritable();
83 			else
84 				assert(false, "Unknown event fired from libev");
85 
86 			// TODO? Need to get proper SocketManager instance to call updateTimer on
87 			socketManager.updateTimer(false);
88 		}
89 
90 		ev_timer evTimer;
91 		MonoTime lastNextEvent = MonoTime.max;
92 
93 		extern(C)
94 		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
95 		{
96 			eventCounter++;
97 			debug (ASOCKETS) writefln("Timer callback called.");
98 			mainTimer.prod();
99 
100 			socketManager.updateTimer(true);
101 			debug (ASOCKETS) writefln("Timer callback exiting.");
102 		}
103 
104 		void updateTimer(bool force)
105 		{
106 			auto nextEvent = mainTimer.getNextEvent();
107 			if (force || lastNextEvent != nextEvent)
108 			{
109 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
110 				if (nextEvent == MonoTime.max) // Stopping
111 				{
112 					if (lastNextEvent != MonoTime.max)
113 						ev_timer_stop(ev_default_loop(0), &evTimer);
114 				}
115 				else
116 				{
117 					auto remaining = mainTimer.getRemainingTime();
118 					while (remaining <= Duration.zero)
119 					{
120 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
121 						mainTimer.prod();
122 						remaining = mainTimer.getRemainingTime();
123 					}
124 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
125 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
126 					if (lastNextEvent == MonoTime.max) // Starting
127 					{
128 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
129 						ev_timer_start(ev_default_loop(0), &evTimer);
130 					}
131 					else // Adjusting
132 					{
133 						evTimer.repeat = tstamp;
134 						ev_timer_again(ev_default_loop(0), &evTimer);
135 					}
136 				}
137 				lastNextEvent = nextEvent;
138 			}
139 		}
140 
141 	public:
142 		/// Register a socket with the manager.
143 		void register(GenericSocket socket)
144 		{
145 			debug (ASOCKETS) writefln("Registering %s", socket);
146 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
147 			auto fd = socket.conn.handle;
148 			assert(fd, "Must have fd before socket registration");
149 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
150 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
151 			count++;
152 		}
153 
154 		/// Unregister a socket with the manager.
155 		void unregister(GenericSocket socket)
156 		{
157 			debug (ASOCKETS) writefln("Unregistering %s", socket);
158 			socket.notifyRead  = false;
159 			socket.notifyWrite = false;
160 			count--;
161 		}
162 
163 		size_t size()
164 		{
165 			return count;
166 		}
167 
168 		/// Loop continuously until no sockets are left.
169 		void loop()
170 		{
171 			auto evLoop = ev_default_loop(0);
172 			enforce(evLoop, "libev initialization failure");
173 
174 			updateTimer(true);
175 			debug (ASOCKETS) writeln("ev_run");
176 			ev_run(ev_default_loop(0), 0);
177 		}
178 	}
179 
180 	private mixin template SocketMixin()
181 	{
182 		private ev_io evRead, evWrite;
183 
184 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
185 		{
186 			if (!conn)
187 			{
188 				// Can happen when setting delegates before connecting.
189 				return;
190 			}
191 
192 			if (newValue && !ev.data)
193 			{
194 				// Start
195 				ev.data = cast(void*)this;
196 				ev_io_start(ev_default_loop(0), &ev);
197 			}
198 			else
199 			if (!newValue && ev.data)
200 			{
201 				// Stop
202 				assert(ev.data is cast(void*)this);
203 				ev.data = null;
204 				ev_io_stop(ev_default_loop(0), &ev);
205 			}
206 		}
207 
208 		/// Interested in read notifications (onReadable)?
209 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
210 		/// Interested in write notifications (onWritable)?
211 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
212 
213 		debug ~this()
214 		{
215 			// The LIBEV SocketManager holds no references to registered sockets.
216 			// TODO: Add a doubly-linked list?
217 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
218 		}
219 	}
220 }
221 else // Use select
222 {
223 	struct SocketManager
224 	{
225 	private:
226 		enum FD_SETSIZE = 1024;
227 
228 		/// List of all sockets to poll.
229 		GenericSocket[] sockets;
230 
231 		/// Debug AA to check for dangling socket references.
232 		debug GenericSocket[socket_t] socketHandles;
233 
234 		void delegate()[] idleHandlers;
235 
236 	public:
237 		/// Register a socket with the manager.
238 		void register(GenericSocket conn)
239 		{
240 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
241 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
242 			sockets ~= conn;
243 
244 			debug
245 			{
246 				auto handle = conn.socket.handle;
247 				assert(handle != socket_t.init, "Can't register a closed socket");
248 				assert(handle !in socketHandles, "This socket handle is already registered");
249 				socketHandles[handle] = conn;
250 			}
251 		}
252 
253 		/// Unregister a socket with the manager.
254 		void unregister(GenericSocket conn)
255 		{
256 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
257 
258 			debug
259 			{
260 				auto handle = conn.socket.handle;
261 				assert(handle != socket_t.init, "Can't unregister a closed socket");
262 				auto pconn = handle in socketHandles;
263 				assert(pconn, "This socket handle is not registered");
264 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
265 				socketHandles.remove(handle);
266 			}
267 
268 			foreach (size_t i, GenericSocket j; sockets)
269 				if (j is conn)
270 				{
271 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
272 					return;
273 				}
274 			assert(false, "Socket not registered");
275 		}
276 
277 		size_t size()
278 		{
279 			return sockets.length;
280 		}
281 
282 		/// Loop continuously until no sockets are left.
283 		void loop()
284 		{
285 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
286 
287 			SocketSet readset, writeset, errorset;
288 			size_t sockcount;
289 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
290 			readset  = new SocketSet(setSize);
291 			writeset = new SocketSet(setSize);
292 			errorset = new SocketSet(setSize);
293 			while (true)
294 			{
295 				uint minSize = 0;
296 				version(Windows)
297 					minSize = cast(uint)sockets.length;
298 				else
299 				{
300 					foreach (s; sockets)
301 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
302 							minSize = s.socket.handle;
303 				}
304 				minSize++;
305 
306 				if (setSize < minSize)
307 				{
308 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
309 					setSize = minSize * 2;
310 					readset  = new SocketSet(setSize);
311 					writeset = new SocketSet(setSize);
312 					errorset = new SocketSet(setSize);
313 				}
314 				else
315 				{
316 					readset.reset();
317 					writeset.reset();
318 					errorset.reset();
319 				}
320 
321 				sockcount = 0;
322 				bool haveActive;
323 				debug (ASOCKETS) stderr.writeln("Populating sets");
324 				foreach (GenericSocket conn; sockets)
325 				{
326 					if (!conn.socket)
327 						continue;
328 					sockcount++;
329 					if (!conn.daemon)
330 						haveActive = true;
331 
332 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
333 					if (conn.notifyRead)
334 					{
335 						readset.add(conn.socket);
336 						debug (ASOCKETS) stderr.write(" READ");
337 					}
338 					if (conn.notifyWrite)
339 					{
340 						writeset.add(conn.socket);
341 						debug (ASOCKETS) stderr.write(" WRITE");
342 					}
343 					errorset.add(conn.socket);
344 					debug (ASOCKETS) stderr.writeln();
345 				}
346 				debug (ASOCKETS)
347 				{
348 					stderr.writefln("Sets populated as follows:");
349 					printSets(readset, writeset, errorset);
350 				}
351 
352 				debug (ASOCKETS) stderr.writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
353 					sockcount,
354 					mainTimer.isWaiting() ? "with" : "no",
355 					idleHandlers.length,
356 				);
357 				if (!haveActive && !mainTimer.isWaiting())
358 				{
359 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
360 					break;
361 				}
362 
363 				debug (ASOCKETS) stderr.flush();
364 
365 				int events;
366 				if (idleHandlers.length)
367 				{
368 					if (sockcount==0)
369 						events = 0;
370 					else
371 						events = Socket.select(readset, writeset, errorset, 0.seconds);
372 				}
373 				else
374 				if (USE_SLEEP && sockcount==0)
375 				{
376 					version(Windows)
377 					{
378 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
379 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
380 						if (duration <= 0)
381 							duration = 1; // Avoid busywait
382 						else
383 						if (duration > int.max)
384 							duration = int.max;
385 						Sleep(cast(int)duration);
386 						events = 0;
387 					}
388 					else
389 						assert(0);
390 				}
391 				else
392 				if (mainTimer.isWaiting())
393 					events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
394 				else
395 					events = Socket.select(readset, writeset, errorset);
396 
397 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
398 
399 				if (events > 0)
400 				{
401 					// Handle just one event at a time, as the first
402 					// handler might invalidate select()'s results.
403 					handleEvent(readset, writeset, errorset);
404 				}
405 				else
406 				if (idleHandlers.length)
407 				{
408 					import ae.utils.array;
409 					auto handler = idleHandlers.shift();
410 
411 					// Rotate the idle handler queue before running it,
412 					// in case the handler unregisters itself.
413 					idleHandlers ~= handler;
414 
415 					handler();
416 				}
417 
418 				// Timers may invalidate our select results, so fire them after processing the latter
419 				mainTimer.prod();
420 
421 				eventCounter++;
422 			}
423 		}
424 
425 		debug (ASOCKETS)
426 		void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
427 		{
428 			foreach (GenericSocket conn; sockets)
429 			{
430 				if (!conn.socket)
431 					stderr.writefln("\t\t%s is unset", conn);
432 				else
433 				{
434 					if (readset.isSet(conn.socket))
435 						stderr.writefln("\t\t%s is readable", conn);
436 					if (writeset.isSet(conn.socket))
437 						stderr.writefln("\t\t%s is writable", conn);
438 					if (errorset.isSet(conn.socket))
439 						stderr.writefln("\t\t%s is errored", conn);
440 				}
441 			}
442 		}
443 
444 		void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
445 		{
446 			debug (ASOCKETS)
447 			{
448 				stderr.writefln("\tSelect results:");
449 				printSets(readset, writeset, errorset);
450 			}
451 
452 			foreach (GenericSocket conn; sockets)
453 			{
454 				if (!conn.socket)
455 					continue;
456 
457 				if (readset.isSet(conn.socket))
458 				{
459 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
460 					return conn.onReadable();
461 				}
462 				else
463 				if (writeset.isSet(conn.socket))
464 				{
465 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
466 					return conn.onWritable();
467 				}
468 				else
469 				if (errorset.isSet(conn.socket))
470 				{
471 					debug (ASOCKETS) stderr.writefln("\t%s - calling onError", conn);
472 					return conn.onError("select() error: " ~ conn.socket.getErrorText());
473 				}
474 			}
475 
476 			assert(false, "select() reported events available, but no registered sockets are set");
477 		}
478 	}
479 
480 	// Use UFCS to allow removeIdleHandler to have a predicate with context
481 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
482 	{
483 		foreach (i, idleHandler; socketManager.idleHandlers)
484 			assert(handler !is idleHandler);
485 
486 		socketManager.idleHandlers ~= handler;
487 	}
488 
489 	static bool isFun(T)(T a, T b) { return a is b; }
490 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
491 	{
492 		foreach (i, idleHandler; socketManager.idleHandlers)
493 			if (pred(idleHandler, args))
494 			{
495 				import std.algorithm;
496 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
497 				return;
498 			}
499 		assert(false, "No such idle handler");
500 	}
501 
502 	private mixin template SocketMixin()
503 	{
504 		/// Interested in read notifications (onReadable)?
505 		bool notifyRead;
506 		/// Interested in write notifications (onWritable)?
507 		bool notifyWrite;
508 	}
509 }
510 
511 /// The default socket manager.
512 SocketManager socketManager;
513 
514 // ***************************************************************************
515 
516 /// General methods for an asynchronous socket.
517 abstract class GenericSocket
518 {
519 	/// Declares notifyRead and notifyWrite.
520 	mixin SocketMixin;
521 
522 protected:
523 	/// The socket this class wraps.
524 	Socket conn;
525 
526 protected:
527 	/// Retrieve the socket class this class wraps.
528 	@property final Socket socket()
529 	{
530 		return conn;
531 	}
532 
533 	void onReadable()
534 	{
535 	}
536 
537 	void onWritable()
538 	{
539 	}
540 
541 	void onError(string reason)
542 	{
543 	}
544 
545 public:
546 	/// allow getting the address of connections that are already disconnected
547 	private Address cachedLocalAddress, cachedRemoteAddress;
548 
549 	/// Don't block the process from exiting.
550 	/// TODO: Not implemented with libev
551 	bool daemon;
552 
553 	final @property Address localAddress()
554 	{
555 		if (cachedLocalAddress !is null)
556 			return cachedLocalAddress;
557 		else
558 		if (conn is null)
559 			return null;
560 		else
561 			return cachedLocalAddress = conn.localAddress();
562 	}
563 
564 	final @property string localAddressStr() nothrow
565 	{
566 		try
567 		{
568 			auto a = localAddress;
569 			return a is null ? "[null address]" : a.toString();
570 		}
571 		catch (Exception e)
572 			return "[error: " ~ e.msg ~ "]";
573 	}
574 
575 	final @property Address remoteAddress()
576 	{
577 		if (cachedRemoteAddress !is null)
578 			return cachedRemoteAddress;
579 		else
580 		if (conn is null)
581 			return null;
582 		else
583 			return cachedRemoteAddress = conn.remoteAddress();
584 	}
585 
586 	final @property string remoteAddressStr() nothrow
587 	{
588 		try
589 		{
590 			auto a = remoteAddress;
591 			return a is null ? "[null address]" : a.toString();
592 		}
593 		catch (Exception e)
594 			return "[error: " ~ e.msg ~ "]";
595 	}
596 
597 	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
598 	{
599 		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
600 		if (enabled)
601 		{
602 			try
603 				conn.setKeepAlive(time, interval);
604 			catch (SocketException)
605 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
606 		}
607 		else
608 			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
609 	}
610 
611 	override string toString() const
612 	{
613 		import std.string;
614 		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
615 	}
616 }
617 
618 // ***************************************************************************
619 
620 enum DisconnectType
621 {
622 	requested, // initiated by the application
623 	graceful,  // peer gracefully closed the connection
624 	error      // abnormal network condition
625 }
626 
627 enum ConnectionState
628 {
629 	/// The initial state, or the state after a disconnect was fully processed.
630 	disconnected,
631 
632 	/// Name resolution. Currently done synchronously.
633 	resolving,
634 
635 	/// A connection attempt is in progress.
636 	connecting,
637 
638 	/// A connection is established.
639 	connected,
640 
641 	/// Disconnecting in progress. No data can be sent or received at this point.
642 	/// We are waiting for queued data to be actually sent before disconnecting.
643 	disconnecting,
644 }
645 
646 /// Returns true if this is a connection state for which disconnecting is valid.
647 /// Generally, applications should be aware of the life cycle of their sockets,
648 /// so checking the state of a connection is unnecessary (and a code smell).
649 /// However, unconditionally disconnecting some connected sockets can be useful
650 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle),
651 /// such as in response to a signal.
652 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; }
653 
654 /// Common interface for connections and adapters.
655 interface IConnection
656 {
657 	enum MAX_PRIORITY = 4;
658 	enum DEFAULT_PRIORITY = 2;
659 
660 	static const defaultDisconnectReason = "Software closed the connection";
661 
662 	/// Get connection state.
663 	@property ConnectionState state();
664 
665 	/// Has a connection been established?
666 	deprecated final @property bool connected() { return state == ConnectionState.connected; }
667 
668 	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
669 	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }
670 
671 	/// Queue Data for sending.
672 	void send(Data[] data, int priority = DEFAULT_PRIORITY);
673 
674 	/// ditto
675 	final void send(Data datum, int priority = DEFAULT_PRIORITY)
676 	{
677 		Data[1] data;
678 		data[0] = datum;
679 		this.send(data, priority);
680 		data[] = Data.init;
681 	}
682 
683 	/// Terminate the connection.
684 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);
685 
686 	/// Callback setter for when a connection has been established
687 	/// (if applicable).
688 	alias void delegate() ConnectHandler;
689 	@property void handleConnect(ConnectHandler value); /// ditto
690 
691 	/// Callback setter for when new data is read.
692 	alias void delegate(Data data) ReadDataHandler;
693 	@property void handleReadData(ReadDataHandler value); /// ditto
694 
695 	/// Callback setter for when a connection was closed.
696 	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
697 	@property void handleDisconnect(DisconnectHandler value); /// ditto
698 
699 	/// Callback setter for when all queued data has been sent.
700 	alias void delegate() BufferFlushedHandler;
701 	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
702 }
703 
704 // ***************************************************************************
705 
706 class Connection : GenericSocket, IConnection
707 {
708 private:
709 	/// Blocks of data larger than this value are passed as unmanaged memory
710 	/// (in Data objects). Blocks smaller than this value will be reallocated
711 	/// on the managed heap. The disadvantage of placing large objects on the
712 	/// managed heap is false pointers; the disadvantage of using Data for
713 	/// small objects is wasted slack space due to the page size alignment
714 	/// requirement.
715 	enum UNMANAGED_THRESHOLD = 256;
716 
717 	ConnectionState _state;
718 	final @property ConnectionState state(ConnectionState value) { return _state = value; }
719 
720 public:
721 	/// Get connection state.
722 	override @property ConnectionState state() { return _state; }
723 
724 protected:
725 	abstract sizediff_t doSend(in void[] buffer);
726 	abstract sizediff_t doReceive(void[] buffer);
727 
728 	/// The send buffers.
729 	Data[][MAX_PRIORITY+1] outQueue;
730 	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
731 	int partiallySent = -1;
732 
733 	/// Constructor used by a ServerSocket for new connections
734 	this(Socket conn)
735 	{
736 		this();
737 		this.conn = conn;
738 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
739 		if (conn)
740 			socketManager.register(this);
741 		updateFlags();
742 	}
743 
744 	final void updateFlags()
745 	{
746 		if (state == ConnectionState.connecting)
747 			notifyWrite = true;
748 		else
749 			notifyWrite = writePending;
750 
751 		notifyRead = state == ConnectionState.connected && readDataHandler;
752 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
753 	}
754 
755 	/// Called when a socket is readable.
756 	override void onReadable()
757 	{
758 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
759 		static ubyte[0x10000] inBuffer = void;
760 		auto received = doReceive(inBuffer);
761 
762 		if (received == 0)
763 			return disconnect("Connection closed", DisconnectType.graceful);
764 
765 		if (received == Socket.ERROR)
766 		{
767 		//	if (wouldHaveBlocked)
768 		//	{
769 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
770 		//		return;
771 		//	}
772 		//	else
773 				onError("recv() error: " ~ lastSocketError);
774 		}
775 		else
776 		{
777 			debug (PRINTDATA)
778 			{
779 				std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
780 				std.stdio.write(hexDump(inBuffer[0 .. received]));
781 				std.stdio.stdout.flush();
782 			}
783 
784 			if (state == ConnectionState.disconnecting)
785 			{
786 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
787 			}
788 			else
789 			if (!readDataHandler)
790 			{
791 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
792 			}
793 			else
794 			{
795 				// Currently, unlike the D1 version of this module,
796 				// we will always reallocate read network data.
797 				// This disfavours code which doesn't need to store
798 				// read data after processing it, but otherwise
799 				// makes things simpler and safer all around.
800 
801 				if (received < UNMANAGED_THRESHOLD)
802 				{
803 					// Copy to the managed heap
804 					readDataHandler(Data(inBuffer[0 .. received].dup));
805 				}
806 				else
807 				{
808 					// Copy to unmanaged memory
809 					readDataHandler(Data(inBuffer[0 .. received], true));
810 				}
811 			}
812 		}
813 	}
814 
815 	/// Called when an error occurs on the socket.
816 	override void onError(string reason)
817 	{
818 		if (state == ConnectionState.disconnecting)
819 		{
820 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
821 			return close();
822 		}
823 
824 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
825 		disconnect("Socket error: " ~ reason, DisconnectType.error);
826 	}
827 
828 	this()
829 	{
830 	}
831 
832 public:
833 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
834 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
835 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
836 	{
837 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
838 		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));
839 
840 		if (writePending)
841 		{
842 			if (type==DisconnectType.requested)
843 			{
844 				assert(conn, "Attempting to disconnect on an uninitialized socket");
845 				// queue disconnect after all data is sent
846 				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
847 				state = ConnectionState.disconnecting;
848 				//setIdleTimeout(30.seconds);
849 				if (disconnectHandler)
850 					disconnectHandler(reason, type);
851 				updateFlags();
852 				return;
853 			}
854 			else
855 				discardQueues();
856 		}
857 
858 		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
859 
860 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
861 			close();
862 		else
863 		{
864 			assert(conn is null, "Registered but %s socket".format(state));
865 			if (state == ConnectionState.resolving)
866 				state = ConnectionState.disconnected;
867 		}
868 
869 		if (disconnectHandler)
870 			disconnectHandler(reason, type);
871 		updateFlags();
872 	}
873 
874 	private final void close()
875 	{
876 		assert(conn, "Attempting to close an unregistered socket");
877 		socketManager.unregister(this);
878 		conn.close();
879 		conn = null;
880 		outQueue[] = null;
881 		state = ConnectionState.disconnected;
882 	}
883 
884 	/// Append data to the send buffer.
885 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
886 	{
887 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
888 		outQueue[priority] ~= data;
889 		notifyWrite = true; // Fast updateFlags()
890 
891 		debug (PRINTDATA)
892 		{
893 			std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
894 			foreach (datum; data)
895 				if (datum.length)
896 					std.stdio.write(hexDump(datum.contents));
897 				else
898 					std.stdio.writeln("(empty Data)");
899 			std.stdio.stdout.flush();
900 		}
901 	}
902 
903 	/// ditto
904 	alias send = IConnection.send;
905 
906 	final void clearQueue(int priority)
907 	{
908 		if (priority == partiallySent)
909 		{
910 			assert(outQueue[priority].length > 0);
911 			outQueue[priority] = outQueue[priority][0..1];
912 		}
913 		else
914 			outQueue[priority] = null;
915 		updateFlags();
916 	}
917 
918 	/// Clears all queues, even partially sent content.
919 	private final void discardQueues()
920 	{
921 		foreach (priority; 0..MAX_PRIORITY+1)
922 			outQueue[priority] = null;
923 		partiallySent = -1;
924 		updateFlags();
925 	}
926 
927 	@property
928 	final bool writePending()
929 	{
930 		foreach (queue; outQueue)
931 			if (queue.length)
932 				return true;
933 		return false;
934 	}
935 
936 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
937 	{
938 		if (priority == partiallySent)
939 		{
940 			assert(outQueue[priority].length > 0);
941 			return outQueue[priority].length > 1;
942 		}
943 		else
944 			return outQueue[priority].length > 0;
945 	}
946 
947 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
948 	{
949 		return outQueue[priority].length;
950 	}
951 
952 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
953 	{
954 		size_t bytes;
955 		foreach (datum; outQueue[priority])
956 			bytes += datum.length;
957 		return bytes;
958 	}
959 
960 public:
961 	private ConnectHandler connectHandler;
962 	/// Callback for when a connection has been established.
963 	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }
964 
965 	private ReadDataHandler readDataHandler;
966 	/// Callback for incoming data.
967 	/// Data will not be received unless this handler is set.
968 	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }
969 
970 	private DisconnectHandler disconnectHandler;
971 	/// Callback for when a connection was closed.
972 	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
973 
974 	/// Callback setter for when all queued data has been sent.
975 	private BufferFlushedHandler bufferFlushedHandler;
976 	@property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); }
977 }
978 
979 class StreamConnection : Connection
980 {
981 protected:
982 	this()
983 	{
984 		super();
985 	}
986 
987 	/// Called when a socket is writable.
988 	override void onWritable()
989 	{
990 		//scope(success) updateFlags();
991 		onWritableImpl();
992 		updateFlags();
993 	}
994 
995 	// Work around scope(success) breaking debugger stack traces
996 	final private void onWritableImpl()
997 	{
998 		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
999 		if (state == ConnectionState.connecting)
1000 		{
1001 			int32_t error;
1002 			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
1003 			if (error)
1004 				return disconnect(formatSocketError(error), DisconnectType.error);
1005 
1006 			state = ConnectionState.connected;
1007 
1008 			//debug writefln("[%s] Connected", remoteAddress);
1009 			try
1010 				setKeepAlive();
1011 			catch (Exception e)
1012 				return disconnect(e.msg, DisconnectType.error);
1013 			if (connectHandler)
1014 				connectHandler();
1015 			return;
1016 		}
1017 		//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);
1018 
1019 		foreach (sendPartial; [true, false])
1020 			foreach (int priority, ref queue; outQueue)
1021 				while (queue.length && (!sendPartial || priority == partiallySent))
1022 				{
1023 					assert(partiallySent == -1 || partiallySent == priority);
1024 
1025 					auto pdata = queue.ptr; // pointer to first data
1026 
1027 					ptrdiff_t sent = 0;
1028 					if (pdata.length)
1029 					{
1030 						sent = doSend(pdata.contents);
1031 						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
1032 					}
1033 					else
1034 					{
1035 						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
1036 					}
1037 
1038 					if (sent == Socket.ERROR)
1039 					{
1040 						if (wouldHaveBlocked())
1041 							return;
1042 						else
1043 							return onError("send() error: " ~ lastSocketError);
1044 					}
1045 					else
1046 					if (sent < pdata.length)
1047 					{
1048 						if (sent > 0)
1049 						{
1050 							*pdata = (*pdata)[sent..pdata.length];
1051 							partiallySent = priority;
1052 						}
1053 						return;
1054 					}
1055 					else
1056 					{
1057 						assert(sent == pdata.length);
1058 						//debug writefln("[%s] Sent data:", remoteAddress);
1059 						//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1060 						pdata.clear();
1061 						queue = queue[1..$];
1062 						partiallySent = -1;
1063 						if (queue.length == 0)
1064 							queue = null;
1065 					}
1066 				}
1067 
1068 		// outQueue is now empty
1069 		if (bufferFlushedHandler)
1070 			bufferFlushedHandler();
1071 		if (state == ConnectionState.disconnecting)
1072 		{
1073 			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1074 			close();
1075 		}
1076 	}
1077 
1078 public:
1079 	this(Socket conn)
1080 	{
1081 		super(conn);
1082 	}
1083 }
1084 
1085 // ***************************************************************************
1086 
1087 /// A POSIX file stream.
1088 /// Allows adding a file (e.g. stdin/stdout) to the socket manager.
1089 /// Does not dup the given file descriptor, so "disconnecting" this connection
1090 /// will close it.
1091 version (Posix)
1092 class FileConnection : StreamConnection
1093 {
1094 	this(int fileno)
1095 	{
1096 		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
1097 		conn.blocking = false;
1098 		super(conn);
1099 	}
1100 
1101 protected:
1102 	import core.sys.posix.unistd : read, write;
1103 
1104 	override sizediff_t doSend(in void[] buffer)
1105 	{
1106 		return write(socket.handle, buffer.ptr, buffer.length);
1107 	}
1108 
1109 	override sizediff_t doReceive(void[] buffer)
1110 	{
1111 		return read(socket.handle, buffer.ptr, buffer.length);
1112 	}
1113 }
1114 
1115 /// Separates reading and writing, e.g. for stdin/stdout.
1116 class Duplex : IConnection
1117 {
1118 	IConnection reader, writer;
1119 
1120 	this(IConnection reader, IConnection writer)
1121 	{
1122 		this.reader = reader;
1123 		this.writer = writer;
1124 		reader.handleConnect = &onConnect;
1125 		writer.handleConnect = &onConnect;
1126 		reader.handleDisconnect = &onDisconnect;
1127 		writer.handleDisconnect = &onDisconnect;
1128 	}
1129 
1130 	@property ConnectionState state()
1131 	{
1132 		if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting)
1133 			return ConnectionState.disconnecting;
1134 		else
1135 			return reader.state < writer.state ? reader.state : writer.state;
1136 	}
1137 
1138 	/// Queue Data for sending.
1139 	void send(Data[] data, int priority)
1140 	{
1141 		writer.send(data, priority);
1142 	}
1143 
1144 	alias send = IConnection.send; /// ditto
1145 
1146 	/// Terminate the connection.
1147 	/// Note: this isn't quite fleshed out - applications may want to
1148 	/// wait and send some more data even after stdin is closed, but
1149 	/// such an interface can't be fitted into an IConnection
1150 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
1151 	{
1152 		if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting)
1153 			reader.disconnect(reason, type);
1154 		if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting)
1155 			writer.disconnect(reason, type);
1156 		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
1157 	}
1158 
1159 	protected void onConnect()
1160 	{
1161 		if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
1162 			connectHandler();
1163 	}
1164 
1165 	protected void onDisconnect(string reason, DisconnectType type)
1166 	{
1167 		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
1168 		if (disconnectHandler)
1169 		{
1170 			disconnectHandler(reason, type);
1171 			disconnectHandler = null; // don't call it twice for the other connection
1172 		}
1173 		// It is our responsibility to disconnect the other connection
1174 		// Use DisconnectType.requested to ensure that any written data is flushed
1175 		disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
1176 	}
1177 
1178 	/// Callback for when a connection has been established.
1179 	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
1180 	private ConnectHandler connectHandler;
1181 
1182 	/// Callback setter for when new data is read.
1183 	@property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; }
1184 
1185 	/// Callback setter for when a connection was closed.
1186 	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
1187 	private DisconnectHandler disconnectHandler;
1188 
1189 	/// Callback setter for when all queued data has been written.
1190 	@property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; }
1191 }
1192 
1193 unittest { if (false) new Duplex(null, null); }
1194 
1195 // ***************************************************************************
1196 
1197 /// An asynchronous TCP connection.
1198 class TcpConnection : StreamConnection
1199 {
1200 protected:
1201 	/// Queue of addresses to try connecting to.
1202 	AddressInfo[] addressQueue;
1203 
1204 	this(Socket conn)
1205 	{
1206 		super(conn);
1207 	}
1208 
1209 	override sizediff_t doSend(in void[] buffer)
1210 	{
1211 		return conn.send(buffer);
1212 	}
1213 
1214 	override sizediff_t doReceive(void[] buffer)
1215 	{
1216 		return conn.receive(buffer);
1217 	}
1218 
1219 	final void tryNextAddress()
1220 	{
1221 		assert(state == ConnectionState.connecting);
1222 		auto addressInfo = addressQueue[0];
1223 		addressQueue = addressQueue[1..$];
1224 
1225 		try
1226 		{
1227 			conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
1228 			conn.blocking = false;
1229 
1230 			socketManager.register(this);
1231 			updateFlags();
1232 			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
1233 			conn.connect(addressInfo.address);
1234 		}
1235 		catch (SocketException e)
1236 			return onError("Connect error: " ~ e.msg);
1237 	}
1238 
1239 	/// Called when an error occurs on the socket.
1240 	override void onError(string reason)
1241 	{
1242 		if (state == ConnectionState.connecting && addressQueue.length)
1243 		{
1244 			socketManager.unregister(this);
1245 			conn.close();
1246 			conn = null;
1247 
1248 			return tryNextAddress();
1249 		}
1250 
1251 		super.onError(reason);
1252 	}
1253 
1254 public:
1255 	/// Default constructor
1256 	this()
1257 	{
1258 		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
1259 	}
1260 
1261 	/// Start establishing a connection.
1262 	final void connect(string host, ushort port)
1263 	{
1264 		assert(host.length, "Empty host");
1265 		assert(port, "No port specified");
1266 
1267 		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
1268 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1269 
1270 		state = ConnectionState.resolving;
1271 
1272 		AddressInfo[] addressInfos;
1273 		try
1274 		{
1275 			auto addresses = getAddress(host, port);
1276 			enforce(addresses.length, "No addresses found");
1277 			debug (ASOCKETS)
1278 			{
1279 				stderr.writefln("Resolved to %s addresses:", addresses.length);
1280 				foreach (address; addresses)
1281 					stderr.writefln("- %s", address.toString());
1282 			}
1283 
1284 			if (addresses.length > 1)
1285 			{
1286 				import std.random : randomShuffle;
1287 				randomShuffle(addresses);
1288 			}
1289 
1290 			foreach (address; addresses)
1291 				addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
1292 		}
1293 		catch (SocketException e)
1294 			return onError("Lookup error: " ~ e.msg);
1295 
1296 		state = ConnectionState.disconnected;
1297 		connect(addressInfos);
1298 	}
1299 
1300 	/// ditto
1301 	final void connect(AddressInfo[] addresses)
1302 	{
1303 		assert(addresses.length, "No addresses specified");
1304 
1305 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1306 		assert(!conn);
1307 
1308 		addressQueue = addresses;
1309 		state = ConnectionState.connecting;
1310 		tryNextAddress();
1311 	}
1312 }
1313 
1314 // ***************************************************************************
1315 
1316 /// An asynchronous TCP connection server.
1317 class TcpServer
1318 {
1319 private:
1320 	/// Class that actually performs listening on a certain address family
1321 	final class Listener : GenericSocket
1322 	{
1323 		this(Socket conn)
1324 		{
1325 			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
1326 			this.conn = conn;
1327 			socketManager.register(this);
1328 		}
1329 
1330 		/// Called when a socket is readable.
1331 		override void onReadable()
1332 		{
1333 			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
1334 			Socket acceptSocket = conn.accept();
1335 			acceptSocket.blocking = false;
1336 			if (handleAccept)
1337 			{
1338 				TcpConnection connection = new TcpConnection(acceptSocket);
1339 				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
1340 				connection.setKeepAlive();
1341 				//assert(connection.connected);
1342 				//connection.connected = true;
1343 				acceptHandler(connection);
1344 			}
1345 			else
1346 				acceptSocket.close();
1347 		}
1348 
1349 		/// Called when a socket is writable.
1350 		override void onWritable()
1351 		{
1352 		}
1353 
1354 		/// Called when an error occurs on the socket.
1355 		override void onError(string reason)
1356 		{
1357 			close(); // call parent
1358 		}
1359 
1360 		void closeListener()
1361 		{
1362 			assert(conn);
1363 			socketManager.unregister(this);
1364 			conn.close();
1365 			conn = null;
1366 		}
1367 	}
1368 
1369 	/// Whether the socket is listening.
1370 	bool listening;
1371 	/// Listener instances
1372 	Listener[] listeners;
1373 
1374 	final void updateFlags()
1375 	{
1376 		foreach (listener; listeners)
1377 			listener.notifyRead = handleAccept !is null;
1378 	}
1379 
1380 public:
1381 	/// Start listening on this socket.
1382 	final ushort listen(ushort port, string addr = null)
1383 	{
1384 		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
1385 		//assert(!listening, "Attempting to listen on a listening socket");
1386 
1387 		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
1388 
1389 		debug (ASOCKETS)
1390 		{
1391 			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
1392 			foreach (ref addressInfo; addressInfos)
1393 				stderr.writefln("- %s", addressInfo);
1394 		}
1395 
1396 		// listen on random ports only on IPv4 for now
1397 		if (port == 0)
1398 		{
1399 			foreach_reverse (i, ref addressInfo; addressInfos)
1400 				if (addressInfo.family != AddressFamily.INET)
1401 					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
1402 		}
1403 
1404 		listen(addressInfos);
1405 
1406 		foreach (listener; listeners)
1407 		{
1408 			auto address = listener.conn.localAddress();
1409 			if (address.addressFamily == AddressFamily.INET)
1410 				port = to!ushort(address.toPortString());
1411 		}
1412 
1413 		return port;
1414 	}
1415 
1416 	/// ditto
1417 	final void listen(AddressInfo[] addressInfos)
1418 	{
1419 		foreach (ref addressInfo; addressInfos)
1420 		{
1421 			try
1422 			{
1423 				Socket conn = new Socket(addressInfo);
1424 				conn.blocking = false;
1425 				if (addressInfo.family == AddressFamily.INET6)
1426 					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
1427 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
1428 
1429 				conn.bind(addressInfo.address);
1430 				conn.listen(8);
1431 
1432 				listeners ~= new Listener(conn);
1433 			}
1434 			catch (SocketException e)
1435 			{
1436 				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
1437 				debug(ASOCKETS) stderr.writeln(e.msg);
1438 			}
1439 		}
1440 
1441 		if (listeners.length==0)
1442 			throw new Exception("Unable to bind service");
1443 
1444 		listening = true;
1445 
1446 		updateFlags();
1447 	}
1448 
1449 	final @property Address[] localAddresses()
1450 	{
1451 		Address[] result;
1452 		foreach (listener; listeners)
1453 			result ~= listener.localAddress;
1454 		return result;
1455 	}
1456 
1457 	final @property bool isListening()
1458 	{
1459 		return listening;
1460 	}
1461 
1462 	/// Stop listening on this socket.
1463 	final void close()
1464 	{
1465 		foreach (listener;listeners)
1466 			listener.closeListener();
1467 		listeners = null;
1468 		listening = false;
1469 		if (handleClose)
1470 			handleClose();
1471 	}
1472 
1473 public:
1474 	/// Callback for when the socket was closed.
1475 	void delegate() handleClose;
1476 
1477 	private void delegate(TcpConnection incoming) acceptHandler;
1478 	/// Callback for an incoming connection.
1479 	/// Connections will not be accepted unless this handler is set.
1480 	@property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
1481 	/// ditto
1482 	@property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
1483 }
1484 
1485 // ***************************************************************************
1486 
1487 /// An asynchronous UDP stream.
1488 /// UDP does not have connections, so this class encapsulates a socket
1489 /// with a fixed destination (sendto) address, and optionally bound to
1490 /// a local address.
1491 /// Currently received packets' address is not exposed.
1492 class UdpConnection : Connection
1493 {
1494 protected:
1495 	this(Socket conn)
1496 	{
1497 		super(conn);
1498 	}
1499 
1500 	/// Called when a socket is writable.
1501 	override void onWritable()
1502 	{
1503 		//scope(success) updateFlags();
1504 		onWritableImpl();
1505 		updateFlags();
1506 	}
1507 
1508 	// Work around scope(success) breaking debugger stack traces
1509 	final private void onWritableImpl()
1510 	{
1511 		foreach (priority, ref queue; outQueue)
1512 			while (queue.length)
1513 			{
1514 				auto pdata = queue.ptr; // pointer to first data
1515 
1516 				auto sent = conn.sendTo(pdata.contents, remoteAddress);
1517 
1518 				if (sent == Socket.ERROR)
1519 				{
1520 					if (wouldHaveBlocked())
1521 						return;
1522 					else
1523 						return onError("send() error: " ~ lastSocketError);
1524 				}
1525 				else
1526 				if (sent < pdata.length)
1527 				{
1528 					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
1529 				}
1530 				else
1531 				{
1532 					assert(sent == pdata.length);
1533 					//debug writefln("[%s] Sent data:", remoteAddress);
1534 					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1535 					pdata.clear();
1536 					queue = queue[1..$];
1537 					if (queue.length == 0)
1538 						queue = null;
1539 				}
1540 			}
1541 
1542 		// outQueue is now empty
1543 		if (bufferFlushedHandler)
1544 			bufferFlushedHandler();
1545 		if (state == ConnectionState.disconnecting)
1546 		{
1547 			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1548 			close();
1549 		}
1550 	}
1551 
1552 	override sizediff_t doSend(in void[] buffer)
1553 	{
1554 		assert(false); // never called (called only from overridden methods)
1555 	}
1556 
1557 	override sizediff_t doReceive(void[] buffer)
1558 	{
1559 		return conn.receive(buffer);
1560 	}
1561 
1562 public:
1563 	/// Default constructor
1564 	this()
1565 	{
1566 		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
1567 	}
1568 
1569 	/// Initialize with the given AddressFamily, without binding to an address.
1570 	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
1571 	{
1572 		initializeImpl(family, type, protocol);
1573 		if (connectHandler)
1574 			connectHandler();
1575 	}
1576 
1577 	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
1578 	{
1579 		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
1580 		assert(!conn);
1581 
1582 		conn = new Socket(family, type, protocol);
1583 		conn.blocking = false;
1584 		socketManager.register(this);
1585 		state = ConnectionState.connected;
1586 		updateFlags();
1587 	}
1588 
1589 	/// Bind to a local address in order to receive packets sent there.
1590 	final ushort bind(string host, ushort port)
1591 	{
1592 		assert(host.length, "Empty host");
1593 
1594 		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
1595 
1596 		state = ConnectionState.resolving;
1597 
1598 		AddressInfo addressInfo;
1599 		try
1600 		{
1601 			auto addresses = getAddress(host, port);
1602 			enforce(addresses.length, "No addresses found");
1603 			debug (ASOCKETS)
1604 			{
1605 				stderr.writefln("Resolved to %s addresses:", addresses.length);
1606 				foreach (address; addresses)
1607 					stderr.writefln("- %s", address.toString());
1608 			}
1609 
1610 			Address address;
1611 			if (addresses.length > 1)
1612 			{
1613 				import std.random : uniform;
1614 				address = addresses[uniform(0, $)];
1615 			}
1616 			else
1617 				address = addresses[0];
1618 			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
1619 		}
1620 		catch (SocketException e)
1621 		{
1622 			onError("Lookup error: " ~ e.msg);
1623 			return 0;
1624 		}
1625 
1626 		state = ConnectionState.disconnected;
1627 		return bind(addressInfo);
1628 	}
1629 
1630 	/// ditto
1631 	final ushort bind(AddressInfo addressInfo)
1632 	{
1633 		initialize(addressInfo.family, addressInfo.type, addressInfo.protocol);
1634 		conn.bind(addressInfo.address);
1635 
1636 		auto address = conn.localAddress();
1637 		auto port = to!ushort(address.toPortString());
1638 
1639 		if (connectHandler)
1640 			connectHandler();
1641 
1642 		return port;
1643 	}
1644 
1645 public:
1646 	/// Where to send packets to.
1647 	Address remoteAddress;
1648 }
1649 
1650 ///
1651 unittest
1652 {
1653 	auto server = new UdpConnection();
1654 	auto serverPort = server.bind("localhost", 0);
1655 
1656 	auto client = new UdpConnection();
1657 	client.initialize(server.localAddress.addressFamily);
1658 
1659 	string[] packets = ["Hello", "there"];
1660 	client.remoteAddress = server.localAddress;
1661 	client.send({
1662 		Data[] data;
1663 		foreach (packet; packets)
1664 			data ~= Data(packet);
1665 		return data;
1666 	}());
1667 
1668 	server.handleReadData = (Data data)
1669 	{
1670 		assert(data.contents == packets[0]);
1671 		packets = packets[1..$];
1672 		if (!packets.length)
1673 		{
1674 			server.close();
1675 			client.close();
1676 		}
1677 	};
1678 	socketManager.loop();
1679 	assert(!packets.length);
1680 }
1681 
1682 // ***************************************************************************
1683 
1684 /// Base class for a connection adapter.
1685 /// By itself, does nothing.
1686 class ConnectionAdapter : IConnection
1687 {
1688 	IConnection next;
1689 
1690 	this(IConnection next)
1691 	{
1692 		this.next = next;
1693 		next.handleConnect = &onConnect;
1694 		next.handleDisconnect = &onDisconnect;
1695 		next.handleBufferFlushed = &onBufferFlushed;
1696 	}
1697 
1698 	@property ConnectionState state() { return next.state; }
1699 
1700 	/// Queue Data for sending.
1701 	void send(Data[] data, int priority)
1702 	{
1703 		next.send(data, priority);
1704 	}
1705 
1706 	alias send = IConnection.send; /// ditto
1707 
1708 	/// Terminate the connection.
1709 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
1710 	{
1711 		next.disconnect(reason, type);
1712 	}
1713 
1714 	protected void onConnect()
1715 	{
1716 		if (connectHandler)
1717 			connectHandler();
1718 	}
1719 
1720 	protected void onReadData(Data data)
1721 	{
1722 		// onReadData should be fired only if readDataHandler is set
1723 		readDataHandler(data);
1724 	}
1725 
1726 	protected void onDisconnect(string reason, DisconnectType type)
1727 	{
1728 		if (disconnectHandler)
1729 			disconnectHandler(reason, type);
1730 	}
1731 
1732 	protected void onBufferFlushed()
1733 	{
1734 		if (bufferFlushedHandler)
1735 			bufferFlushedHandler();
1736 	}
1737 
1738 	/// Callback for when a connection has been established.
1739 	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
1740 	private ConnectHandler connectHandler;
1741 
1742 	/// Callback setter for when new data is read.
1743 	@property void handleReadData(ReadDataHandler value)
1744 	{
1745 		readDataHandler = value;
1746 		next.handleReadData = value ? &onReadData : null ;
1747 	}
1748 	private ReadDataHandler readDataHandler;
1749 
1750 	/// Callback setter for when a connection was closed.
1751 	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
1752 	private DisconnectHandler disconnectHandler;
1753 
1754 	/// Callback setter for when all queued data has been written.
1755 	@property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; }
1756 	private BufferFlushedHandler bufferFlushedHandler;
1757 }
1758 
1759 // ***************************************************************************
1760 
1761 /// Adapter for connections with a line-based protocol.
1762 /// Splits data stream into delimiter-separated lines.
1763 class LineBufferedAdapter : ConnectionAdapter
1764 {
1765 	/// The protocol's line delimiter.
1766 	string delimiter = "\r\n";
1767 
1768 	this(IConnection next)
1769 	{
1770 		super(next);
1771 	}
1772 
1773 	/// Append a line to the send buffer.
1774 	void send(string line)
1775 	{
1776 		//super.send(Data(line ~ delimiter));
1777 		// https://issues.dlang.org/show_bug.cgi?id=13985
1778 		ConnectionAdapter ca = this;
1779 		ca.send(Data(line ~ delimiter));
1780 	}
1781 
1782 protected:
1783 	/// The receive buffer.
1784 	Data inBuffer;
1785 
1786 	/// Called when data has been received.
1787 	final override void onReadData(Data data)
1788 	{
1789 		import std.string;
1790 		auto oldBufferLength = inBuffer.length;
1791 		if (oldBufferLength)
1792 			inBuffer ~= data;
1793 		else
1794 			inBuffer = data;
1795 
1796 		if (delimiter.length == 1)
1797 		{
1798 			import core.stdc.string; // memchr
1799 
1800 			char c = delimiter[0];
1801 			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
1802 			while (p)
1803 			{
1804 				sizediff_t index = p - inBuffer.ptr;
1805 				processLine(index);
1806 
1807 				p = memchr(inBuffer.ptr, c, inBuffer.length);
1808 			}
1809 		}
1810 		else
1811 		{
1812 			sizediff_t index;
1813 			// TODO: we can start the search at oldBufferLength-delimiter.length+1
1814 			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
1815 				processLine(index);
1816 		}
1817 	}
1818 
1819 	final void processLine(size_t index)
1820 	{
1821 		auto line = inBuffer[0..index];
1822 		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
1823 		super.onReadData(line);
1824 	}
1825 
1826 	override void onDisconnect(string reason, DisconnectType type)
1827 	{
1828 		super.onDisconnect(reason, type);
1829 		inBuffer.clear();
1830 	}
1831 }
1832 
1833 // ***************************************************************************
1834 
1835 /// Fires an event handler or disconnects connections
1836 /// after a period of inactivity.
1837 class TimeoutAdapter : ConnectionAdapter
1838 {
1839 	this(IConnection next)
1840 	{
1841 		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
1842 		super(next);
1843 	}
1844 
1845 	void cancelIdleTimeout()
1846 	{
1847 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
1848 		assert(idleTask !is null);
1849 		assert(idleTask.isWaiting());
1850 		idleTask.cancel();
1851 	}
1852 
1853 	void resumeIdleTimeout()
1854 	{
1855 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
1856 		assert(idleTask !is null);
1857 		assert(!idleTask.isWaiting());
1858 		mainTimer.add(idleTask);
1859 	}
1860 
1861 	final void setIdleTimeout(Duration duration)
1862 	{
1863 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
1864 		assert(duration > Duration.zero);
1865 
1866 		// Configure idleTask
1867 		if (idleTask is null)
1868 		{
1869 			idleTask = new TimerTask(duration);
1870 			idleTask.handleTask = &onTask_Idle;
1871 		}
1872 		else
1873 		{
1874 			if (idleTask.isWaiting())
1875 				idleTask.cancel();
1876 			idleTask.delay = duration;
1877 		}
1878 
1879 		mainTimer.add(idleTask);
1880 	}
1881 
1882 	void markNonIdle()
1883 	{
1884 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
1885 		if (handleNonIdle)
1886 			handleNonIdle();
1887 		if (idleTask && idleTask.isWaiting())
1888 			idleTask.restart();
1889 	}
1890 
1891 	/// Callback for when a connection has stopped responding.
1892 	/// If unset, the connection will be disconnected.
1893 	void delegate() handleIdleTimeout;
1894 
1895 	/// Callback for when a connection is marked as non-idle
1896 	/// (when data is received).
1897 	void delegate() handleNonIdle;
1898 
1899 protected:
1900 	override void onConnect()
1901 	{
1902 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
1903 		markNonIdle();
1904 		super.onConnect();
1905 	}
1906 
1907 	override void onReadData(Data data)
1908 	{
1909 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
1910 		markNonIdle();
1911 		super.onReadData(data);
1912 	}
1913 
1914 	override void onDisconnect(string reason, DisconnectType type)
1915 	{
1916 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
1917 		if (idleTask && idleTask.isWaiting())
1918 			idleTask.cancel();
1919 		super.onDisconnect(reason, type);
1920 	}
1921 
1922 private:
1923 	TimerTask idleTask; // non-null if an idle timeout has been set
1924 
1925 	final void onTask_Idle(Timer timer, TimerTask task)
1926 	{
1927 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);
1928 		if (state == ConnectionState.disconnecting)
1929 			return disconnect("Delayed disconnect - time-out", DisconnectType.error);
1930 
1931 		if (state == ConnectionState.disconnected)
1932 			return;
1933 
1934 		if (handleIdleTimeout)
1935 		{
1936 			resumeIdleTimeout(); // reschedule (by default)
1937 			handleIdleTimeout();
1938 		}
1939 		else
1940 			disconnect("Time-out", DisconnectType.error);
1941 	}
1942 }
1943 
1944 // ***************************************************************************
1945 
1946 unittest
1947 {
1948 	void testTimer()
1949 	{
1950 		bool fired;
1951 		setTimeout({fired = true;}, 10.msecs);
1952 		socketManager.loop();
1953 		assert(fired);
1954 	}
1955 
1956 	testTimer();
1957 }