1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <ae@cy.md>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.array : toArray;
21 import ae.utils.math;
22 public import ae.sys.data;
23 
24 import core.stdc.stdint : int32_t;
25 
26 import std.exception;
27 import std.parallelism : totalCPUs;
28 import std.socket;
29 import std..string : format;
30 public import std.socket : Address, AddressInfo, Socket;
31 
32 version (Windows)
33     private import c_socks = core.sys.windows.winsock2;
34 else version (Posix)
35     private import c_socks = core.sys.posix.sys.socket;
36 
37 debug(ASOCKETS) import std.stdio : stderr;
38 debug(PRINTDATA) import std.stdio : stderr;
39 debug(PRINTDATA) import ae.utils.text : hexDump;
40 private import std.conv : to;
41 
42 
43 // https://issues.dlang.org/show_bug.cgi?id=7016
44 static import ae.utils.array;
45 
46 version(LIBEV)
47 {
48 	import deimos.ev;
49 	pragma(lib, "ev");
50 }
51 
52 version(Windows)
53 {
54 	import core.sys.windows.windows : Sleep;
55 	private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
56 }
57 else
58 	private enum USE_SLEEP = false;
59 
/// Event loop activity counter: incremented once per libev callback (LIBEV build)
/// or once per completed `select` loop iteration (default build).
61 int eventCounter;
62 
63 version(LIBEV)
64 {
65 	// Watchers are a GenericSocket field (as declared in SocketMixin).
66 	// Use one watcher per read and write event.
67 	// Start/stop those as higher-level code declares interest in those events.
68 	// Use the "data" ev_io field to store the parent GenericSocket address.
69 	// Also use the "data" field as a flag to indicate whether the watcher is active
70 	// (data is null when the watcher is stopped).
71 
72 	/// `libev`-based event loop implementation.
73 	struct SocketManager
74 	{
75 	private:
76 		size_t count;
77 
78 		extern(C)
79 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
80 		{
81 			eventCounter++;
82 			auto socket = cast(GenericSocket)w.data;
83 			assert(socket, "libev event fired on stopped watcher");
84 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
85 
86 			if (revents & EV_READ)
87 				socket.onReadable();
88 			else
89 			if (revents & EV_WRITE)
90 				socket.onWritable();
91 			else
92 				assert(false, "Unknown event fired from libev");
93 
94 			// TODO? Need to get proper SocketManager instance to call updateTimer on
95 			socketManager.updateTimer(false);
96 		}
97 
98 		ev_timer evTimer;
99 		MonoTime lastNextEvent = MonoTime.max;
100 
101 		extern(C)
102 		static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/)
103 		{
104 			eventCounter++;
105 			debug (ASOCKETS) writefln("Timer callback called.");
106 			mainTimer.prod();
107 
108 			socketManager.updateTimer(true);
109 			debug (ASOCKETS) writefln("Timer callback exiting.");
110 		}
111 
112 		void updateTimer(bool force)
113 		{
114 			auto nextEvent = mainTimer.getNextEvent();
115 			if (force || lastNextEvent != nextEvent)
116 			{
117 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
118 				if (nextEvent == MonoTime.max) // Stopping
119 				{
120 					if (lastNextEvent != MonoTime.max)
121 						ev_timer_stop(ev_default_loop(0), &evTimer);
122 				}
123 				else
124 				{
125 					auto remaining = mainTimer.getRemainingTime();
126 					while (remaining <= Duration.zero)
127 					{
128 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
129 						mainTimer.prod();
130 						remaining = mainTimer.getRemainingTime();
131 					}
132 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
133 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
134 					if (lastNextEvent == MonoTime.max) // Starting
135 					{
136 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
137 						ev_timer_start(ev_default_loop(0), &evTimer);
138 					}
139 					else // Adjusting
140 					{
141 						evTimer.repeat = tstamp;
142 						ev_timer_again(ev_default_loop(0), &evTimer);
143 					}
144 				}
145 				lastNextEvent = nextEvent;
146 			}
147 		}
148 
149 	public:
150 		/// Register a socket with the manager.
151 		void register(GenericSocket socket)
152 		{
153 			debug (ASOCKETS) writefln("Registering %s", socket);
154 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
155 			auto fd = socket.conn.handle;
156 			assert(fd, "Must have fd before socket registration");
157 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
158 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
159 			count++;
160 		}
161 
162 		/// Unregister a socket with the manager.
163 		void unregister(GenericSocket socket)
164 		{
165 			debug (ASOCKETS) writefln("Unregistering %s", socket);
166 			socket.notifyRead  = false;
167 			socket.notifyWrite = false;
168 			count--;
169 		}
170 
171 		/// Returns the number of registered sockets.
172 		size_t size()
173 		{
174 			return count;
175 		}
176 
177 		/// Loop continuously until no sockets are left.
178 		void loop()
179 		{
180 			auto evLoop = ev_default_loop(0);
181 			enforce(evLoop, "libev initialization failure");
182 
183 			updateTimer(true);
184 			debug (ASOCKETS) writeln("ev_run");
185 			ev_run(ev_default_loop(0), 0);
186 		}
187 	}
188 
189 	private mixin template SocketMixin()
190 	{
191 		private ev_io evRead, evWrite;
192 
193 		private void setWatcherState(ref ev_io ev, bool newValue, int /*event*/)
194 		{
195 			if (!conn)
196 			{
197 				// Can happen when setting delegates before connecting.
198 				return;
199 			}
200 
201 			if (newValue && !ev.data)
202 			{
203 				// Start
204 				ev.data = cast(void*)this;
205 				ev_io_start(ev_default_loop(0), &ev);
206 			}
207 			else
208 			if (!newValue && ev.data)
209 			{
210 				// Stop
211 				assert(ev.data is cast(void*)this);
212 				ev.data = null;
213 				ev_io_stop(ev_default_loop(0), &ev);
214 			}
215 		}
216 
217 		/// Interested in read notifications (onReadable)?
218 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
219 		/// Interested in write notifications (onWritable)?
220 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
221 
222 		debug ~this() @nogc
223 		{
224 			// The LIBEV SocketManager holds no references to registered sockets.
225 			// TODO: Add a doubly-linked list?
226 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
227 		}
228 	}
229 }
230 else // Use select
231 {
232 	/// `select`-based event loop implementation.
233 	struct SocketManager
234 	{
235 	private:
236 		enum FD_SETSIZE = 1024;
237 
238 		/// List of all sockets to poll.
239 		GenericSocket[] sockets;
240 
241 		/// Debug AA to check for dangling socket references.
242 		debug GenericSocket[socket_t] socketHandles;
243 
244 		void delegate()[] nextTickHandlers, idleHandlers;
245 
246 	public:
247 		/// Register a socket with the manager.
248 		void register(GenericSocket conn)
249 		{
250 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
251 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
252 			sockets ~= conn;
253 
254 			debug
255 			{
256 				auto handle = conn.socket.handle;
257 				assert(handle != socket_t.init, "Can't register a closed socket");
258 				assert(handle !in socketHandles, "This socket handle is already registered");
259 				socketHandles[handle] = conn;
260 			}
261 		}
262 
263 		/// Unregister a socket with the manager.
264 		void unregister(GenericSocket conn)
265 		{
266 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
267 
268 			debug
269 			{
270 				auto handle = conn.socket.handle;
271 				assert(handle != socket_t.init, "Can't unregister a closed socket");
272 				auto pconn = handle in socketHandles;
273 				assert(pconn, "This socket handle is not registered");
274 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
275 				socketHandles.remove(handle);
276 			}
277 
278 			foreach (size_t i, GenericSocket j; sockets)
279 				if (j is conn)
280 				{
281 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
282 					return;
283 				}
284 			assert(false, "Socket not registered");
285 		}
286 
287 		/// Returns the number of registered sockets.
288 		size_t size()
289 		{
290 			return sockets.length;
291 		}
292 
293 		/// Loop continuously until no sockets are left.
294 		void loop()
295 		{
296 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
297 
298 			SocketSet readset, writeset;
299 			size_t sockcount;
300 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
301 			readset  = new SocketSet(setSize);
302 			writeset = new SocketSet(setSize);
303 			while (true)
304 			{
305 				if (nextTickHandlers.length)
306 				{
307 					auto thisTickHandlers = nextTickHandlers;
308 					nextTickHandlers = null;
309 
310 					foreach (handler; thisTickHandlers)
311 						handler();
312 
313 					continue;
314 				}
315 
316 				uint minSize = 0;
317 				version(Windows)
318 					minSize = cast(uint)sockets.length;
319 				else
320 				{
321 					foreach (s; sockets)
322 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
323 							minSize = s.socket.handle;
324 				}
325 				minSize++;
326 
327 				if (setSize < minSize)
328 				{
329 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
330 					setSize = minSize * 2;
331 					readset  = new SocketSet(setSize);
332 					writeset = new SocketSet(setSize);
333 				}
334 				else
335 				{
336 					readset.reset();
337 					writeset.reset();
338 				}
339 
340 				sockcount = 0;
341 				bool haveActive;
342 				debug (ASOCKETS) stderr.writeln("Populating sets");
343 				foreach (GenericSocket conn; sockets)
344 				{
345 					if (!conn.socket)
346 						continue;
347 					sockcount++;
348 
349 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
350 					if (conn.notifyRead)
351 					{
352 						readset.add(conn.socket);
353 						if (!conn.daemonRead)
354 							haveActive = true;
355 						debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : "");
356 					}
357 					if (conn.notifyWrite)
358 					{
359 						writeset.add(conn.socket);
360 						if (!conn.daemonWrite)
361 							haveActive = true;
362 						debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : "");
363 					}
364 					debug (ASOCKETS) stderr.writeln();
365 				}
366 				debug (ASOCKETS)
367 				{
368 					stderr.writefln("Sets populated as follows:");
369 					printSets(readset, writeset);
370 				}
371 
372 				debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...",
373 					haveActive ? "" : "not ",
374 					sockcount,
375 					mainTimer.isWaiting() ? "with" : "no",
376 					idleHandlers.length,
377 				);
378 				if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length)
379 				{
380 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
381 					break;
382 				}
383 
384 				debug (ASOCKETS) stderr.flush();
385 
386 				int events;
387 				if (idleHandlers.length)
388 				{
389 					if (sockcount==0)
390 						events = 0;
391 					else
392 						events = Socket.select(readset, writeset, null, 0.seconds);
393 				}
394 				else
395 				if (USE_SLEEP && sockcount==0)
396 				{
397 					version(Windows)
398 					{
399 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
400 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
401 						if (duration <= 0)
402 							duration = 1; // Avoid busywait
403 						else
404 						if (duration > int.max)
405 							duration = int.max;
406 						Sleep(cast(int)duration);
407 						events = 0;
408 					}
409 					else
410 						assert(0);
411 				}
412 				else
413 				if (mainTimer.isWaiting())
414 					events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime());
415 				else
416 					events = Socket.select(readset, writeset, null);
417 
418 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
419 
420 				if (events > 0)
421 				{
422 					// Handle just one event at a time, as the first
423 					// handler might invalidate select()'s results.
424 					handleEvent(readset, writeset);
425 				}
426 				else
427 				if (idleHandlers.length)
428 				{
429 					import ae.utils.array : shift;
430 					auto handler = idleHandlers.shift();
431 
432 					// Rotate the idle handler queue before running it,
433 					// in case the handler unregisters itself.
434 					idleHandlers ~= handler;
435 
436 					handler();
437 				}
438 
439 				// Timers may invalidate our select results, so fire them after processing the latter
440 				mainTimer.prod();
441 
442 				eventCounter++;
443 			}
444 		}
445 
446 		debug (ASOCKETS)
447 		private void printSets(SocketSet readset, SocketSet writeset)
448 		{
449 			foreach (GenericSocket conn; sockets)
450 			{
451 				if (!conn.socket)
452 					stderr.writefln("\t\t%s is unset", conn);
453 				else
454 				{
455 					if (readset.isSet(conn.socket))
456 						stderr.writefln("\t\t%s is readable", conn);
457 					if (writeset.isSet(conn.socket))
458 						stderr.writefln("\t\t%s is writable", conn);
459 				}
460 			}
461 		}
462 
463 		private void handleEvent(SocketSet readset, SocketSet writeset)
464 		{
465 			debug (ASOCKETS)
466 			{
467 				stderr.writefln("\tSelect results:");
468 				printSets(readset, writeset);
469 			}
470 
471 			foreach (GenericSocket conn; sockets)
472 			{
473 				if (!conn.socket)
474 					continue;
475 
476 				if (readset.isSet(conn.socket))
477 				{
478 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
479 					return conn.onReadable();
480 				}
481 				else
482 				if (writeset.isSet(conn.socket))
483 				{
484 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
485 					return conn.onWritable();
486 				}
487 			}
488 
489 			assert(false, "select() reported events available, but no registered sockets are set");
490 		}
491 	}
492 
	/// Schedule a function to run on the next event loop iteration.
	/// Can be used to queue logic to run once all current execution frames exit.
	/// Similar to e.g. process.nextTick in Node.
	/// Params:
	///   socketManager = the manager whose next-tick queue `dg` is appended to
	///   dg            = the delegate to queue
	void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow
	{
		socketManager.nextTickHandlers ~= dg;
	}
