1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import core.stdc.stdint : int32_t;
24 
25 import std.exception;
26 import std.socket;
27 import std.string : format;
28 public import std.socket : Address, AddressInfo, Socket;
29 
30 version (Windows)
31     private import c_socks = core.sys.windows.winsock2;
32 else version (Posix)
33     private import c_socks = core.sys.posix.sys.socket;
34 
35 debug(ASOCKETS) import std.stdio : stderr;
36 debug(PRINTDATA) import std.stdio : stderr;
37 debug(PRINTDATA) import ae.utils.text : hexDump;
38 private import std.conv : to;
39 
40 
41 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
42 static import ae.utils.array;
43 
44 version(LIBEV)
45 {
46 	import deimos.ev;
47 	pragma(lib, "ev");
48 }
49 
50 version(Windows)
51 {
52 	import core.sys.windows.windows : Sleep;
53 	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
54 }
55 else
56 	enum USE_SLEEP = false;
57 
/// Counter incremented each time the event loop processes an I/O or timer callback (libev) or completes a loop iteration (select).
59 int eventCounter;
60 
61 version(LIBEV)
62 {
63 	// Watchers are a GenericSocket field (as declared in SocketMixin).
64 	// Use one watcher per read and write event.
65 	// Start/stop those as higher-level code declares interest in those events.
66 	// Use the "data" ev_io field to store the parent GenericSocket address.
67 	// Also use the "data" field as a flag to indicate whether the watcher is active
68 	// (data is null when the watcher is stopped).
69 
70 	struct SocketManager
71 	{
72 	private:
73 		size_t count;
74 
75 		extern(C)
76 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
77 		{
78 			eventCounter++;
79 			auto socket = cast(GenericSocket)w.data;
80 			assert(socket, "libev event fired on stopped watcher");
81 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
82 
83 			if (revents & EV_READ)
84 				socket.onReadable();
85 			else
86 			if (revents & EV_WRITE)
87 				socket.onWritable();
88 			else
89 				assert(false, "Unknown event fired from libev");
90 
91 			// TODO? Need to get proper SocketManager instance to call updateTimer on
92 			socketManager.updateTimer(false);
93 		}
94 
95 		ev_timer evTimer;
96 		MonoTime lastNextEvent = MonoTime.max;
97 
98 		extern(C)
99 		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
100 		{
101 			eventCounter++;
102 			debug (ASOCKETS) writefln("Timer callback called.");
103 			mainTimer.prod();
104 
105 			socketManager.updateTimer(true);
106 			debug (ASOCKETS) writefln("Timer callback exiting.");
107 		}
108 
109 		void updateTimer(bool force)
110 		{
111 			auto nextEvent = mainTimer.getNextEvent();
112 			if (force || lastNextEvent != nextEvent)
113 			{
114 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
115 				if (nextEvent == MonoTime.max) // Stopping
116 				{
117 					if (lastNextEvent != MonoTime.max)
118 						ev_timer_stop(ev_default_loop(0), &evTimer);
119 				}
120 				else
121 				{
122 					auto remaining = mainTimer.getRemainingTime();
123 					while (remaining <= Duration.zero)
124 					{
125 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
126 						mainTimer.prod();
127 						remaining = mainTimer.getRemainingTime();
128 					}
129 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
130 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
131 					if (lastNextEvent == MonoTime.max) // Starting
132 					{
133 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
134 						ev_timer_start(ev_default_loop(0), &evTimer);
135 					}
136 					else // Adjusting
137 					{
138 						evTimer.repeat = tstamp;
139 						ev_timer_again(ev_default_loop(0), &evTimer);
140 					}
141 				}
142 				lastNextEvent = nextEvent;
143 			}
144 		}
145 
146 	public:
147 		/// Register a socket with the manager.
148 		void register(GenericSocket socket)
149 		{
150 			debug (ASOCKETS) writefln("Registering %s", socket);
151 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
152 			auto fd = socket.conn.handle;
153 			assert(fd, "Must have fd before socket registration");
154 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
155 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
156 			count++;
157 		}
158 
159 		/// Unregister a socket with the manager.
160 		void unregister(GenericSocket socket)
161 		{
162 			debug (ASOCKETS) writefln("Unregistering %s", socket);
163 			socket.notifyRead  = false;
164 			socket.notifyWrite = false;
165 			count--;
166 		}
167 
		/// Returns the number of currently registered sockets.
		size_t size()
		{
			return count;
		}
172 
173 		/// Loop continuously until no sockets are left.
174 		void loop()
175 		{
176 			auto evLoop = ev_default_loop(0);
177 			enforce(evLoop, "libev initialization failure");
178 
179 			updateTimer(true);
180 			debug (ASOCKETS) writeln("ev_run");
181 			ev_run(ev_default_loop(0), 0);
182 		}
183 	}
184 
185 	private mixin template SocketMixin()
186 	{
187 		private ev_io evRead, evWrite;
188 
189 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
190 		{
191 			if (!conn)
192 			{
193 				// Can happen when setting delegates before connecting.
194 				return;
195 			}
196 
197 			if (newValue && !ev.data)
198 			{
199 				// Start
200 				ev.data = cast(void*)this;
201 				ev_io_start(ev_default_loop(0), &ev);
202 			}
203 			else
204 			if (!newValue && ev.data)
205 			{
206 				// Stop
207 				assert(ev.data is cast(void*)this);
208 				ev.data = null;
209 				ev_io_stop(ev_default_loop(0), &ev);
210 			}
211 		}
212 
213 		/// Interested in read notifications (onReadable)?
214 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
215 		/// Interested in write notifications (onWritable)?
216 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
217 
218 		debug ~this()
219 		{
220 			// The LIBEV SocketManager holds no references to registered sockets.
221 			// TODO: Add a doubly-linked list?
222 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
223 		}
224 	}
225 }
226 else // Use select
227 {
228 	struct SocketManager
229 	{
230 	private:
231 		enum FD_SETSIZE = 1024;
232 
233 		/// List of all sockets to poll.
234 		GenericSocket[] sockets;
235 
236 		/// Debug AA to check for dangling socket references.
237 		debug GenericSocket[socket_t] socketHandles;
238 
239 		void delegate()[] idleHandlers;
240 
241 	public:
242 		/// Register a socket with the manager.
243 		void register(GenericSocket conn)
244 		{
245 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
246 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
247 			sockets ~= conn;
248 
249 			debug
250 			{
251 				auto handle = conn.socket.handle;
252 				assert(handle != socket_t.init, "Can't register a closed socket");
253 				assert(handle !in socketHandles, "This socket handle is already registered");
254 				socketHandles[handle] = conn;
255 			}
256 		}
257 
258 		/// Unregister a socket with the manager.
259 		void unregister(GenericSocket conn)
260 		{
261 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
262 
263 			debug
264 			{
265 				auto handle = conn.socket.handle;
266 				assert(handle != socket_t.init, "Can't unregister a closed socket");
267 				auto pconn = handle in socketHandles;
268 				assert(pconn, "This socket handle is not registered");
269 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
270 				socketHandles.remove(handle);
271 			}
272 
273 			foreach (size_t i, GenericSocket j; sockets)
274 				if (j is conn)
275 				{
276 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
277 					return;
278 				}
279 			assert(false, "Socket not registered");
280 		}
281 
		/// Returns the number of currently registered sockets.
		size_t size()
		{
			return sockets.length;
		}
286 
287 		/// Loop continuously until no sockets are left.
288 		void loop()
289 		{
290 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
291 
292 			SocketSet readset, writeset;
293 			size_t sockcount;
294 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
295 			readset  = new SocketSet(setSize);
296 			writeset = new SocketSet(setSize);
297 			while (true)
298 			{
299 				uint minSize = 0;
300 				version(Windows)
301 					minSize = cast(uint)sockets.length;
302 				else
303 				{
304 					foreach (s; sockets)
305 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
306 							minSize = s.socket.handle;
307 				}
308 				minSize++;
309 
310 				if (setSize < minSize)
311 				{
312 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
313 					setSize = minSize * 2;
314 					readset  = new SocketSet(setSize);
315 					writeset = new SocketSet(setSize);
316 				}
317 				else
318 				{
319 					readset.reset();
320 					writeset.reset();
321 				}
322 
323 				sockcount = 0;
324 				bool haveActive;
325 				debug (ASOCKETS) stderr.writeln("Populating sets");
326 				foreach (GenericSocket conn; sockets)
327 				{
328 					if (!conn.socket)
329 						continue;
330 					sockcount++;
331 
332 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
333 					if (conn.notifyRead)
334 					{
335 						readset.add(conn.socket);
336 						if (!conn.daemonRead)
337 							haveActive = true;
338 						debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : "");
339 					}
340 					if (conn.notifyWrite)
341 					{
342 						writeset.add(conn.socket);
343 						if (!conn.daemonWrite)
344 							haveActive = true;
345 						debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : "");
346 					}
347 					debug (ASOCKETS) stderr.writeln();
348 				}
349 				debug (ASOCKETS)
350 				{
351 					stderr.writefln("Sets populated as follows:");
352 					printSets(readset, writeset);
353 				}
354 
355 				debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...",
356 					haveActive ? "" : "not ",
357 					sockcount,
358 					mainTimer.isWaiting() ? "with" : "no",
359 					idleHandlers.length,
360 				);
361 				if (!haveActive && !mainTimer.isWaiting())
362 				{
363 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
364 					break;
365 				}
366 
367 				debug (ASOCKETS) stderr.flush();
368 
369 				int events;
370 				if (idleHandlers.length)
371 				{
372 					if (sockcount==0)
373 						events = 0;
374 					else
375 						events = Socket.select(readset, writeset, null, 0.seconds);
376 				}
377 				else
378 				if (USE_SLEEP && sockcount==0)
379 				{
380 					version(Windows)
381 					{
382 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
383 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
384 						if (duration <= 0)
385 							duration = 1; // Avoid busywait
386 						else
387 						if (duration > int.max)
388 							duration = int.max;
389 						Sleep(cast(int)duration);
390 						events = 0;
391 					}
392 					else
393 						assert(0);
394 				}
395 				else
396 				if (mainTimer.isWaiting())
397 					events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime());
398 				else
399 					events = Socket.select(readset, writeset, null);
400 
401 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
402 
403 				if (events > 0)
404 				{
405 					// Handle just one event at a time, as the first
406 					// handler might invalidate select()'s results.
407 					handleEvent(readset, writeset);
408 				}
409 				else
410 				if (idleHandlers.length)
411 				{
412 					import ae.utils.array;
413 					auto handler = idleHandlers.shift();
414 
415 					// Rotate the idle handler queue before running it,
416 					// in case the handler unregisters itself.
417 					idleHandlers ~= handler;
418 
419 					handler();
420 				}
421 
422 				// Timers may invalidate our select results, so fire them after processing the latter
423 				mainTimer.prod();
424 
425 				eventCounter++;
426 			}
427 		}
428 
429 		debug (ASOCKETS)
430 		void printSets(SocketSet readset, SocketSet writeset)
431 		{
432 			foreach (GenericSocket conn; sockets)
433 			{
434 				if (!conn.socket)
435 					stderr.writefln("\t\t%s is unset", conn);
436 				else
437 				{
438 					if (readset.isSet(conn.socket))
439 						stderr.writefln("\t\t%s is readable", conn);
440 					if (writeset.isSet(conn.socket))
441 						stderr.writefln("\t\t%s is writable", conn);
442 				}
443 			}
444 		}
445 
446 		void handleEvent(SocketSet readset, SocketSet writeset)
447 		{
448 			debug (ASOCKETS)
449 			{
450 				stderr.writefln("\tSelect results:");
451 				printSets(readset, writeset);
452 			}
453 
454 			foreach (GenericSocket conn; sockets)
455 			{
456 				if (!conn.socket)
457 					continue;
458 
459 				if (readset.isSet(conn.socket))
460 				{
461 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
462 					return conn.onReadable();
463 				}
464 				else
465 				if (writeset.isSet(conn.socket))
466 				{
467 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
468 					return conn.onWritable();
469 				}
470 			}
471 
472 			assert(false, "select() reported events available, but no registered sockets are set");
473 		}
474 	}
475 
476 	// Use UFCS to allow removeIdleHandler to have a predicate with context
477 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
478 	{
479 		foreach (i, idleHandler; socketManager.idleHandlers)
480 			assert(handler !is idleHandler);
481 
482 		socketManager.idleHandlers ~= handler;
483 	}
484 
	/// Default predicate for removeIdleHandler: identity (`is`) comparison of the two arguments.
	static bool isFun(T)(T a, T b) { return a is b; }