500 
501 	// Use UFCS to allow removeIdleHandler to have a predicate with context
502 	/// Register a function to be called when the event loop is idle,
503 	/// and would otherwise sleep.
504 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
505 	{
506 		foreach (i, idleHandler; socketManager.idleHandlers)
507 			assert(handler !is idleHandler);
508 
509 		socketManager.idleHandlers ~= handler;
510 	}
511 
512 	/// Unregister a function previously registered with `addIdleHandler`.
513 	void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args)
514 	{
515 		foreach (i, idleHandler; socketManager.idleHandlers)
516 			if (pred(idleHandler, args))
517 			{
518 				import std.algorithm : remove;
519 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
520 				return;
521 			}
522 		assert(false, "No such idle handler");
523 	}
524 
	/// Members mixed into `GenericSocket` for the `select`-based event loop.
	/// The loop reads these flags on every iteration to decide which
	/// sockets to add to the read and write sets.
	private mixin template SocketMixin()
	{
		/// Interested in read notifications (onReadable)?
		bool notifyRead;
		/// Interested in write notifications (onWritable)?
		bool notifyWrite;
	}
532 }
533 
534 /// The default socket manager.
535 SocketManager socketManager;
536 
537 // ***************************************************************************
538 
/// General methods for an asynchronous socket.
abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

// protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Called by the event loop when the socket is readable.
	/// The default implementation does nothing.
	void onReadable()
	{
	}

	/// Called by the event loop when the socket is writable.
	/// The default implementation does nothing.
	void onWritable()
	{
	}

	/// Error notification hook. The default implementation does nothing.
	void onError(string /*reason*/)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	/// (index 0 holds the remote address, index 1 the local one)
	private Address[2] cachedAddress;

	/// Implementation of `localAddress` / `remoteAddress`.
	/// Returns the cached address if there is one, `null` if there is no
	/// socket, and otherwise queries the socket and caches the result.
	/// Throws: `SocketOSException` if the address of a socket with an
	/// unspecified address family cannot be obtained.
	/*private*/ final @property Address _address(bool local)()
	{
		if (cachedAddress[local] !is null)
			return cachedAddress[local];
		else
		if (conn is null)
			return null;
		else
		{
			Address a;
			if (conn.addressFamily == AddressFamily.UNSPEC)
			{
				// Socket will attempt to construct an UnknownAddress,
				// which will almost certainly not match the real address length.
				static if (local)
					alias getname = c_socks.getsockname;
				else
					alias getname = c_socks.getpeername;

				// First call: query the required buffer length only.
				c_socks.socklen_t nameLen = 0;
				if (getname(conn.handle, null, &nameLen) < 0)
					throw new SocketOSException("Unable to obtain socket address");

				// Second call: fetch the address into a buffer of that length.
				auto buf = new ubyte[nameLen];
				auto sa = cast(c_socks.sockaddr*)buf.ptr;
				if (getname(conn.handle, sa, &nameLen) < 0)
					throw new SocketOSException("Unable to obtain socket address");
				a = new UnknownAddressReference(sa, nameLen);
			}
			else
				a = local ? conn.localAddress() : conn.remoteAddress();
			return cachedAddress[local] = a;
		}
	}

	alias localAddress = _address!true; /// Retrieve this socket's local address.
	alias remoteAddress = _address!false; /// Retrieve this socket's remote address.

	/// Implementation of `localAddressStr` / `remoteAddressStr`.
	/// Never throws: returns `"[null address]"` when no address is available,
	/// the host alone when the port cannot be obtained, and
	/// `"[error: ...]"` on any other failure.
	/*private*/ final @property string _addressStr(bool local)() nothrow
	{
		try
		{
			auto a = _address!local;
			if (a is null)
				return "[null address]";
			string host = a.toAddrString();
			import std.string : indexOf;
			// Bracket hosts containing a colon (e.g. IPv6 literals),
			// so that the port separator stays unambiguous.
			if (host.indexOf(':') >= 0)
				host = "[" ~ host ~ "]";
			try
			{
				string port = a.toPortString();
				return host ~ ":" ~ port;
			}
			catch (Exception e)
				return host;
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string.
	alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string.

	/// Don't block the process from exiting, even if the socket is ready to receive data.
	/// TODO: Not implemented with libev
	bool daemonRead;

	/// Don't block the process from exiting, even if the socket is ready to send data.
	/// TODO: Not implemented with libev
	bool daemonWrite;

	deprecated alias daemon = daemonRead;

	/// Enable TCP keep-alive on the socket with the given settings.
	/// If applying `time` and `interval` fails with a `SocketException`,
	/// falls back to just enabling the keep-alive socket option.
	/// Params:
	///   enabled  = whether keep-alive should be turned on or off
	///   time     = forwarded to `Socket.setKeepAlive`
	///   interval = forwarded to `Socket.setKeepAlive`
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
		if (enabled)
		{
			try
				conn.setKeepAlive(time, interval);
			catch (SocketException)
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
		}
		else
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
	}

	/// Returns a string containing the class name, address, and file descriptor.
	/// The class name is given without its module path, the address is
	/// that of this object, and the descriptor is -1 if there is no socket.
	override string toString() const
	{
		import std.string : format, split;
		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
	}
}
668 
669 // ***************************************************************************
670 
/// Classifies the cause of the disconnect.
/// Can be used to decide e.g. when it makes sense to reconnect.
/// Passed to `IConnection.disconnect` and on to the disconnect handler.
enum DisconnectType
{
	requested, /// Initiated by the application.
	graceful,  /// The peer gracefully closed the connection.
	error      /// Some abnormal network condition.
}
679 
/// Used to indicate the state of a connection throughout its lifecycle.
/// Note: the declaration order of the members is significant,
/// as `disconnectable` tests for a contiguous range of states.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}
699 
/// Returns true if this is a connection state for which disconnecting is valid,
/// i.e. `resolving`, `connecting` or `connected`.
/// Generally, applications should be aware of the life cycle of their sockets,
/// so checking the state of a connection is unnecessary (and a code smell).
/// However, unconditionally disconnecting some connected sockets can be useful
/// when it needs to occur "out-of-band" (not tied to the application's normal
/// life cycle), such as in response to a signal.
bool disconnectable(ConnectionState state)
{
	with (ConnectionState)
		return state == resolving
			|| state == connecting
			|| state == connected;
}
707 
/// Common interface for connections and adapters.
interface IConnection
{
	/// `send` queues data for sending in one of five queues, indexed
	/// by a numeric priority.
	/// `MAX_PRIORITY` is the highest (least urgent) priority index.
	/// `DEFAULT_PRIORITY` is the default priority
	enum MAX_PRIORITY = 4;
	enum DEFAULT_PRIORITY = 2; /// ditto