486 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
487 	{
488 		foreach (i, idleHandler; socketManager.idleHandlers)
489 			if (pred(idleHandler, args))
490 			{
491 				import std.algorithm;
492 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
493 				return;
494 			}
495 		assert(false, "No such idle handler");
496 	}
497 
	/// select() flavour of the per-socket state mixed into GenericSocket:
	/// plain flags that SocketManager.loop reads when building its socket sets.
	private mixin template SocketMixin()
	{
		/// Interested in read notifications (onReadable)?
		bool notifyRead;
		/// Interested in write notifications (onWritable)?
		bool notifyWrite;
	}
505 }
506 
507 /// The default socket manager.
508 SocketManager socketManager;
509 
510 // ***************************************************************************
511 
512 /// General methods for an asynchronous socket.
513 abstract class GenericSocket
514 {
515 	/// Declares notifyRead and notifyWrite.
516 	mixin SocketMixin;
517 
518 protected:
519 	/// The socket this class wraps.
520 	Socket conn;
521 
522 protected:
523 	/// Retrieve the socket class this class wraps.
524 	@property final Socket socket()
525 	{
526 		return conn;
527 	}
528 
	/// Invoked by the socket manager when the socket is reported readable.
	/// Does nothing by default; subclasses override it.
	void onReadable()
	{
	}
532 
	/// Invoked by the socket manager when the socket is reported writable.
	/// Does nothing by default; subclasses override it.
	void onWritable()
	{
	}
536 
	/// Hook for reporting a socket error described by `reason`.
	/// Does nothing by default; subclasses override it.
	void onError(string reason)
	{
	}
540 
541 public:
542 	/// allow getting the address of connections that are already disconnected
543 	private Address[2] cachedAddress;
544 
545 	/*private*/ final @property Address address(bool local)()
546 	{
547 		if (cachedAddress[local] !is null)
548 			return cachedAddress[local];
549 		else
550 		if (conn is null)
551 			return null;
552 		else
553 		{
554 			Address a;
555 			if (conn.addressFamily == AddressFamily.UNSPEC)
556 			{
557 				// Socket will attempt to construct an UnknownAddress,
558 				// which will almost certainly not match the real address length.
559 				static if (local)
560 					alias getname = c_socks.getsockname;
561 				else
562 					alias getname = c_socks.getpeername;
563 
564 				c_socks.socklen_t nameLen = 0;
565 				if (getname(conn.handle, null, &nameLen) < 0)
566 					throw new SocketOSException("Unable to obtain socket address");
567 
568 				auto buf = new ubyte[nameLen];
569 				auto sa = cast(c_socks.sockaddr*)buf.ptr;
570 				if (getname(conn.handle, sa, &nameLen) < 0)
571 					throw new SocketOSException("Unable to obtain socket address");
572 				a = new UnknownAddressReference(sa, nameLen);
573 			}
574 			else
575 				a = local ? conn.localAddress() : conn.remoteAddress();
576 			return cachedAddress[local] = a;
577 		}
578 	}
579 
580 	alias localAddress = address!true;
581 	alias remoteAddress = address!false;
582 
583 	/*private*/ final @property string addressStr(bool local)() nothrow
584 	{
585 		try
586 		{
587 			auto a = address!local;
588 			if (a is null)
589 				return "[null address]";
590 			string host = a.toAddrString();
591 			import std.string : indexOf;
592 			if (host.indexOf(':') >= 0)
593 				host = "[" ~ host ~ "]";
594 			try
595 			{
596 				string port = a.toPortString();
597 				return host ~ ":" ~ port;
598 			}
599 			catch (Exception e)
600 				return host;
601 		}
602 		catch (Exception e)
603 			return "[error: " ~ e.msg ~ "]";
604 	}
605 
606 	alias localAddressStr = addressStr!true;
607 	alias remoteAddressStr = addressStr!false;
608 
609 	/// Don't block the process from exiting, even if the socket is ready to receive data.
610 	/// TODO: Not implemented with libev
611 	bool daemonRead;
612 
613 	/// Don't block the process from exiting, even if the socket is ready to send data.
614 	/// TODO: Not implemented with libev
615 	bool daemonWrite;
616 
617 	deprecated alias daemon = daemonRead;
618 
619 	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
620 	{
621 		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
622 		if (enabled)
623 		{
624 			try
625 				conn.setKeepAlive(time, interval);
626 			catch (SocketException)
627 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
628 		}
629 		else
630 			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
631 	}
632 
633 	override string toString() const
634 	{
635 		import std.string;
636 		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
637 	}
638 }
639 
640 // ***************************************************************************
641 
/// The reason category passed to disconnect handlers.
enum DisconnectType
{
	requested, /// initiated by the application
	graceful,  /// peer gracefully closed the connection
	error      /// abnormal network condition
}
648 
/// Life-cycle state of a connection.
/// The declaration order matters: `disconnectable` relies on
/// `resolving` .. `connected` forming a contiguous range.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}
667 
/// Tells whether `disconnect` may be called in the given state, i.e. the
/// state is `resolving`, `connecting` or `connected`.
/// Applications normally know where their sockets are in their life cycle,
/// so needing this check is usually a code smell. It is meant for
/// "out-of-band" disconnects that are not tied to the normal application
/// flow, such as those done in response to a signal.
bool disconnectable(ConnectionState state)
{
	const tooEarly = state < ConnectionState.resolving;
	const tooLate  = state > ConnectionState.connected;
	return !(tooEarly || tooLate);
}
675 
/// Common interface for connections and adapters.
interface IConnection
{
	/// Highest priority value accepted by `send`.
	/// NOTE(review): which end of the range is most urgent is decided by
	/// the implementation - confirm against the class you are using.
	enum MAX_PRIORITY = 4;
	/// Priority used by `send` when none is given.
	enum DEFAULT_PRIORITY = 2;

	/// Reason string used by `disconnect` when none is given.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Wrap the single datum in a one-element array for the array overload.
		Data[1] data;
		data[0] = datum;
		this.send(data, priority);
		// Reset the temporary slot once the datum has been handed over.
		data[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias void delegate() ConnectHandler;
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias void delegate(Data data) ReadDataHandler;
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias void delegate() BufferFlushedHandler;
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}
725 
726 // ***************************************************************************
727 
728 class Connection : GenericSocket, IConnection
729 {
730 private:
731 	/// Blocks of data larger than this value are passed as unmanaged memory
732 	/// (in Data objects). Blocks smaller than this value will be reallocated
733 	/// on the managed heap. The disadvantage of placing large objects on the
734 	/// managed heap is false pointers; the disadvantage of using Data for
735 	/// small objects is wasted slack space due to the page size alignment
736 	/// requirement.
737 	enum UNMANAGED_THRESHOLD = 256;
738 
739 	ConnectionState _state;
	/// Private setter for the connection state; returns the value just stored.
	final @property ConnectionState state(ConnectionState value) { return _state = value; }
741 
742 public:
743 	/// Get connection state.
744 	override @property ConnectionState state() { return _state; }
745 
746 protected:
	/// Transport-specific send of `buffer`.
	/// Callers in this module treat the result as the number of bytes
	/// sent, or Socket.ERROR on failure.
	abstract sizediff_t doSend(in void[] buffer);
	/// Transport-specific receive into `buffer`.
	/// onReadable treats a result of 0 as the peer closing the connection,
	/// Socket.ERROR as a failure, and any other value as the number of
	/// bytes placed at the start of `buffer`.
	abstract sizediff_t doReceive(void[] buffer);
749 
750 	/// The send buffers.
751 	Data[][MAX_PRIORITY+1] outQueue;
752 	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
753 	int partiallySent = -1;
754 
755 	/// Constructor used by a ServerSocket for new connections
756 	this(Socket conn)
757 	{
758 		this();
759 		this.conn = conn;
760 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
761 		if (conn)
762 			socketManager.register(this);
763 		updateFlags();
764 	}
765 
766 	final void updateFlags()
767 	{
768 		if (state == ConnectionState.connecting)
769 			notifyWrite = true;
770 		else
771 			notifyWrite = writePending;
772 
773 		notifyRead = state == ConnectionState.connected && readDataHandler;
774 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
775 	}
776 
777 	/// Called when a socket is readable.
778 	override void onReadable()
779 	{
780 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
781 		static ubyte[0x10000] inBuffer = void;
782 		auto received = doReceive(inBuffer);
783 
784 		if (received == 0)
785 			return disconnect("Connection closed", DisconnectType.graceful);
786 
787 		if (received == Socket.ERROR)
788 		{
789 		//	if (wouldHaveBlocked)
790 		//	{
791 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
792 		//		return;
793 		//	}
794 		//	else
795 				onError("recv() error: " ~ lastSocketError);
796 		}
797 		else
798 		{
799 			debug (PRINTDATA)
800 			{
801 				stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr);
802 				stderr.write(hexDump(inBuffer[0 .. received]));
803 				stderr.flush();
804 			}
805 
806 			if (state == ConnectionState.disconnecting)
807 			{
808 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
809 			}
810 			else
811 			if (!readDataHandler)
812 			{
813 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
814 			}
815 			else
816 			{
817 				// Currently, unlike the D1 version of this module,
818 				// we will always reallocate read network data.
819 				// This disfavours code which doesn't need to store
820 				// read data after processing it, but otherwise
821 				// makes things simpler and safer all around.
822 
823 				if (received < UNMANAGED_THRESHOLD)
824 				{
825 					// Copy to the managed heap
826 					readDataHandler(Data(inBuffer[0 .. received].dup));
827 				}
828 				else
829 				{
830 					// Copy to unmanaged memory
831 					readDataHandler(Data(inBuffer[0 .. received], true));
832 				}
833 			}
834 		}
835 	}
836 
837 	/// Called when an error occurs on the socket.
838 	override void onError(string reason)
839 	{
840 		if (state == ConnectionState.disconnecting)
841 		{
842 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
843 			return close();
844 		}
845 
846 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
847 		disconnect("Socket error: " ~ reason, DisconnectType.error);
848 	}
849 
	/// Default constructor; leaves the connection without a socket.
	this()
	{
	}
853 
854 public:
	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
	/// Params:
	///   reason = human-readable reason passed on to the disconnect handler.
	///   type   = why the connection is going away; only `requested`
	///            disconnects wait for queued data, other types drop it.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));