	/// This is the default value for the `disconnect` `reason` string parameter.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	/// Deprecated in favor of comparing `state` directly.
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	/// Deprecated in favor of comparing `state` directly.
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	/// Params:
	///   data     = the blocks to queue, in order
	///   priority = index of the queue to append to (0 .. `MAX_PRIORITY`)
	void send(scope Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Forward to the array overload with a one-element array.
		this.send(datum.toArray, priority);
	}

	/// Terminate the connection.
	/// Params:
	///   reason = human-readable description, passed on to the disconnect handler
	///   type   = classification of the cause of the disconnect
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias BufferFlushedHandler = void delegate();
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}
759 
760 // ***************************************************************************
761 
762 /// Implementation of `IConnection` using a socket.
763 /// Implements receiving data when readable and sending queued data
764 /// when writable.
765 class Connection : GenericSocket, IConnection
766 {
767 private:
768 	/// Blocks of data larger than this value are passed as unmanaged memory
769 	/// (in Data objects). Blocks smaller than this value will be reallocated
770 	/// on the managed heap. The disadvantage of placing large objects on the
771 	/// managed heap is false pointers; the disadvantage of using Data for
772 	/// small objects is wasted slack space due to the page size alignment
773 	/// requirement.
774 	enum UNMANAGED_THRESHOLD = 256;
775 
776 	ConnectionState _state;
777 	final @property ConnectionState state(ConnectionState value) { return _state = value; }
778 
779 public:
780 	/// Get connection state.
781 	override @property ConnectionState state() { return _state; }
782 
783 protected:
784 	abstract sizediff_t doSend(in void[] buffer);
785 	abstract sizediff_t doReceive(void[] buffer);
786 
787 	/// The send buffers.
788 	DataVec[MAX_PRIORITY+1] outQueue;
789 	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
790 	int partiallySent = -1;
791 
792 	/// Constructor used by a ServerSocket for new connections
793 	this(Socket conn)
794 	{
795 		this();
796 		this.conn = conn;
797 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
798 		if (conn)
799 			socketManager.register(this);
800 		updateFlags();
801 	}
802 
803 	final void updateFlags()
804 	{
805 		if (state == ConnectionState.connecting)
806 			notifyWrite = true;
807 		else
808 			notifyWrite = writePending;
809 
810 		notifyRead = state == ConnectionState.connected && readDataHandler;
811 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
812 	}
813 
814 	/// Called when a socket is readable.
815 	override void onReadable()
816 	{
817 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
818 		static ubyte[0x10000] inBuffer = void;
819 		auto received = doReceive(inBuffer);
820 
821 		if (received == 0)
822 			return disconnect("Connection closed", DisconnectType.graceful);
823 
824 		if (received == Socket.ERROR)
825 		{
826 		//	if (wouldHaveBlocked)
827 		//	{
828 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
829 		//		return;
830 		//	}
831 		//	else
832 				onError("recv() error: " ~ lastSocketError);
833 		}
834 		else
835 		{
836 			debug (PRINTDATA)
837 			{
838 				stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr);
839 				stderr.write(hexDump(inBuffer[0 .. received]));
840 				stderr.flush();
841 			}
842 
843 			if (state == ConnectionState.disconnecting)
844 			{
845 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
846 			}
847 			else
848 			if (!readDataHandler)
849 			{
850 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
851 			}
852 			else
853 			{
854 				// Currently, unlike the D1 version of this module,
855 				// we will always reallocate read network data.
856 				// This disfavours code which doesn't need to store
857 				// read data after processing it, but otherwise
858 				// makes things simpler and safer all around.
859 
860 				Data data;
861 				if (received < UNMANAGED_THRESHOLD)
862 				{
863 					// Copy to the managed heap
864 					data = Data(inBuffer[0 .. received].dup);
865 				}
866 				else
867 				{
868 					// Copy to unmanaged memory
869 					data = Data(inBuffer[0 .. received], true);
870 				}
871 				readDataHandler(data);
872 			}
873 		}
874 	}
875 
876 	/// Called when an error occurs on the socket.
877 	override void onError(string reason)
878 	{
879 		if (state == ConnectionState.disconnecting)
880 		{
881 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
882 			return close();
883 		}
884 
885 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
886 		disconnect("Socket error: " ~ reason, DisconnectType.error);
887 	}
888 
889 	this()
890 	{
891 	}
892 
893 public:
894 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
895 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
896 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
897 	{
898 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
899 		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));
900 
901 		if (writePending)
902 		{
903 			if (type==DisconnectType.requested)
904 			{
905 				assert(conn, "Attempting to disconnect on an uninitialized socket");
906 				// queue disconnect after all data is sent
907 				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
908 				state = ConnectionState.disconnecting;
909 				//setIdleTimeout(30.seconds);
910 				if (disconnectHandler)
911 					disconnectHandler(reason, type);
912 				updateFlags();
913 				return;
914 			}
915 			else
916 				discardQueues();
917 		}
918 
919 		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
920 
921 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
922 			close();
923 		else
924 		{
925 			assert(conn is null, "Registered but %s socket".format(state));
926 			if (state == ConnectionState.resolving)
927 				state = ConnectionState.disconnected;
928 		}
929 
930 		if (disconnectHandler)
931 			disconnectHandler(reason, type);
932 		updateFlags();
933 	}
934 
935 	private final void close()
936 	{
937 		assert(conn, "Attempting to close an unregistered socket");
938 		socketManager.unregister(this);
939 		conn.close();
940 		conn = null;
941 		outQueue[] = DataVec.init;
942 		state = ConnectionState.disconnected;
943 	}
944 
945 	/// Append data to the send buffer.
946 	void send(scope Data[] data, int priority = DEFAULT_PRIORITY)
947 	{
948 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
949 		outQueue[priority] ~= data;
950 		notifyWrite = true; // Fast updateFlags()
951 
952 		debug (PRINTDATA)
953 		{
954 			stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr);
955 			foreach (datum; data)
956 				if (datum.length)
957 					stderr.write(hexDump(datum.contents));
958 				else
959 					stderr.writeln("(empty Data)");
960 			stderr.flush();
961 		}
962 	}
963 
964 	/// ditto
965 	alias send = IConnection.send;
966 
967 	/// Cancel all queued `Data` packets with the given priority.
968 	/// Does not cancel any partially-sent `Data`.
969 	final void clearQueue(int priority)
970 	{
971 		if (priority == partiallySent)
972 		{
973 			assert(outQueue[priority].length > 0);
974 			outQueue[priority].length = 1;
975 		}
976 		else
977 			outQueue[priority] = null;
978 		updateFlags();
979 	}
980 
981 	/// Clears all queues, even partially sent content.
982 	private final void discardQueues()
983 	{
984 		foreach (priority; 0..MAX_PRIORITY+1)
985 			outQueue[priority] = null;
986 		partiallySent = -1;
987 		updateFlags();
988 	}
989 
990 	/// Returns true if any queues have pending data.
991 	@property
992 	final bool writePending()
993 	{
994 		foreach (ref queue; outQueue)
995 			if (queue.length)
996 				return true;
997 		return false;
998 	}
999 
1000 	/// Returns true if there are any queued `Data` which have not yet
1001 	/// begun to be sent.
1002 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
1003 	{
1004 		if (priority == partiallySent)
1005 		{
1006 			assert(outQueue[priority].length > 0);
1007 			return outQueue[priority].length > 1;
1008 		}
1009 		else
1010 			return outQueue[priority].length > 0;
1011 	}
1012 
1013 	/// Returns the number of queued `Data` at the given priority.
1014 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
1015 	{
1016 		return outQueue[priority].length;
1017 	}
1018 
1019 	/// Returns the number of queued bytes at the given priority.
1020 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
1021 	{
1022 		size_t bytes;
1023 		foreach (datum; outQueue[priority])
1024 			bytes += datum.length;
1025 		return bytes;
1026 	}
1027 
// public:
	// Each setter stores the callback and then refreshes the
	// socket's event flags through updateFlags().

	private ConnectHandler connectHandler;
	/// Callback for when a connection has been established.
	@property final void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
		updateFlags();
	}

	private ReadDataHandler readDataHandler;
	/// Callback for incoming data.
	/// Data will not be received unless this handler is set.
	@property final void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		updateFlags();
	}

	private DisconnectHandler disconnectHandler;
	/// Callback for when a connection was closed.
	@property final void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
		updateFlags();
	}

	private BufferFlushedHandler bufferFlushedHandler;
	/// Callback setter for when all queued data has been sent.
	@property final void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
		updateFlags();
	}