		if (writePending)
		{
			if (type==DisconnectType.requested)
			{
				assert(conn, "Attempting to disconnect on an uninitialized socket");
				// queue disconnect after all data is sent
				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
				state = ConnectionState.disconnecting;
				//setIdleTimeout(30.seconds);
				// The handler is notified now even though the socket stays open.
				if (disconnectHandler)
					disconnectHandler(reason, type);
				updateFlags();
				return;
			}
			else
				// Not an application-requested disconnect: drop unsent data.
				discardQueues();
		}

		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
			close();
		else
		{
			// No socket exists yet in the remaining disconnectable states.
			assert(conn is null, "Registered but %s socket".format(state));
			if (state == ConnectionState.resolving)
				state = ConnectionState.disconnected;
		}

		if (disconnectHandler)
			disconnectHandler(reason, type);
		updateFlags();
	}
895 
896 	private final void close()
897 	{
898 		assert(conn, "Attempting to close an unregistered socket");
899 		socketManager.unregister(this);
900 		conn.close();
901 		conn = null;
902 		outQueue[] = null;
903 		state = ConnectionState.disconnected;
904 	}
905 
906 	/// Append data to the send buffer.
907 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
908 	{
909 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
910 		outQueue[priority] ~= data;
911 		notifyWrite = true; // Fast updateFlags()
912 
913 		debug (PRINTDATA)
914 		{
915 			stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr);
916 			foreach (datum; data)
917 				if (datum.length)
918 					stderr.write(hexDump(datum.contents));
919 				else
920 					stderr.writeln("(empty Data)");
921 			stderr.flush();
922 		}
923 	}
924 
925 	/// ditto
926 	alias send = IConnection.send;
927 
928 	final void clearQueue(int priority)
929 	{
930 		if (priority == partiallySent)
931 		{
932 			assert(outQueue[priority].length > 0);
933 			outQueue[priority] = outQueue[priority][0..1];
934 		}
935 		else
936 			outQueue[priority] = null;
937 		updateFlags();
938 	}
939 
940 	/// Clears all queues, even partially sent content.
941 	private final void discardQueues()
942 	{
943 		foreach (priority; 0..MAX_PRIORITY+1)
944 			outQueue[priority] = null;
945 		partiallySent = -1;
946 		updateFlags();
947 	}
948 
949 	@property
950 	final bool writePending()
951 	{
952 		foreach (queue; outQueue)
953 			if (queue.length)
954 				return true;
955 		return false;
956 	}
957 
958 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
959 	{
960 		if (priority == partiallySent)
961 		{
962 			assert(outQueue[priority].length > 0);
963 			return outQueue[priority].length > 1;
964 		}
965 		else
966 			return outQueue[priority].length > 0;
967 	}
968 
	/// Returns the number of Data items in the queue at `priority`
	/// (including a partially sent head item, if any).
	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
	{
		return outQueue[priority].length;
	}
973 
974 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
975 	{
976 		size_t bytes;
977 		foreach (datum; outQueue[priority])
978 			bytes += datum.length;
979 		return bytes;
980 	}
981 
982 public:
983 	private ConnectHandler connectHandler;
984 	/// Callback for when a connection has been established.
985 	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }
986 
987 	private ReadDataHandler readDataHandler;
988 	/// Callback for incoming data.
989 	/// Data will not be received unless this handler is set.
990 	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }
991 
992 	private DisconnectHandler disconnectHandler;
993 	/// Callback for when a connection was closed.
994 	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
995 
	private BufferFlushedHandler bufferFlushedHandler;
	/// Callback setter for when all queued data has been sent.
	@property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); }
999 }
1000 
/// Base class for stream-oriented connections.
/// Implements the "socket is writable" handling: finishing a pending
/// connection attempt, and draining the outgoing queues through `doSend`.
class StreamConnection : Connection
{
protected:
	this()
	{
		super();
	}

	/// Called when a socket is writable.
	/// Runs the actual logic in `onWritableImpl`, then refreshes the
	/// socket flags (only if `onWritableImpl` did not throw).
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	final private void onWritableImpl()
	{
		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
		if (state == ConnectionState.connecting)
		{
			// Writability while connecting means the connection attempt has
			// finished; the socket's pending error tells us how it went.
			int32_t error;
			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
			if (error)
				return disconnect(formatSocketError(error), DisconnectType.error);

			state = ConnectionState.connected;

			//debug writefln("[%s] Connected", remoteAddressStr);
			try
				setKeepAlive();
			catch (Exception e)
				return disconnect(e.msg, DisconnectType.error);
			if (connectHandler)
				connectHandler();
			return;
		}
		//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

		// Two passes over the queues: the first pass only touches the queue
		// holding a partially-sent packet (so that packet is completed before
		// anything else goes on the wire), the second pass sends everything.
		// Kept in static storage so that no array literal needs to be
		// built every time the socket becomes writable.
		static immutable bool[2] sendPasses = [true, false];

		foreach (sendPartial; sendPasses)
			foreach (int priority, ref queue; outQueue)
				while (queue.length && (!sendPartial || priority == partiallySent))
				{
					assert(partiallySent == -1 || partiallySent == priority);

					auto pdata = queue.ptr; // pointer to first data

					ptrdiff_t sent = 0;
					if (pdata.length)
					{
						sent = doSend(pdata.contents);
						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
					}
					else
					{
						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
					}

					if (sent == Socket.ERROR)
					{
						// Not an error if the send would merely have blocked;
						// stop here and leave the remaining data queued.
						if (wouldHaveBlocked())
							return;
						else
							return onError("send() error: " ~ lastSocketError);
					}
					else
					if (sent < pdata.length)
					{
						// Partial send: keep the unsent tail at the head of
						// the queue and remember which queue it belongs to.
						if (sent > 0)
						{
							*pdata = (*pdata)[sent..pdata.length];
							partiallySent = priority;
						}
						return;
					}
					else
					{
						assert(sent == pdata.length);
						//debug writefln("[%s] Sent data:", remoteAddressStr);
						//debug writefln("%s", hexDump(pdata.contents[0..sent]));
						pdata.clear();
						queue = queue[1..$];
						partiallySent = -1;
						if (queue.length == 0)
							queue = null;
					}
				}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

public:
	/// Creates a stream connection on top of the given socket.
	this(Socket conn)
	{
		super(conn);
	}
}
1106 
1107 // ***************************************************************************
1108 
/// A connection backed by a POSIX file descriptor.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// The given file descriptor is not duplicated, so "disconnecting"
/// this connection will close it.
version (Posix)
class FileConnection : StreamConnection
{
	/// Wraps `fileno` in a `Socket` object, switches it to
	/// non-blocking mode and passes it to `StreamConnection`.
	this(int fileno)
	{
		auto fileSocket = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		fileSocket.blocking = false;
		super(fileSocket);
	}

protected:
	import core.sys.posix.unistd : read, write;