1045 }
1046 
/// Implements a stream connection.
/// Queued `Data` is allowed to be fragmented.
class StreamConnection : Connection
{
protected:
	/// Create a stream connection without an underlying socket.
	this()
	{
		super();
	}

	/// Called when a socket is writable.
	/// Runs `onWritableImpl` and then refreshes the event flags.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	/// Handles a "writable" notification.
	/// While `connecting`, this completes the connection attempt;
	/// otherwise it sends as much queued data as `doSend` accepts.
	final private void onWritableImpl()
	{
		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
		if (state == ConnectionState.connecting)
		{
			// Read the socket's pending error status to learn
			// whether the connection attempt succeeded.
			int32_t error;
			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
			if (error)
				return disconnect(formatSocketError(error), DisconnectType.error);

			state = ConnectionState.connected;

			//debug writefln("[%s] Connected", remoteAddressStr);
			try
				setKeepAlive();
			catch (Exception e)
				return disconnect(e.msg, DisconnectType.error);
			if (connectHandler)
				connectHandler();
			// Nothing is sent on this notification.
			return;
		}
		//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

		// Two passes over the queues:
		// - first (sendPartial == true), only the queue whose priority
		//   equals partiallySent is drained, so that a packet which was
		//   already partially sent is finished before anything else;
		// - then (sendPartial == false), all queues are drained in index order.
		foreach (sendPartial; [true, false])
			foreach (int priority, ref queue; outQueue)
				while (queue.length && (!sendPartial || priority == partiallySent))
				{
					assert(partiallySent == -1 || partiallySent == priority);

					// Empty Data objects are not passed to doSend; with sent == 0
					// and a zero length they fall through to the "fully sent"
					// branch below and are simply removed from the queue.
					ptrdiff_t sent = 0;
					if (!queue.front.empty)
					{
						sent = doSend(queue.front.contents);
						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, queue.front.length);
					}
					else
					{
						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
					}

					if (sent == Socket.ERROR)
					{
						// A would-block condition is not an error:
						// stop and wait for the next notification.
						if (wouldHaveBlocked())
							return;
						else
							return onError("send() error: " ~ lastSocketError);
					}
					else
					if (sent < queue.front.length)
					{
						// Short write: keep the unsent tail at the front of the
						// queue and remember which priority is partially sent.
						if (sent > 0)
						{
							queue.front = queue.front[sent..queue.front.length];
							partiallySent = priority;
						}
						return;
					}
					else
					{
						// The whole packet went out; remove it from the queue.
						assert(sent == queue.front.length);
						//debug writefln("[%s] Sent data:", remoteAddressStr);
						//debug writefln("%s", hexDump(queue.front.contents[0..sent]));
						queue.front.clear();
						queue.popFront();
						partiallySent = -1;
						if (queue.length == 0)
							queue = null;
					}
				}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		// A disconnect which was waiting for the queues to drain can complete now.
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

public:
	/// Create a stream connection on top of the given socket.
	this(Socket conn)
	{
		super(conn);
	} ///
}
1152 
1153 // ***************************************************************************
1154 
/// A connection over a POSIX file descriptor.
/// Makes it possible to add a file (e.g. stdin/stdout) to the socket manager.
/// The given file descriptor is not duplicated, so "disconnecting"
/// this connection will close it.
version (Posix)
class FileConnection : StreamConnection
{
	/// Wrap the file descriptor `fileno`, switching it to non-blocking mode.
	this(int fileno)
	{
		auto wrapped = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		wrapped.blocking = false;
		super(wrapped);
	}

protected:
	import core.sys.posix.unistd : read, write;

	/// Write `buffer` using `write(2)`.
	override sizediff_t doSend(in void[] buffer)
	{
		auto fd = socket.handle;
		return write(fd, buffer.ptr, buffer.length);
	}