	/// Writes `buffer` to the descriptor using `write(2)`.
	override sizediff_t doSend(in void[] buffer)
	{
		auto fd = socket.handle;
		return write(fd, buffer.ptr, buffer.length);
	}

	/// Reads into `buffer` from the descriptor using `read(2)`.
	override sizediff_t doReceive(void[] buffer)
	{
		auto fd = socket.handle;
		return read(fd, buffer.ptr, buffer.length);
	}
}
1136 
/// Presents two connections as one: data is read from `reader` and
/// written to `writer`, e.g. for stdin/stdout.
class Duplex : IConnection
{
	IConnection reader, writer;

	/// Stores both connections and takes over their connect and
	/// disconnect handlers.
	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;

		reader.handleConnect = &onConnect;
		writer.handleConnect = &onConnect;

		reader.handleDisconnect = &onDisconnect;
		writer.handleDisconnect = &onDisconnect;
	}

	/// Combined state: `disconnecting` if either side is disconnecting,
	/// otherwise the lower of the two states.
	@property ConnectionState state()
	{
		auto readerState = reader.state;
		if (readerState == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;

		auto writerState = writer.state;
		if (writerState == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;

		if (readerState < writerState)
			return readerState;
		return writerState;
	}

	/// Queue Data for sending (forwarded to the writer).
	void send(Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Each side is asked to disconnect only if its state lies strictly
	/// between `disconnected` and `disconnecting`.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		static bool canDisconnect(ConnectionState s)
		{
			return s > ConnectionState.disconnected && s < ConnectionState.disconnecting;
		}

		if (canDisconnect(reader.state))
			reader.disconnect(reason, type);
		if (canDisconnect(writer.state))
			writer.disconnect(reason, type);

		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Fires the user's connect handler once both sides are connected.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		if (reader.state != ConnectionState.connected)
			return;
		if (writer.state != ConnectionState.connected)
			return;

		connectHandler();
	}

	/// Called when either side disconnects.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);

		if (disconnectHandler)
		{
			disconnectHandler(reason, type);
			// don't call it twice for the other connection
			disconnectHandler = null;
		}

		// It is our responsibility to disconnect the other connection
		// Use DisconnectType.requested to ensure that any written data is flushed
		auto otherReason = "Other side of Duplex connection closed (" ~ reason ~ ")";
		disconnect(otherReason, DisconnectType.requested);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read (forwarded to the reader).
	@property void handleReadData(ReadDataHandler value)
	{
		reader.handleReadData = value;
	}

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written
	/// (forwarded to the writer).
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		writer.handleBufferFlushed = value;
	}
}
1214 
// Never executed (guarded by `if (false)`); presumably here only so that
// the compiler checks that Duplex can be instantiated.
unittest
{
	if (false)
		new Duplex(null, null);
}
1216 
1217 // ***************************************************************************
1218 
/// An asynchronous socket-based connection.
class SocketConnection : StreamConnection
{
protected:
	/// Addresses which have not been tried yet during `connect`.
	AddressInfo[] addressQueue;

	this(Socket conn)
	{
		super(conn);
	}

	/// Sends `buffer` through the socket.
	override sizediff_t doSend(in void[] buffer)
	{
		return conn.send(buffer);
	}

	/// Receives into `buffer` from the socket.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

	/// Takes the next address off `addressQueue` and starts a
	/// non-blocking connection attempt to it.
	/// A `SocketException` is reported through `onError`, which moves on
	/// to the following address if one is left.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);
		auto addressInfo = addressQueue[0];
		addressQueue = addressQueue[1..$];

		try
		{
			// Build and configure the socket in a local first, so that `conn`
			// is only ever non-null when it is about to be registered.
			// If creating the socket throws (e.g. unsupported address family),
			// `conn` stays null and `onError` knows there is nothing to undo.
			auto newSocket = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
			newSocket.blocking = false;

			conn = newSocket;
			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
			conn.connect(addressInfo.address);
		}
		catch (SocketException e)
			return onError("Connect error: " ~ e.msg);
	}

	/// Called when an error occurs on the socket.
	/// While connecting with addresses still queued, the current socket
	/// (if any) is discarded and the next address is tried instead of
	/// reporting the error.
	override void onError(string reason)
	{
		if (state == ConnectionState.connecting && addressQueue.length)
		{
			// `conn` is null if the failure happened before a socket
			// could be created; in that case nothing was registered.
			if (conn)
			{
				socketManager.unregister(this);
				conn.close();
				conn = null;
			}

			return tryNextAddress();
		}

		super.onError(reason);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection.
	/// The addresses are tried in order until one attempt does not fail.
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		addressQueue = addresses;
		state = ConnectionState.connecting;
		tryNextAddress();
	}
}
1295 
/// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
	}

	alias connect = SocketConnection.connect; // raise overload

	/// Start establishing a connection.
	/// Resolves `host`, shuffles the results if there is more than one,
	/// and passes them on to `SocketConnection.connect`.
	/// A `SocketException` during the lookup is reported through `onError`.
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		state = ConnectionState.resolving;

		AddressInfo[] candidates;
		try
		{
			auto resolved = getAddress(host, port);
			enforce(resolved.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", resolved.length);
				foreach (address; resolved)
					stderr.writefln("- %s", address.toString());
			}

			if (resolved.length > 1)
			{
				import std.random : randomShuffle;
				randomShuffle(resolved);
			}

			foreach (address; resolved)
			{
				auto info = AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
				candidates ~= info;
			}
		}
		catch (SocketException e)
			return onError("Lookup error: " ~ e.msg);

		// The address-list overload expects to start from `disconnected`.
		state = ConnectionState.disconnected;
		connect(candidates);
	}
}
1353 
1354 // ***************************************************************************
1355 
/// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Adopts an already created socket and registers with the socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one pending connection. It is wrapped with
		/// `createConnection` and handed to the accept handler, or closed
		/// straight away if no accept handler is set.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				auto connection = createConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		/// Nothing to do for a listening socket.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregisters from the socket manager and closes the listening socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Wraps an accepted socket in a connection object.
	/// Subclasses override this to produce a more specific connection type.
	SocketConnection createConnection(Socket socket)
	{
		return new SocketConnection(socket);
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Makes every listener watch for readability only while an
	/// accept handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// A listener is created for each address that can be bound;
	/// addresses which fail with a `SocketException` are skipped.
	/// Throws: `Exception` if there is no listener at all afterwards.
	final void listen(AddressInfo[] addressInfos)
	{
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				// If any of the steps below fails, this socket is never
				// handed to a Listener - close it now rather than leaving
				// the descriptor open until the object is collected.
				scope(failure) conn.close();

				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	/// Creates a server without any listeners; call `listen` to add some.
	this()
	{
	}

	/// Creates a Server with the given sockets.
	/// The sockets must have already had `bind` and `listen` called on them.
	this(Socket[] sockets...)
	{
		foreach (socket; sockets)
			listeners ~= new Listener(socket);
	}

	/// The local address of every listener.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Whether `listen` has succeeded and `close` has not been called since.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes and drops all listeners, then calls `handleClose` if set.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

	/// Create a SocketServer using the handle passed on standard input,
	/// for which `listen` had already been called. Used by
	/// e.g. FastCGI and systemd sockets with "Listen = yes".
	static SocketServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0;

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new SocketServer(s);
	}

	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(SocketConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
}
1524 
/// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
	/// Wraps accepted sockets in `TcpConnection` objects.
	override SocketConnection createConnection(Socket socket)
	{
		return new TcpConnection(socket);
	}

public:
	/// Creates a server without any listeners; call `listen` to add some.
	this()
	{
	}

	/// Creates a server with the given sockets (see `SocketServer`).
	this(Socket[] sockets...)
	{
		super(sockets);
	}

	alias listen = SocketServer.listen; // raise overload

	/// Start listening on this socket.
	/// Params:
	///   port = port to listen on; if 0, only IPv4 addresses are used
	///   addr = address to resolve and listen on (passed to `getAddressInfo`)
	/// Returns: the port reported by the last IPv4 listener,
	///   or `port` unchanged if there is no IPv4 listener.
	final ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		// Report the port actually in use (relevant when `port` was 0).
		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	/// Create a TcpServer using the handle passed on standard input,
	/// for which `listen` had already been called.
	/// Builds a genuine `TcpServer` object, so that accepted connections
	/// are created by `TcpServer.createConnection` and really are
	/// `TcpConnection` instances.
	deprecated("Use SocketServer.fromStdin")
	static TcpServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0;

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new TcpServer(s);
	}

	/// Callback for an incoming connection, typed as `TcpConnection`.
	/// Passing `null` clears the handler, after which connections are
	/// no longer accepted.
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		if (value is null)
		{
			// Do not wrap a null delegate: the wrapper would be non-null,
			// so the server would keep accepting and then call through null.
			void delegate(SocketConnection incoming) none = null;
			super.handleAccept(none);
		}
		else
			super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c));
	}
}
1586 
1587 // ***************************************************************************
1588 
/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	/// Runs the actual logic in `onWritableImpl`, then refreshes the
	/// socket flags (only if `onWritableImpl` did not throw).
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	final private void onWritableImpl()
	{
		// Send every queued packet as one datagram to `remoteAddress`.
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				auto sent = conn.sendTo(pdata.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					// Not an error if the send would merely have blocked;
					// stop here and leave the remaining data queued.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// Unlike a stream, a datagram cannot be continued later.
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddressStr);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	/// Receives one datagram into `buffer`.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given AddressFamily, without binding to an address.
	/// Calls the connect handler (if set) once the socket is ready.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Creates and registers the non-blocking socket and marks this
	/// object as connected, without calling the connect handler.
	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// If `host` resolves to several addresses, one is picked at random.
	/// Returns: the local port the socket was bound to, or 0 if the
	///   lookup failed with a `SocketException` (reported through `onError`).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Binding to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			enforce(addresses.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// The AddressInfo overload expects to start from `disconnected`.
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl rather than initialize: the connect handler must
		// fire exactly once, and only after the socket has been bound.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

public:
	/// Where to send packets to.
	Address remoteAddress;
}
1751 
1752 ///
1753 unittest
1754 {
1755 	auto server = new UdpConnection();
1756 	auto serverPort = server.bind("localhost", 0);
1757 
1758 	auto client = new UdpConnection();
1759 	client.initialize(server.localAddress.addressFamily);
1760 
1761 	string[] packets = ["Hello", "there"];
1762 	client.remoteAddress = server.localAddress;
1763 	client.send({
1764 		Data[] data;
1765 		foreach (packet; packets)
1766 			data ~= Data(packet);
1767 		return data;
1768 	}());
1769 
1770 	server.handleReadData = (Data data)
1771 	{
1772 		assert(data.contents == packets[0]);
1773 		packets = packets[1..$];
1774 		if (!packets.length)
1775 		{
1776 			server.close();
1777 			client.close();
1778 		}
1779 	};
1780 	socketManager.loop();
1781 	assert(!packets.length);
1782 }
1783 
1784 // ***************************************************************************
1785 
/// Base class for a connection adapter.
/// By itself, it only forwards everything to and from the wrapped
/// connection; subclasses override the `on*` methods to add behavior.
class ConnectionAdapter : IConnection
{
	/// The wrapped connection.
	IConnection next;