	/// Fill `buffer` using `read(2)`.
	override sizediff_t doReceive(void[] buffer)
	{
		auto fd = socket.handle;
		return read(fd, buffer.ptr, buffer.length);
	}
}
1182 
/// Presents two connections - one used for reading, one used for
/// writing - as a single connection, e.g. for stdin/stdout.
class Duplex : IConnection
{
	/// The connection data is read from, and the one data is written to.
	IConnection reader, writer;

	/// Combine `reader` and `writer`, taking over the connect and
	/// disconnect handlers of both.
	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;

		reader.handleConnect = &onConnect;
		writer.handleConnect = &onConnect;

		reader.handleDisconnect = &onDisconnect;
		writer.handleDisconnect = &onDisconnect;
	}

	/// `disconnecting` if either side is disconnecting;
	/// otherwise, the lower of the two sides' states.
	@property ConnectionState state()
	{
		enum draining = ConnectionState.disconnecting;

		if (reader.state != draining && writer.state != draining)
		{
			if (reader.state < writer.state)
				return reader.state;
			return writer.state;
		}
		return draining;
	}

	/// Queue Data for sending.
	void send(scope Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		// A side is only disconnected if its state lies strictly
		// between `disconnected` and `disconnecting`.
		static bool canDisconnect(IConnection side)
		{
			return side.state > ConnectionState.disconnected
				&& side.state < ConnectionState.disconnecting;
		}

		if (canDisconnect(reader))
			reader.disconnect(reason, type);
		if (canDisconnect(writer))
			writer.disconnect(reason, type);

		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Fires the connect handler once both sides report `connected`.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		if (reader.state != ConnectionState.connected)
			return;
		if (writer.state != ConnectionState.connected)
			return;

		connectHandler();
	}

	/// Forwards the first disconnect to the user's handler, then
	/// disconnects whichever side is still up.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);

		if (disconnectHandler)
		{
			disconnectHandler(reason, type);
			// The other side will report its own disconnect too;
			// the user should hear about it only once.
			disconnectHandler = null;
		}

		// Closing the remaining side is up to us. DisconnectType.requested
		// is used so that any written data still gets flushed.
		auto cascadeReason = "Other side of Duplex connection closed (" ~ reason ~ ")";
		disconnect(cascadeReason, DisconnectType.requested);
	}

	private ConnectHandler connectHandler;
	private DisconnectHandler disconnectHandler;

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value)
	{
		reader.handleReadData = value;
	}

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		writer.handleBufferFlushed = value;
	}
}
1261 
// Compile-time check only: the `if (false)` guard means nothing is
// ever constructed, but the `new` expression must still compile.
unittest { if (false) new Duplex(null, null); }
1263 
1264 // ***************************************************************************
1265 
/// A connection backed by an asynchronous (non-blocking) socket.
class SocketConnection : StreamConnection
{
protected:
	/// Addresses which have not been attempted yet.
	AddressInfo[] addressQueue;

	this(Socket conn)
	{
		super(conn);
	}

	/// Send `buffer` through the socket.
	override sizediff_t doSend(in void[] buffer)
	{
		return conn.send(buffer);
	}

	/// Receive into `buffer` from the socket.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

	/// Take the next address off `addressQueue` and start a
	/// non-blocking connection attempt to it.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);

		auto candidate = addressQueue[0];
		addressQueue = addressQueue[1..$];

		try
		{
			conn = new Socket(candidate.family, candidate.type, candidate.protocol);
			conn.blocking = false;

			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", candidate.address.toString());
			conn.connect(candidate.address);
		}
		catch (SocketException e)
		{
			onError("Connect error: " ~ e.msg);
		}
	}

	/// Called when an error occurs on the socket.
	/// While connecting, and as long as untried addresses remain, the
	/// failed socket is discarded and the next address is tried instead.
	override void onError(string reason)
	{
		if (state != ConnectionState.connecting || !addressQueue.length)
			return super.onError(reason);

		socketManager.unregister(this);
		conn.close();
		conn = null;

		tryNextAddress();
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection, trying `addresses` in order.
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		state = ConnectionState.connecting;
		addressQueue = addresses;
		tryNextAddress();
	}
}
1342 
/// A `SocketConnection` which speaks TCP.
class TcpConnection : SocketConnection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
	}

	///
	alias connect = SocketConnection.connect; // raise overload

	/// Resolve `host` and start establishing a connection to it.
	/// When the name resolves to several addresses, they are
	/// tried in random order.
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		state = ConnectionState.resolving;

		AddressInfo[] candidates;
		try
		{
			auto addresses = getAddress(host, port);
			enforce(addresses.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			if (addresses.length > 1)
			{
				import std.random : randomShuffle;
				randomShuffle(addresses);
			}

			foreach (address; addresses)
			{
				auto info = AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
				candidates ~= info;
			}
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return;
		}

		// The AddressInfo overload expects to start from `disconnected`.
		state = ConnectionState.disconnected;
		connect(candidates);
	}
}
1401 
1402 // ***************************************************************************
1403 
/// An asynchronous connection server for socket-based connections.
/// Owns one `Listener` per listening socket; every accepted socket is
/// wrapped by `createConnection` and handed to the `handleAccept` callback.
class SocketServer
{
protected:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Adopt an already-created socket and register it with `socketManager`.
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one connection. If an accept handler is set, the new
		/// socket is wrapped via `createConnection` and passed to it;
		/// otherwise the accepted socket is closed straight away.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				auto connection = createConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		/// Listeners never write, so this does nothing.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		/// The `reason` is not used.
		override void onError(string reason)
		{
			// NOTE(review): presumably this resolves to the enclosing
			// SocketServer.close(), shutting down all listeners - confirm
			// that GenericSocket does not declare a close() of its own.
			close(); // call parent
		}

		/// Unregister this listener from `socketManager`, then close and drop its socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Wrap an accepted socket in a connection object.
	/// Subclasses override this to produce a more specific connection type.
	SocketConnection createConnection(Socket socket)
	{
		return new SocketConnection(socket);
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Make every listener watch for readability exactly when
	/// an accept handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// One non-blocking socket is created, bound and put in listening
	/// mode for each entry of `addressInfos`. An address which fails with
	/// a `SocketException` is skipped (and reported only in
	/// `debug(ASOCKETS)` builds).
	/// Throws: `Exception` if the server has no listeners at all afterwards.
	final void listen(AddressInfo[] addressInfos)
	{
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				conn.blocking = false;
				// Each address family gets a socket of its own, so the
				// IPv6 socket is restricted to IPv6 traffic.
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				// The accept backlog is scaled with the number of CPUs.
				conn.listen(totalCPUs * 2);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	/// Create a server which is not listening on anything yet.
	this()
	{
	} ///

	/// Creates a Server with the given sockets.
	/// The sockets must have already had `bind` and `listen` called on them.
	// NOTE(review): this constructor does not set `listening`, so
	// `isListening` stays false until `listen` is called - confirm
	// whether that is intended.
	this(Socket[] sockets...)
	{
		foreach (socket; sockets)
			listeners ~= new Listener(socket);
	}

	/// Returns all listening addresses.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Returns `true` if the server is listening for incoming connections.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes and forgets all listeners, then calls `handleClose` if it is set.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

	/// Create a SocketServer using the handle passed on standard input,
	/// for which `listen` had already been called. Used by
	/// e.g. FastCGI and systemd sockets with "Listen = yes".
	static SocketServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0; // file descriptor 0

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new SocketServer(s);
	}

	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(SocketConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
}
1574 
/// An asynchronous TCP connection server.
/// Accepted sockets are wrapped in `TcpConnection` objects.
class TcpServer : SocketServer
{
protected:
	/// Wrap an accepted socket in a `TcpConnection`.
	override SocketConnection createConnection(Socket socket)
	{
		return new TcpConnection(socket);
	}

public:
	/// Create a server which is not listening on anything yet.
	this()
	{
	} ///

	this(Socket[] sockets...)
	{
		super(sockets);
	} /// Construct from the given sockets.

	///
	alias listen = SocketServer.listen; // raise overload

	/// Start listening on this socket.
	/// Params:
	///   port = TCP port to listen on; 0 requests a random port, in
	///          which case only IPv4 addresses are listened on.
	///   addr = local address to bind to, passed on to `getAddressInfo`.
	/// Returns: the port of an IPv4 listener if the server has one
	///   (this is how a randomly assigned port is reported back),
	///   otherwise `port` as given.
	final ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Iterate backwards so that removing element i does not
			// disturb the indices which are still to be visited.
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		// Read back the port actually bound by an IPv4 listener.
		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	deprecated("Use SocketServer.fromStdin")
	static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; }

	/// Delegate to be called when a connection is accepted.
	/// Passing `null` clears the handler, which stops connections
	/// from being accepted.
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		// Only wrap a real handler. Wrapping `null` would install a
		// non-null adapter: the server would keep accepting connections
		// and then call a null delegate for each of them.
		void delegate(SocketConnection incoming) adapter = null;
		if (value)
			adapter = (SocketConnection c) => value(cast(TcpConnection)c);
		super.handleAccept(adapter);
	}
}
1638 
1639 // ***************************************************************************
1640 
/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	/// Runs `onWritableImpl` and then refreshes the event flags.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	/// Sends queued packets to `remoteAddress`, one datagram per `Data`,
	/// going through the queues in index order.
	final private void onWritableImpl()
	{
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto sent = conn.sendTo(queue.front.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					// A would-block condition is not an error:
					// stop and wait for the next notification.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < queue.front.length)
				{
					// Unlike a stream, a datagram cannot be continued later.
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, queue.front.length));
				}
				else
				{
					assert(sent == queue.front.length);
					//debug writefln("[%s] Sent data:", remoteAddressStr);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					queue.front.clear();
					queue.popFront();
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		// A disconnect which was waiting for the queues to drain can complete now.
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	/// Receive one datagram into `buffer`.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given `AddressFamily`, without binding to an address.
	/// Calls the connect handler (if set) once the socket is ready.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Creates and registers the non-blocking socket and marks this
	/// object as `connected`, without notifying the connect handler.
	private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// `host` is resolved first; if it yields several addresses, one of
	/// them is picked at random.
	/// Returns: the local port which was bound, or 0 if the lookup
	///   failed (in which case `onError` has been called).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			enforce(addresses.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// The AddressInfo overload expects to start from `disconnected`.
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl, not initialize: the latter calls connectHandler
		// itself, which would notify the handler before the socket is bound,
		// and then a second time below.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		// Read the port back, as it may have been assigned by the system.
		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

// public:
	/// Where to send packets to.
	Address remoteAddress;
}
1801 
1802 ///
1803 unittest
1804 {
1805 	auto server = new UdpConnection();
1806 	server.bind("localhost", 0);
1807 
1808 	auto client = new UdpConnection();
1809 	client.initialize(server.localAddress.addressFamily);
1810 
1811 	string[] packets = ["Hello", "there"];
1812 	client.remoteAddress = server.localAddress;
1813 	client.send({
1814 		DataVec data;
1815 		foreach (packet; packets)
1816 			data ~= Data(packet);
1817 		return data;
1818 	}()[]);
1819 
1820 	server.handleReadData = (Data data)
1821 	{
1822 		assert(data.contents == packets[0]);
1823 		packets = packets[1..$];
1824 		if (!packets.length)
1825 		{
1826 			server.close();
1827 			client.close();
1828 		}
1829 	};
1830 	socketManager.loop();
1831 	assert(!packets.length);
1832 }
1833 
1834 // ***************************************************************************
1835 
/// Base class for a connection adapter.
/// On its own, it merely forwards everything to and from `next`.
class ConnectionAdapter : IConnection
{
	/// The next connection in the chain (towards the raw transport).
	IConnection next;

	/// Wrap `next`, hooking its connect, disconnect and buffer-flushed
	/// events. The read-data event is hooked by `handleReadData`.
	this(IConnection next)
	{
		this.next = next;
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	}

	/// The state of the wrapped connection.
	@property ConnectionState state()
	{
		return next.state;
	}

	/// Queue Data for sending.
	void send(scope Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Relays the connect event to the user's handler, if any.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	/// Relays received data to the user's handler.
	protected void onReadData(Data data)
	{
		// This hook is only installed while a read handler is set,
		// so a missing handler here indicates a bug.
		assert(readDataHandler, "onReadData caled with null readDataHandler");
		readDataHandler(data);
	}

	/// Relays the disconnect event to the user's handler, if any.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	/// Relays the buffer-flushed event to the user's handler, if any.
	protected void onBufferFlushed()
	{
		if (!bufferFlushedHandler)
			return;
		bufferFlushedHandler();
	}

	protected ConnectHandler connectHandler;
	protected ReadDataHandler readDataHandler;
	protected DisconnectHandler disconnectHandler;
	protected BufferFlushedHandler bufferFlushedHandler;

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}