	/// Wraps `next` and takes over its connect, disconnect and
	/// buffer-flushed handlers. The read handler is only taken over
	/// once a read handler is set on the adapter.
	this(IConnection next)
	{
		this.next = next;

		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	}

	/// The state of the wrapped connection.
	@property ConnectionState state()
	{
		return next.state;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Forwards the connect event to the user's handler, if any.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	/// Forwards received data to the user's handler.
	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		readDataHandler(data);
	}

	/// Forwards the disconnect event to the user's handler, if any.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	/// Forwards the buffer-flushed event to the user's handler, if any.
	protected void onBufferFlushed()
	{
		if (!bufferFlushedHandler)
			return;
		bufferFlushedHandler();
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	/// The wrapped connection's read handler is set to `onReadData`
	/// while a handler is set here, and cleared otherwise.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}
	private ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
	private BufferFlushedHandler bufferFlushedHandler;
}
1860 
1861 // ***************************************************************************
1862 
/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
/// Incoming data is accumulated in `inBuffer`; each complete line is
/// passed to the read handler without its delimiter.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	string delimiter = "\r\n";

	/// Wraps the given connection.
	this(IConnection next)
	{
		super(next);
	}

	/// Append a line to the send buffer.
	/// The delimiter is appended to `line` before it is queued.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		// Call through a base-class reference instead (see the issue linked above).
		ConnectionAdapter ca = this;
		ca.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	/// Holds received data for which no delimiter has been seen yet.
	Data inBuffer;

	/// Called when data has been received.
	/// Adds `data` to `inBuffer`, then calls `processLine` for every
	/// delimiter found in the buffer.
	final override void onReadData(Data data)
	{
		import std.string;
		auto oldBufferLength = inBuffer.length;
		// Only concatenate when there is leftover data; otherwise just adopt `data`.
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			import core.stdc.string; // memchr

			char c = delimiter[0];
			// The first search covers only the newly added bytes
			// (presumably the older bytes were already searched by an earlier call).
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				sizediff_t index = p - inBuffer.ptr;
				processLine(index);

				// processLine has replaced inBuffer with the remainder; search all of it.
				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			sizediff_t index;
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
				processLine(index);
		}
	}

	/// Extracts the line ending at `index` (the position of the delimiter
	/// in `inBuffer`), removes it and the delimiter from the buffer, and
	/// passes the line to `ConnectionAdapter.onReadData`.
	final void processLine(size_t index)
	{
		auto line = inBuffer[0..index];
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
	}

	/// Forwards the disconnect event, then discards any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
1934 
1935 // ***************************************************************************
1936 
/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
	/// Wraps the given connection. No timeout is active until
	/// `setIdleTimeout` is called.
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stops the idle timer.
	/// An idle timeout must have been set, and the timer must be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());

		idleTask.cancel();
	}

	/// Starts the idle timer again.
	/// An idle timeout must have been set, and the timer must not be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());

		mainTimer.add(idleTask);
	}

	/// Sets (or changes) the idle timeout and starts the timer.
	/// `duration` must be positive.
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		// Configure idleTask
		if (idleTask !is null)
		{
			// Reuse the existing task, taking it off the timer first if needed.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}

		mainTimer.add(idleTask);
	}

	/// Signals activity: calls `handleNonIdle` (if set) and restarts
	/// the idle timer if it is currently waiting.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();

		if (idleTask is null)
			return;
		if (idleTask.isWaiting())
			idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Counts a successful connection as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Counts incoming data as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Stops the idle timer (if waiting) before forwarding the event.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		if (idleTask !is null)
		{
			if (idleTask.isWaiting())
				idleTask.cancel();
		}
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set

	/// Timer callback, called when the idle timeout has elapsed.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);

		// A delayed disconnect which takes too long is turned into an error disconnect.
		if (state == ConnectionState.disconnecting)
			return disconnect("Delayed disconnect - time-out", DisconnectType.error);

		if (state == ConnectionState.disconnected)
			return;

		// Without a handler, a time-out means disconnecting.
		if (!handleIdleTimeout)
			return disconnect("Time-out", DisconnectType.error);

		resumeIdleTimeout(); // reschedule (by default)
		handleIdleTimeout();
	}
}
2045 
2046 // ***************************************************************************
2047 
// Checks that a timeout scheduled with setTimeout fires while
// socketManager.loop() is running.
unittest
{
	bool fired = false;

	setTimeout({ fired = true; }, 10.msecs);
	socketManager.loop();

	assert(fired);
}