	/// Callback setter for when new data is read.
	/// The wrapped connection's read handler is set or cleared
	/// along with this one.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
}
1912 
1913 // ***************************************************************************
1914 
/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
/// Incoming data is buffered until a complete line is available; the
/// read handler then receives each line without its delimiter.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter. Must not be empty.
	string delimiter = "\r\n";

	/// Maximum line length (0 means unlimited).
	/// When exceeded, the connection is disconnected with an error.
	size_t maxLength = 0;

	this(IConnection next)
	{
		super(next);
	} ///

	/// Append a line to the send buffer.
	/// The delimiter is appended automatically.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter ca = this;
		ca.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	/// Holds the data received so far which does not yet form a complete line.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends `data` to the buffer and dispatches every complete line.
	/// Data buffered by earlier calls is not searched again, so
	/// `delimiter` is assumed not to change while a partial line is buffered.
	final override void onReadData(Data data)
	{
		import std.string : indexOf;

		// An empty delimiter would match at index 0 forever.
		assert(delimiter.length >= 1, "Empty line delimiter");

		auto oldBufferLength = inBuffer.length;
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			import core.stdc.string : memchr;

			char c = delimiter[0];
			// Only the newly received bytes can contain the first match.
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				sizediff_t index = p - inBuffer.ptr;
				if (!processLine(index))
					break;

				// The buffer now starts right after the dispatched line.
				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			// The previously buffered data held no complete delimiter, so the
			// first match can begin at most delimiter.length-1 bytes before
			// the new data (a delimiter split across two reads).
			size_t startIndex = oldBufferLength >= delimiter.length
				? oldBufferLength - (delimiter.length - 1)
				: 0;

			auto index = indexOf(cast(string)inBuffer.contents[startIndex .. $], delimiter);
			while (index >= 0)
			{
				if (!processLine(startIndex + index))
					break;

				// The buffer now starts right after the dispatched line.
				startIndex = 0;
				index = indexOf(cast(string)inBuffer.contents, delimiter);
			}
		}

		// Also enforce the limit on a line which is still incomplete.
		if (maxLength && inBuffer.length > maxLength)
			disconnect("Line too long", DisconnectType.error);
	}

	/// Dispatch the line ending at `index` (the offset of its delimiter
	/// within `inBuffer`) and remove it, delimiter included, from the buffer.
	/// Returns: `false` if the line exceeded `maxLength`, in which case the
	///   connection was disconnected instead; `true` otherwise.
	final bool processLine(size_t index)
	{
		if (maxLength && index > maxLength)
		{
			disconnect("Line too long", DisconnectType.error);
			return false;
		}
		auto line = inBuffer[0..index];
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
		return true;
	}

	/// Relays the disconnect, then drops any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
2000 
2001 // ***************************************************************************
2002 
2003 /// Fires an event handler or disconnects connections
2004 /// after a period of inactivity.
2005 class TimeoutAdapter : ConnectionAdapter
2006 {
	// Wraps `next`; no idle timer exists until setIdleTimeout is called.
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	} ///
2012 
2013 	/// Set the `Duration` indicating the period of inactivity after which to take action.
2014 	final void setIdleTimeout(Duration duration)
2015 	{
2016 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
2017 		assert(duration > Duration.zero);
2018 
2019 		// Configure idleTask
2020 		if (idleTask is null)
2021 		{
2022 			idleTask = new TimerTask(duration);
2023 			idleTask.handleTask = &onTask_Idle;
2024 		}
2025 		else
2026 		{
2027 			if (idleTask.isWaiting())
2028 				idleTask.cancel();
2029 			idleTask.delay = duration;
2030 		}
2031 
2032 		mainTimer.add(idleTask);
2033 	}
2034 
2035 	/// Manually mark this connection as non-idle, restarting the idle timer.
2036 	/// `handleNonIdle` will be called, if set.
2037 	void markNonIdle()
2038 	{
2039 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
2040 		if (handleNonIdle)
2041 			handleNonIdle();
2042 		if (idleTask && idleTask.isWaiting())
2043 			idleTask.restart();
2044 	}
2045 
2046 	/// Stop the idle timer.
2047 	void cancelIdleTimeout()
2048 	{
2049 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
2050 		assert(idleTask !is null);
2051 		assert(idleTask.isWaiting());
2052 		idleTask.cancel();
2053 	}
2054 
2055 	/// Restart the idle timer.
2056 	void resumeIdleTimeout()
2057 	{
2058 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
2059 		assert(idleTask !is null);
2060 		assert(!idleTask.isWaiting());
2061 		mainTimer.add(idleTask);
2062 	}
2063 
2064 	/// Callback for when a connection has stopped responding.
2065 	/// If unset, the connection will be disconnected.
2066 	void delegate() handleIdleTimeout;
2067 
2068 	/// Callback for when a connection is marked as non-idle
2069 	/// (when data is received).
2070 	void delegate() handleNonIdle;
2071 
2072 protected:
2073 	override void onConnect()
2074 	{
2075 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
2076 		markNonIdle();
2077 		super.onConnect();
2078 	}
2079 
2080 	override void onReadData(Data data)
2081 	{
2082 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
2083 		markNonIdle();
2084 		super.onReadData(data);
2085 	}
2086 
2087 	override void onDisconnect(string reason, DisconnectType type)
2088 	{
2089 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
2090 		if (idleTask && idleTask.isWaiting())
2091 			idleTask.cancel();
2092 		super.onDisconnect(reason, type);
2093 	}
2094 
2095 private:
2096 	TimerTask idleTask; // non-null if an idle timeout has been set
2097 
2098 	final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/)
2099 	{
2100 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);
2101 		if (state == ConnectionState.disconnecting)
2102 			return disconnect("Delayed disconnect - time-out", DisconnectType.error);
2103 
2104 		if (state == ConnectionState.disconnected)
2105 			return;
2106 
2107 		if (handleIdleTimeout)
2108 		{
2109 			resumeIdleTimeout(); // reschedule (by default)
2110 			handleIdleTimeout();
2111 		}
2112 		else
2113 			disconnect("Time-out", DisconnectType.error);
2114 	}
2115 }
2116 
2117 // ***************************************************************************
2118 
unittest
{
	// Timer smoke test: a callback scheduled with setTimeout must have
	// run by the time the socket manager's loop returns.
	bool timerFired = false;
	setTimeout({ timerFired = true; }, 10.msecs);
	socketManager.loop();
	assert(timerFired);
}