1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <ae@cy.md>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import core.stdc.stdint : int32_t;
24 
25 import std.exception;
26 import std.parallelism : totalCPUs;
27 import std.socket;
28 import std.string : format;
29 public import std.socket : Address, AddressInfo, Socket;
30 
31 version (Windows)
32     private import c_socks = core.sys.windows.winsock2;
33 else version (Posix)
34     private import c_socks = core.sys.posix.sys.socket;
35 
36 debug(ASOCKETS) import std.stdio : stderr;
37 debug(PRINTDATA) import std.stdio : stderr;
38 debug(PRINTDATA) import ae.utils.text : hexDump;
39 private import std.conv : to;
40 
41 
42 // https://issues.dlang.org/show_bug.cgi?id=7016
43 static import ae.utils.array;
44 
45 version(LIBEV)
46 {
47 	import deimos.ev;
48 	pragma(lib, "ev");
49 }
50 
51 version(Windows)
52 {
53 	import core.sys.windows.windows : Sleep;
54 	private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
55 }
56 else
57 	private enum USE_SLEEP = false;
58 
59 /// Flags that determine socket wake-up events.
60 int eventCounter;
61 
62 version(LIBEV)
63 {
64 	// Watchers are a GenericSocket field (as declared in SocketMixin).
65 	// Use one watcher per read and write event.
66 	// Start/stop those as higher-level code declares interest in those events.
67 	// Use the "data" ev_io field to store the parent GenericSocket address.
68 	// Also use the "data" field as a flag to indicate whether the watcher is active
69 	// (data is null when the watcher is stopped).
70 
71 	/// `libev`-based event loop implementation.
72 	struct SocketManager
73 	{
74 	private:
75 		size_t count;
76 
77 		extern(C)
78 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
79 		{
80 			eventCounter++;
81 			auto socket = cast(GenericSocket)w.data;
82 			assert(socket, "libev event fired on stopped watcher");
83 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
84 
85 			if (revents & EV_READ)
86 				socket.onReadable();
87 			else
88 			if (revents & EV_WRITE)
89 				socket.onWritable();
90 			else
91 				assert(false, "Unknown event fired from libev");
92 
93 			// TODO? Need to get proper SocketManager instance to call updateTimer on
94 			socketManager.updateTimer(false);
95 		}
96 
97 		ev_timer evTimer;
98 		MonoTime lastNextEvent = MonoTime.max;
99 
100 		extern(C)
101 		static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/)
102 		{
103 			eventCounter++;
104 			debug (ASOCKETS) writefln("Timer callback called.");
105 			mainTimer.prod();
106 
107 			socketManager.updateTimer(true);
108 			debug (ASOCKETS) writefln("Timer callback exiting.");
109 		}
110 
111 		void updateTimer(bool force)
112 		{
113 			auto nextEvent = mainTimer.getNextEvent();
114 			if (force || lastNextEvent != nextEvent)
115 			{
116 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
117 				if (nextEvent == MonoTime.max) // Stopping
118 				{
119 					if (lastNextEvent != MonoTime.max)
120 						ev_timer_stop(ev_default_loop(0), &evTimer);
121 				}
122 				else
123 				{
124 					auto remaining = mainTimer.getRemainingTime();
125 					while (remaining <= Duration.zero)
126 					{
127 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
128 						mainTimer.prod();
129 						remaining = mainTimer.getRemainingTime();
130 					}
131 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
132 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
133 					if (lastNextEvent == MonoTime.max) // Starting
134 					{
135 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
136 						ev_timer_start(ev_default_loop(0), &evTimer);
137 					}
138 					else // Adjusting
139 					{
140 						evTimer.repeat = tstamp;
141 						ev_timer_again(ev_default_loop(0), &evTimer);
142 					}
143 				}
144 				lastNextEvent = nextEvent;
145 			}
146 		}
147 
148 	public:
149 		/// Register a socket with the manager.
150 		void register(GenericSocket socket)
151 		{
152 			debug (ASOCKETS) writefln("Registering %s", socket);
153 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
154 			auto fd = socket.conn.handle;
155 			assert(fd, "Must have fd before socket registration");
156 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
157 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
158 			count++;
159 		}
160 
161 		/// Unregister a socket with the manager.
162 		void unregister(GenericSocket socket)
163 		{
164 			debug (ASOCKETS) writefln("Unregistering %s", socket);
165 			socket.notifyRead  = false;
166 			socket.notifyWrite = false;
167 			count--;
168 		}
169 
170 		/// Returns the number of registered sockets.
171 		size_t size()
172 		{
173 			return count;
174 		}
175 
176 		/// Loop continuously until no sockets are left.
177 		void loop()
178 		{
179 			auto evLoop = ev_default_loop(0);
180 			enforce(evLoop, "libev initialization failure");
181 
182 			updateTimer(true);
183 			debug (ASOCKETS) writeln("ev_run");
184 			ev_run(ev_default_loop(0), 0);
185 		}
186 	}
187 
188 	private mixin template SocketMixin()
189 	{
190 		private ev_io evRead, evWrite;
191 
192 		private void setWatcherState(ref ev_io ev, bool newValue, int /*event*/)
193 		{
194 			if (!conn)
195 			{
196 				// Can happen when setting delegates before connecting.
197 				return;
198 			}
199 
200 			if (newValue && !ev.data)
201 			{
202 				// Start
203 				ev.data = cast(void*)this;
204 				ev_io_start(ev_default_loop(0), &ev);
205 			}
206 			else
207 			if (!newValue && ev.data)
208 			{
209 				// Stop
210 				assert(ev.data is cast(void*)this);
211 				ev.data = null;
212 				ev_io_stop(ev_default_loop(0), &ev);
213 			}
214 		}
215 
216 		/// Interested in read notifications (onReadable)?
217 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
218 		/// Interested in write notifications (onWritable)?
219 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
220 
221 		debug ~this() @nogc
222 		{
223 			// The LIBEV SocketManager holds no references to registered sockets.
224 			// TODO: Add a doubly-linked list?
225 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
226 		}
227 	}
228 }
229 else // Use select
230 {
231 	/// `select`-based event loop implementation.
232 	struct SocketManager
233 	{
234 	private:
235 		enum FD_SETSIZE = 1024;
236 
237 		/// List of all sockets to poll.
238 		GenericSocket[] sockets;
239 
240 		/// Debug AA to check for dangling socket references.
241 		debug GenericSocket[socket_t] socketHandles;
242 
243 		void delegate()[] nextTickHandlers, idleHandlers;
244 
245 	public:
246 		/// Register a socket with the manager.
247 		void register(GenericSocket conn)
248 		{
249 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
250 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
251 			sockets ~= conn;
252 
253 			debug
254 			{
255 				auto handle = conn.socket.handle;
256 				assert(handle != socket_t.init, "Can't register a closed socket");
257 				assert(handle !in socketHandles, "This socket handle is already registered");
258 				socketHandles[handle] = conn;
259 			}
260 		}
261 
262 		/// Unregister a socket with the manager.
263 		void unregister(GenericSocket conn)
264 		{
265 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
266 
267 			debug
268 			{
269 				auto handle = conn.socket.handle;
270 				assert(handle != socket_t.init, "Can't unregister a closed socket");
271 				auto pconn = handle in socketHandles;
272 				assert(pconn, "This socket handle is not registered");
273 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
274 				socketHandles.remove(handle);
275 			}
276 
277 			foreach (size_t i, GenericSocket j; sockets)
278 				if (j is conn)
279 				{
280 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
281 					return;
282 				}
283 			assert(false, "Socket not registered");
284 		}
285 
286 		/// Returns the number of registered sockets.
287 		size_t size()
288 		{
289 			return sockets.length;
290 		}
291 
292 		/// Loop continuously until no sockets are left.
293 		void loop()
294 		{
295 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
296 
297 			SocketSet readset, writeset;
298 			size_t sockcount;
299 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
300 			readset  = new SocketSet(setSize);
301 			writeset = new SocketSet(setSize);
302 			while (true)
303 			{
304 				if (nextTickHandlers.length)
305 				{
306 					auto thisTickHandlers = nextTickHandlers;
307 					nextTickHandlers = null;
308 
309 					foreach (handler; thisTickHandlers)
310 						handler();
311 
312 					continue;
313 				}
314 
315 				uint minSize = 0;
316 				version(Windows)
317 					minSize = cast(uint)sockets.length;
318 				else
319 				{
320 					foreach (s; sockets)
321 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
322 							minSize = s.socket.handle;
323 				}
324 				minSize++;
325 
326 				if (setSize < minSize)
327 				{
328 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
329 					setSize = minSize * 2;
330 					readset  = new SocketSet(setSize);
331 					writeset = new SocketSet(setSize);
332 				}
333 				else
334 				{
335 					readset.reset();
336 					writeset.reset();
337 				}
338 
339 				sockcount = 0;
340 				bool haveActive;
341 				debug (ASOCKETS) stderr.writeln("Populating sets");
342 				foreach (GenericSocket conn; sockets)
343 				{
344 					if (!conn.socket)
345 						continue;
346 					sockcount++;
347 
348 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
349 					if (conn.notifyRead)
350 					{
351 						readset.add(conn.socket);
352 						if (!conn.daemonRead)
353 							haveActive = true;
354 						debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : "");
355 					}
356 					if (conn.notifyWrite)
357 					{
358 						writeset.add(conn.socket);
359 						if (!conn.daemonWrite)
360 							haveActive = true;
361 						debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : "");
362 					}
363 					debug (ASOCKETS) stderr.writeln();
364 				}
365 				debug (ASOCKETS)
366 				{
367 					stderr.writefln("Sets populated as follows:");
368 					printSets(readset, writeset);
369 				}
370 
371 				debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...",
372 					haveActive ? "" : "not ",
373 					sockcount,
374 					mainTimer.isWaiting() ? "with" : "no",
375 					idleHandlers.length,
376 				);
377 				if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length)
378 				{
379 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
380 					break;
381 				}
382 
383 				debug (ASOCKETS) stderr.flush();
384 
385 				int events;
386 				if (idleHandlers.length)
387 				{
388 					if (sockcount==0)
389 						events = 0;
390 					else
391 						events = Socket.select(readset, writeset, null, 0.seconds);
392 				}
393 				else
394 				if (USE_SLEEP && sockcount==0)
395 				{
396 					version(Windows)
397 					{
398 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
399 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
400 						if (duration <= 0)
401 							duration = 1; // Avoid busywait
402 						else
403 						if (duration > int.max)
404 							duration = int.max;
405 						Sleep(cast(int)duration);
406 						events = 0;
407 					}
408 					else
409 						assert(0);
410 				}
411 				else
412 				if (mainTimer.isWaiting())
413 					events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime());
414 				else
415 					events = Socket.select(readset, writeset, null);
416 
417 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
418 
419 				if (events > 0)
420 				{
421 					// Handle just one event at a time, as the first
422 					// handler might invalidate select()'s results.
423 					handleEvent(readset, writeset);
424 				}
425 				else
426 				if (idleHandlers.length)
427 				{
428 					import ae.utils.array : shift;
429 					auto handler = idleHandlers.shift();
430 
431 					// Rotate the idle handler queue before running it,
432 					// in case the handler unregisters itself.
433 					idleHandlers ~= handler;
434 
435 					handler();
436 				}
437 
438 				// Timers may invalidate our select results, so fire them after processing the latter
439 				mainTimer.prod();
440 
441 				eventCounter++;
442 			}
443 		}
444 
445 		debug (ASOCKETS)
446 		private void printSets(SocketSet readset, SocketSet writeset)
447 		{
448 			foreach (GenericSocket conn; sockets)
449 			{
450 				if (!conn.socket)
451 					stderr.writefln("\t\t%s is unset", conn);
452 				else
453 				{
454 					if (readset.isSet(conn.socket))
455 						stderr.writefln("\t\t%s is readable", conn);
456 					if (writeset.isSet(conn.socket))
457 						stderr.writefln("\t\t%s is writable", conn);
458 				}
459 			}
460 		}
461 
462 		private void handleEvent(SocketSet readset, SocketSet writeset)
463 		{
464 			debug (ASOCKETS)
465 			{
466 				stderr.writefln("\tSelect results:");
467 				printSets(readset, writeset);
468 			}
469 
470 			foreach (GenericSocket conn; sockets)
471 			{
472 				if (!conn.socket)
473 					continue;
474 
475 				if (readset.isSet(conn.socket))
476 				{
477 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
478 					return conn.onReadable();
479 				}
480 				else
481 				if (writeset.isSet(conn.socket))
482 				{
483 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
484 					return conn.onWritable();
485 				}
486 			}
487 
488 			assert(false, "select() reported events available, but no registered sockets are set");
489 		}
490 	}
491 
492 	/// Schedule a function to run on the next event loop iteration.
493 	/// Can be used to queue logic to run once all current execution frames exit.
494 	/// Similar to e.g. process.nextTick in Node.
495 	void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow
496 	{
497 		socketManager.nextTickHandlers ~= dg;
498 	}
499 
500 	// Use UFCS to allow removeIdleHandler to have a predicate with context
501 	/// Register a function to be called when the event loop is idle,
502 	/// and would otherwise sleep.
503 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
504 	{
505 		foreach (i, idleHandler; socketManager.idleHandlers)
506 			assert(handler !is idleHandler);
507 
508 		socketManager.idleHandlers ~= handler;
509 	}
510 
511 	/// Unregister a function previously registered with `addIdleHandler`.
512 	void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args)
513 	{
514 		foreach (i, idleHandler; socketManager.idleHandlers)
515 			if (pred(idleHandler, args))
516 			{
517 				import std.algorithm : remove;
518 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
519 				return;
520 			}
521 		assert(false, "No such idle handler");
522 	}
523 
524 	private mixin template SocketMixin()
525 	{
526 		/// Interested in read notifications (onReadable)?
527 		bool notifyRead;
528 		/// Interested in write notifications (onWritable)?
529 		bool notifyWrite;
530 	}
531 }
532 
533 /// The default socket manager.
534 SocketManager socketManager;
535 
536 // ***************************************************************************
537 
538 /// General methods for an asynchronous socket.
539 abstract class GenericSocket
540 {
541 	/// Declares notifyRead and notifyWrite.
542 	mixin SocketMixin;
543 
544 protected:
545 	/// The socket this class wraps.
546 	Socket conn;
547 
548 // protected:
549 	/// Retrieve the socket class this class wraps.
550 	@property final Socket socket()
551 	{
552 		return conn;
553 	}
554 
555 	void onReadable()
556 	{
557 	}
558 
559 	void onWritable()
560 	{
561 	}
562 
563 	void onError(string /*reason*/)
564 	{
565 	}
566 
567 public:
568 	/// allow getting the address of connections that are already disconnected
569 	private Address[2] cachedAddress;
570 
571 	/*private*/ final @property Address _address(bool local)()
572 	{
573 		if (cachedAddress[local] !is null)
574 			return cachedAddress[local];
575 		else
576 		if (conn is null)
577 			return null;
578 		else
579 		{
580 			Address a;
581 			if (conn.addressFamily == AddressFamily.UNSPEC)
582 			{
583 				// Socket will attempt to construct an UnknownAddress,
584 				// which will almost certainly not match the real address length.
585 				static if (local)
586 					alias getname = c_socks.getsockname;
587 				else
588 					alias getname = c_socks.getpeername;
589 
590 				c_socks.socklen_t nameLen = 0;
591 				if (getname(conn.handle, null, &nameLen) < 0)
592 					throw new SocketOSException("Unable to obtain socket address");
593 
594 				auto buf = new ubyte[nameLen];
595 				auto sa = cast(c_socks.sockaddr*)buf.ptr;
596 				if (getname(conn.handle, sa, &nameLen) < 0)
597 					throw new SocketOSException("Unable to obtain socket address");
598 				a = new UnknownAddressReference(sa, nameLen);
599 			}
600 			else
601 				a = local ? conn.localAddress() : conn.remoteAddress();
602 			return cachedAddress[local] = a;
603 		}
604 	}
605 
606 	alias localAddress = _address!true; /// Retrieve this socket's local address.
607 	alias remoteAddress = _address!false; /// Retrieve this socket's remote address.
608 
609 	/*private*/ final @property string _addressStr(bool local)() nothrow
610 	{
611 		try
612 		{
613 			auto a = _address!local;
614 			if (a is null)
615 				return "[null address]";
616 			string host = a.toAddrString();
617 			import std.string : indexOf;
618 			if (host.indexOf(':') >= 0)
619 				host = "[" ~ host ~ "]";
620 			try
621 			{
622 				string port = a.toPortString();
623 				return host ~ ":" ~ port;
624 			}
625 			catch (Exception e)
626 				return host;
627 		}
628 		catch (Exception e)
629 			return "[error: " ~ e.msg ~ "]";
630 	}
631 
632 	alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string.
633 	alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string.
634 
635 	/// Don't block the process from exiting, even if the socket is ready to receive data.
636 	/// TODO: Not implemented with libev
637 	bool daemonRead;
638 
639 	/// Don't block the process from exiting, even if the socket is ready to send data.
640 	/// TODO: Not implemented with libev
641 	bool daemonWrite;
642 
643 	deprecated alias daemon = daemonRead;
644 
645 	/// Enable TCP keep-alive on the socket with the given settings.
646 	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
647 	{
648 		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
649 		if (enabled)
650 		{
651 			try
652 				conn.setKeepAlive(time, interval);
653 			catch (SocketException)
654 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
655 		}
656 		else
657 			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
658 	}
659 
660 	/// Returns a string containing the class name, address, and file descriptor.
661 	override string toString() const
662 	{
663 		import std.string : format, split;
664 		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
665 	}
666 }
667 
668 // ***************************************************************************
669 
670 /// Classifies the cause of the disconnect.
671 /// Can be used to decide e.g. when it makes sense to reconnect.
672 enum DisconnectType
673 {
674 	requested, /// Initiated by the application.
675 	graceful,  /// The peer gracefully closed the connection.
676 	error      /// Some abnormal network condition.
677 }
678 
679 /// Used to indicate the state of a connection throughout its lifecycle.
680 enum ConnectionState
681 {
682 	/// The initial state, or the state after a disconnect was fully processed.
683 	disconnected,
684 
685 	/// Name resolution. Currently done synchronously.
686 	resolving,
687 
688 	/// A connection attempt is in progress.
689 	connecting,
690 
691 	/// A connection is established.
692 	connected,
693 
694 	/// Disconnecting in progress. No data can be sent or received at this point.
695 	/// We are waiting for queued data to be actually sent before disconnecting.
696 	disconnecting,
697 }
698 
699 /// Returns true if this is a connection state for which disconnecting is valid.
700 /// Generally, applications should be aware of the life cycle of their sockets,
701 /// so checking the state of a connection is unnecessary (and a code smell).
702 /// However, unconditionally disconnecting some connected sockets can be useful
703 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle),
704 /// such as in response to a signal.
705 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; }
706 
707 /// Common interface for connections and adapters.
708 interface IConnection
709 {
710 	/// `send` queues data for sending in one of five queues, indexed
711 	/// by a numeric priority.
712 	/// `MAX_PRIORITY` is the highest (least urgent) priority index.
713 	/// `DEFAULT_PRIORITY` is the default priority
714 	enum MAX_PRIORITY = 4;
715 	enum DEFAULT_PRIORITY = 2; /// ditto
716 
717 	/// This is the default value for the `disconnect` `reason` string parameter.
718 	static const defaultDisconnectReason = "Software closed the connection";
719 
720 	/// Get connection state.
721 	@property ConnectionState state();
722 
723 	/// Has a connection been established?
724 	deprecated final @property bool connected() { return state == ConnectionState.connected; }
725 
726 	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
727 	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }
728 
729 	/// Queue Data for sending.
730 	void send(Data[] data, int priority = DEFAULT_PRIORITY);
731 
732 	/// ditto
733 	final void send(Data datum, int priority = DEFAULT_PRIORITY)
734 	{
735 		Data[1] data;
736 		data[0] = datum;
737 		this.send(data, priority);
738 		data[] = Data.init;
739 	}
740 
741 	/// Terminate the connection.
742 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);
743 
744 	/// Callback setter for when a connection has been established
745 	/// (if applicable).
746 	alias ConnectHandler = void delegate();
747 	@property void handleConnect(ConnectHandler value); /// ditto
748 
749 	/// Callback setter for when new data is read.
750 	alias ReadDataHandler = void delegate(Data data);
751 	@property void handleReadData(ReadDataHandler value); /// ditto
752 
753 	/// Callback setter for when a connection was closed.
754 	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
755 	@property void handleDisconnect(DisconnectHandler value); /// ditto
756 
757 	/// Callback setter for when all queued data has been sent.
758 	alias BufferFlushedHandler = void delegate();
759 	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
760 }
761 
762 // ***************************************************************************
763 
764 /// Implementation of `IConnection` using a socket.
765 /// Implements receiving data when readable and sending queued data
766 /// when writable.
767 class Connection : GenericSocket, IConnection
768 {
769 private:
770 	/// Blocks of data larger than this value are passed as unmanaged memory
771 	/// (in Data objects). Blocks smaller than this value will be reallocated
772 	/// on the managed heap. The disadvantage of placing large objects on the
773 	/// managed heap is false pointers; the disadvantage of using Data for
774 	/// small objects is wasted slack space due to the page size alignment
775 	/// requirement.
776 	enum UNMANAGED_THRESHOLD = 256;
777 
778 	ConnectionState _state;
779 	final @property ConnectionState state(ConnectionState value) { return _state = value; }
780 
781 public:
782 	/// Get connection state.
783 	override @property ConnectionState state() { return _state; }
784 
785 protected:
786 	abstract sizediff_t doSend(in void[] buffer);
787 	abstract sizediff_t doReceive(void[] buffer);
788 
789 	/// The send buffers.
790 	Data[][MAX_PRIORITY+1] outQueue;
791 	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
792 	int partiallySent = -1;
793 
794 	/// Constructor used by a ServerSocket for new connections
795 	this(Socket conn)
796 	{
797 		this();
798 		this.conn = conn;
799 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
800 		if (conn)
801 			socketManager.register(this);
802 		updateFlags();
803 	}
804 
805 	final void updateFlags()
806 	{
807 		if (state == ConnectionState.connecting)
808 			notifyWrite = true;
809 		else
810 			notifyWrite = writePending;
811 
812 		notifyRead = state == ConnectionState.connected && readDataHandler;
813 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
814 	}
815 
816 	/// Called when a socket is readable.
817 	override void onReadable()
818 	{
819 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
820 		static ubyte[0x10000] inBuffer = void;
821 		auto received = doReceive(inBuffer);
822 
823 		if (received == 0)
824 			return disconnect("Connection closed", DisconnectType.graceful);
825 
826 		if (received == Socket.ERROR)
827 		{
828 		//	if (wouldHaveBlocked)
829 		//	{
830 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
831 		//		return;
832 		//	}
833 		//	else
834 				onError("recv() error: " ~ lastSocketError);
835 		}
836 		else
837 		{
838 			debug (PRINTDATA)
839 			{
840 				stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr);
841 				stderr.write(hexDump(inBuffer[0 .. received]));
842 				stderr.flush();
843 			}
844 
845 			if (state == ConnectionState.disconnecting)
846 			{
847 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
848 			}
849 			else
850 			if (!readDataHandler)
851 			{
852 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
853 			}
854 			else
855 			{
856 				// Currently, unlike the D1 version of this module,
857 				// we will always reallocate read network data.
858 				// This disfavours code which doesn't need to store
859 				// read data after processing it, but otherwise
860 				// makes things simpler and safer all around.
861 
862 				Data data;
863 				if (received < UNMANAGED_THRESHOLD)
864 				{
865 					// Copy to the managed heap
866 					data = Data(inBuffer[0 .. received].dup);
867 				}
868 				else
869 				{
870 					// Copy to unmanaged memory
871 					data = Data(inBuffer[0 .. received], true);
872 				}
873 				readDataHandler(data);
874 			}
875 		}
876 	}
877 
878 	/// Called when an error occurs on the socket.
879 	override void onError(string reason)
880 	{
881 		if (state == ConnectionState.disconnecting)
882 		{
883 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
884 			return close();
885 		}
886 
887 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
888 		disconnect("Socket error: " ~ reason, DisconnectType.error);
889 	}
890 
891 	this()
892 	{
893 	}
894 
895 public:
896 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
897 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
898 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
899 	{
900 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
901 		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));
902 
903 		if (writePending)
904 		{
905 			if (type==DisconnectType.requested)
906 			{
907 				assert(conn, "Attempting to disconnect on an uninitialized socket");
908 				// queue disconnect after all data is sent
909 				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
910 				state = ConnectionState.disconnecting;
911 				//setIdleTimeout(30.seconds);
912 				if (disconnectHandler)
913 					disconnectHandler(reason, type);
914 				updateFlags();
915 				return;
916 			}
917 			else
918 				discardQueues();
919 		}
920 
921 		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
922 
923 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
924 			close();
925 		else
926 		{
927 			assert(conn is null, "Registered but %s socket".format(state));
928 			if (state == ConnectionState.resolving)
929 				state = ConnectionState.disconnected;
930 		}
931 
932 		if (disconnectHandler)
933 			disconnectHandler(reason, type);
934 		updateFlags();
935 	}
936 
937 	private final void close()
938 	{
939 		assert(conn, "Attempting to close an unregistered socket");
940 		socketManager.unregister(this);
941 		conn.close();
942 		conn = null;
943 		outQueue[] = null;
944 		state = ConnectionState.disconnected;
945 	}
946 
947 	/// Append data to the send buffer.
948 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
949 	{
950 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
951 		outQueue[priority] ~= data;
952 		notifyWrite = true; // Fast updateFlags()
953 
954 		debug (PRINTDATA)
955 		{
956 			stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr);
957 			foreach (datum; data)
958 				if (datum.length)
959 					stderr.write(hexDump(datum.contents));
960 				else
961 					stderr.writeln("(empty Data)");
962 			stderr.flush();
963 		}
964 	}
965 
966 	/// ditto
967 	alias send = IConnection.send;
968 
969 	/// Cancel all queued `Data` packets with the given priority.
970 	/// Does not cancel any partially-sent `Data`.
971 	final void clearQueue(int priority)
972 	{
973 		if (priority == partiallySent)
974 		{
975 			assert(outQueue[priority].length > 0);
976 			outQueue[priority] = outQueue[priority][0..1];
977 		}
978 		else
979 			outQueue[priority] = null;
980 		updateFlags();
981 	}
982 
983 	/// Clears all queues, even partially sent content.
984 	private final void discardQueues()
985 	{
986 		foreach (priority; 0..MAX_PRIORITY+1)
987 			outQueue[priority] = null;
988 		partiallySent = -1;
989 		updateFlags();
990 	}
991 
992 	/// Returns true if any queues have pending data.
993 	@property
994 	final bool writePending()
995 	{
996 		foreach (queue; outQueue)
997 			if (queue.length)
998 				return true;
999 		return false;
1000 	}
1001 
1002 	/// Returns true if there are any queued `Data` which have not yet
1003 	/// begun to be sent.
1004 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
1005 	{
1006 		if (priority == partiallySent)
1007 		{
1008 			assert(outQueue[priority].length > 0);
1009 			return outQueue[priority].length > 1;
1010 		}
1011 		else
1012 			return outQueue[priority].length > 0;
1013 	}
1014 
1015 	/// Returns the number of queued `Data` at the given priority.
1016 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
1017 	{
1018 		return outQueue[priority].length;
1019 	}
1020 
1021 	/// Returns the number of queued bytes at the given priority.
1022 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
1023 	{
1024 		size_t bytes;
1025 		foreach (datum; outQueue[priority])
1026 			bytes += datum.length;
1027 		return bytes;
1028 	}
1029 
1030 // public:
1031 	private ConnectHandler connectHandler;
1032 	/// Callback for when a connection has been established.
1033 	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }
1034 
1035 	private ReadDataHandler readDataHandler;
1036 	/// Callback for incoming data.
1037 	/// Data will not be received unless this handler is set.
1038 	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }
1039 
1040 	private DisconnectHandler disconnectHandler;
1041 	/// Callback for when a connection was closed.
1042 	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
1043 
1044 	private BufferFlushedHandler bufferFlushedHandler;
1045 	/// Callback setter for when all queued data has been sent.
1046 	@property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); }
1047 }
1048 
1049 /// Implements a stream connection.
1050 /// Queued `Data` is allowed to be fragmented.
1051 class StreamConnection : Connection
1052 {
1053 protected:
1054 	this()
1055 	{
1056 		super();
1057 	}
1058 
1059 	/// Called when a socket is writable.
1060 	override void onWritable()
1061 	{
1062 		//scope(success) updateFlags();
1063 		onWritableImpl();
1064 		updateFlags();
1065 	}
1066 
1067 	// Work around scope(success) breaking debugger stack traces
1068 	final private void onWritableImpl()
1069 	{
1070 		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
1071 		if (state == ConnectionState.connecting)
1072 		{
1073 			int32_t error;
1074 			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
1075 			if (error)
1076 				return disconnect(formatSocketError(error), DisconnectType.error);
1077 
1078 			state = ConnectionState.connected;
1079 
1080 			//debug writefln("[%s] Connected", remoteAddressStr);
1081 			try
1082 				setKeepAlive();
1083 			catch (Exception e)
1084 				return disconnect(e.msg, DisconnectType.error);
1085 			if (connectHandler)
1086 				connectHandler();
1087 			return;
1088 		}
1089 		//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);
1090 
1091 		foreach (sendPartial; [true, false])
1092 			foreach (int priority, ref queue; outQueue)
1093 				while (queue.length && (!sendPartial || priority == partiallySent))
1094 				{
1095 					assert(partiallySent == -1 || partiallySent == priority);
1096 
1097 					auto pdata = queue.ptr; // pointer to first data
1098 
1099 					ptrdiff_t sent = 0;
1100 					if (pdata.length)
1101 					{
1102 						sent = doSend(pdata.contents);
1103 						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
1104 					}
1105 					else
1106 					{
1107 						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
1108 					}
1109 
1110 					if (sent == Socket.ERROR)
1111 					{
1112 						if (wouldHaveBlocked())
1113 							return;
1114 						else
1115 							return onError("send() error: " ~ lastSocketError);
1116 					}
1117 					else
1118 					if (sent < pdata.length)
1119 					{
1120 						if (sent > 0)
1121 						{
1122 							*pdata = (*pdata)[sent..pdata.length];
1123 							partiallySent = priority;
1124 						}
1125 						return;
1126 					}
1127 					else
1128 					{
1129 						assert(sent == pdata.length);
1130 						//debug writefln("[%s] Sent data:", remoteAddressStr);
1131 						//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1132 						pdata.clear();
1133 						queue = queue[1..$];
1134 						partiallySent = -1;
1135 						if (queue.length == 0)
1136 							queue = null;
1137 					}
1138 				}
1139 
1140 		// outQueue is now empty
1141 		if (bufferFlushedHandler)
1142 			bufferFlushedHandler();
1143 		if (state == ConnectionState.disconnecting)
1144 		{
1145 			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1146 			close();
1147 		}
1148 	}
1149 
1150 public:
1151 	this(Socket conn)
1152 	{
1153 		super(conn);
1154 	} ///
1155 }
1156 
1157 // ***************************************************************************
1158 
1159 /// A POSIX file stream.
1160 /// Allows adding a file (e.g. stdin/stdout) to the socket manager.
1161 /// Does not dup the given file descriptor, so "disconnecting" this connection
1162 /// will close it.
1163 version (Posix)
1164 class FileConnection : StreamConnection
1165 {
1166 	this(int fileno)
1167 	{
1168 		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
1169 		conn.blocking = false;
1170 		super(conn);
1171 	} ///
1172 
1173 protected:
1174 	import core.sys.posix.unistd : read, write;
1175 
1176 	override sizediff_t doSend(in void[] buffer)
1177 	{
1178 		return write(socket.handle, buffer.ptr, buffer.length);
1179 	}
1180 
1181 	override sizediff_t doReceive(void[] buffer)
1182 	{
1183 		return read(socket.handle, buffer.ptr, buffer.length);
1184 	}
1185 }
1186 
1187 /// Separates reading and writing, e.g. for stdin/stdout.
1188 class Duplex : IConnection
1189 {
1190 	///
1191 	IConnection reader, writer;
1192 
1193 	this(IConnection reader, IConnection writer)
1194 	{
1195 		this.reader = reader;
1196 		this.writer = writer;
1197 		reader.handleConnect = &onConnect;
1198 		writer.handleConnect = &onConnect;
1199 		reader.handleDisconnect = &onDisconnect;
1200 		writer.handleDisconnect = &onDisconnect;
1201 	} ///
1202 
1203 	@property ConnectionState state()
1204 	{
1205 		if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting)
1206 			return ConnectionState.disconnecting;
1207 		else
1208 			return reader.state < writer.state ? reader.state : writer.state;
1209 	} ///
1210 
1211 	/// Queue Data for sending.
1212 	void send(Data[] data, int priority)
1213 	{
1214 		writer.send(data, priority);
1215 	}
1216 
1217 	alias send = IConnection.send; /// ditto
1218 
1219 	/// Terminate the connection.
1220 	/// Note: this isn't quite fleshed out - applications may want to
1221 	/// wait and send some more data even after stdin is closed, but
1222 	/// such an interface can't be fitted into an IConnection
1223 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
1224 	{
1225 		if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting)
1226 			reader.disconnect(reason, type);
1227 		if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting)
1228 			writer.disconnect(reason, type);
1229 		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
1230 	}
1231 
1232 	protected void onConnect()
1233 	{
1234 		if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
1235 			connectHandler();
1236 	}
1237 
1238 	protected void onDisconnect(string reason, DisconnectType type)
1239 	{
1240 		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
1241 		if (disconnectHandler)
1242 		{
1243 			disconnectHandler(reason, type);
1244 			disconnectHandler = null; // don't call it twice for the other connection
1245 		}
1246 		// It is our responsibility to disconnect the other connection
1247 		// Use DisconnectType.requested to ensure that any written data is flushed
1248 		disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
1249 	}
1250 
1251 	/// Callback for when a connection has been established.
1252 	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
1253 	private ConnectHandler connectHandler;
1254 
1255 	/// Callback setter for when new data is read.
1256 	@property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; }
1257 
1258 	/// Callback setter for when a connection was closed.
1259 	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
1260 	private DisconnectHandler disconnectHandler;
1261 
1262 	/// Callback setter for when all queued data has been written.
1263 	@property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; }
1264 }
1265 
1266 unittest { if (false) new Duplex(null, null); }
1267 
1268 // ***************************************************************************
1269 
1270 /// An asynchronous socket-based connection.
1271 class SocketConnection : StreamConnection
1272 {
1273 protected:
1274 	AddressInfo[] addressQueue;
1275 
1276 	this(Socket conn)
1277 	{
1278 		super(conn);
1279 	}
1280 
1281 	override sizediff_t doSend(in void[] buffer)
1282 	{
1283 		return conn.send(buffer);
1284 	}
1285 
1286 	override sizediff_t doReceive(void[] buffer)
1287 	{
1288 		return conn.receive(buffer);
1289 	}
1290 
1291 	final void tryNextAddress()
1292 	{
1293 		assert(state == ConnectionState.connecting);
1294 		auto addressInfo = addressQueue[0];
1295 		addressQueue = addressQueue[1..$];
1296 
1297 		try
1298 		{
1299 			conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
1300 			conn.blocking = false;
1301 
1302 			socketManager.register(this);
1303 			updateFlags();
1304 			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
1305 			conn.connect(addressInfo.address);
1306 		}
1307 		catch (SocketException e)
1308 			return onError("Connect error: " ~ e.msg);
1309 	}
1310 
1311 	/// Called when an error occurs on the socket.
1312 	override void onError(string reason)
1313 	{
1314 		if (state == ConnectionState.connecting && addressQueue.length)
1315 		{
1316 			socketManager.unregister(this);
1317 			conn.close();
1318 			conn = null;
1319 
1320 			return tryNextAddress();
1321 		}
1322 
1323 		super.onError(reason);
1324 	}
1325 
1326 public:
1327 	/// Default constructor
1328 	this()
1329 	{
1330 		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
1331 	}
1332 
1333 	/// Start establishing a connection.
1334 	final void connect(AddressInfo[] addresses)
1335 	{
1336 		assert(addresses.length, "No addresses specified");
1337 
1338 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1339 		assert(!conn);
1340 
1341 		addressQueue = addresses;
1342 		state = ConnectionState.connecting;
1343 		tryNextAddress();
1344 	}
1345 }
1346 
1347 /// An asynchronous TCP connection.
1348 class TcpConnection : SocketConnection
1349 {
1350 protected:
1351 	this(Socket conn)
1352 	{
1353 		super(conn);
1354 	}
1355 
1356 public:
1357 	/// Default constructor
1358 	this()
1359 	{
1360 		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
1361 	}
1362 
1363 	///
1364 	alias connect = SocketConnection.connect; // raise overload
1365 
1366 	/// Start establishing a connection.
1367 	final void connect(string host, ushort port)
1368 	{
1369 		assert(host.length, "Empty host");
1370 		assert(port, "No port specified");
1371 
1372 		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
1373 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1374 
1375 		state = ConnectionState.resolving;
1376 
1377 		AddressInfo[] addressInfos;
1378 		try
1379 		{
1380 			auto addresses = getAddress(host, port);
1381 			enforce(addresses.length, "No addresses found");
1382 			debug (ASOCKETS)
1383 			{
1384 				stderr.writefln("Resolved to %s addresses:", addresses.length);
1385 				foreach (address; addresses)
1386 					stderr.writefln("- %s", address.toString());
1387 			}
1388 
1389 			if (addresses.length > 1)
1390 			{
1391 				import std.random : randomShuffle;
1392 				randomShuffle(addresses);
1393 			}
1394 
1395 			foreach (address; addresses)
1396 				addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
1397 		}
1398 		catch (SocketException e)
1399 			return onError("Lookup error: " ~ e.msg);
1400 
1401 		state = ConnectionState.disconnected;
1402 		connect(addressInfos);
1403 	}
1404 }
1405 
1406 // ***************************************************************************
1407 
1408 /// An asynchronous connection server for socket-based connections.
1409 class SocketServer
1410 {
1411 protected:
1412 	/// Class that actually performs listening on a certain address family
1413 	final class Listener : GenericSocket
1414 	{
1415 		this(Socket conn)
1416 		{
1417 			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
1418 			this.conn = conn;
1419 			socketManager.register(this);
1420 		}
1421 
1422 		/// Called when a socket is readable.
1423 		override void onReadable()
1424 		{
1425 			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
1426 			Socket acceptSocket = conn.accept();
1427 			acceptSocket.blocking = false;
1428 			if (handleAccept)
1429 			{
1430 				auto connection = createConnection(acceptSocket);
1431 				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr);
1432 				connection.setKeepAlive();
1433 				//assert(connection.connected);
1434 				//connection.connected = true;
1435 				acceptHandler(connection);
1436 			}
1437 			else
1438 				acceptSocket.close();
1439 		}
1440 
1441 		/// Called when a socket is writable.
1442 		override void onWritable()
1443 		{
1444 		}
1445 
1446 		/// Called when an error occurs on the socket.
1447 		override void onError(string reason)
1448 		{
1449 			close(); // call parent
1450 		}
1451 
1452 		void closeListener()
1453 		{
1454 			assert(conn);
1455 			socketManager.unregister(this);
1456 			conn.close();
1457 			conn = null;
1458 		}
1459 	}
1460 
1461 	SocketConnection createConnection(Socket socket)
1462 	{
1463 		return new SocketConnection(socket);
1464 	}
1465 
1466 	/// Whether the socket is listening.
1467 	bool listening;
1468 	/// Listener instances
1469 	Listener[] listeners;
1470 
1471 	final void updateFlags()
1472 	{
1473 		foreach (listener; listeners)
1474 			listener.notifyRead = handleAccept !is null;
1475 	}
1476 
1477 public:
1478 	/// Start listening on this socket.
1479 	final void listen(AddressInfo[] addressInfos)
1480 	{
1481 		foreach (ref addressInfo; addressInfos)
1482 		{
1483 			try
1484 			{
1485 				Socket conn = new Socket(addressInfo);
1486 				conn.blocking = false;
1487 				if (addressInfo.family == AddressFamily.INET6)
1488 					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
1489 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
1490 
1491 				conn.bind(addressInfo.address);
1492 				conn.listen(totalCPUs * 2);
1493 
1494 				listeners ~= new Listener(conn);
1495 			}
1496 			catch (SocketException e)
1497 			{
1498 				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
1499 				debug(ASOCKETS) stderr.writeln(e.msg);
1500 			}
1501 		}
1502 
1503 		if (listeners.length==0)
1504 			throw new Exception("Unable to bind service");
1505 
1506 		listening = true;
1507 
1508 		updateFlags();
1509 	}
1510 
1511 	this()
1512 	{
1513 	} ///
1514 
1515 	/// Creates a Server with the given sockets.
1516 	/// The sockets must have already had `bind` and `listen` called on them.
1517 	this(Socket[] sockets...)
1518 	{
1519 		foreach (socket; sockets)
1520 			listeners ~= new Listener(socket);
1521 	}
1522 
1523 	/// Returns all listening addresses.
1524 	final @property Address[] localAddresses()
1525 	{
1526 		Address[] result;
1527 		foreach (listener; listeners)
1528 			result ~= listener.localAddress;
1529 		return result;
1530 	}
1531 
1532 	/// Returns `true` if the server is listening for incoming connections.
1533 	final @property bool isListening()
1534 	{
1535 		return listening;
1536 	}
1537 
1538 	/// Stop listening on this socket.
1539 	final void close()
1540 	{
1541 		foreach (listener;listeners)
1542 			listener.closeListener();
1543 		listeners = null;
1544 		listening = false;
1545 		if (handleClose)
1546 			handleClose();
1547 	}
1548 
1549 	/// Create a SocketServer using the handle passed on standard input,
1550 	/// for which `listen` had already been called. Used by
1551 	/// e.g. FastCGI and systemd sockets with "Listen = yes".
1552 	static SocketServer fromStdin()
1553 	{
1554 		socket_t socket;
1555 		version (Windows)
1556 		{
1557 			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
1558 			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
1559 		}
1560 		else
1561 			socket = cast(socket_t)0;
1562 
1563 		auto s = new Socket(socket, AddressFamily.UNSPEC);
1564 		s.blocking = false;
1565 		return new SocketServer(s);
1566 	}
1567 
1568 	/// Callback for when the socket was closed.
1569 	void delegate() handleClose;
1570 
1571 	private void delegate(SocketConnection incoming) acceptHandler;
1572 	/// Callback for an incoming connection.
1573 	/// Connections will not be accepted unless this handler is set.
1574 	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
1575 	/// ditto
1576 	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
1577 }
1578 
1579 /// An asynchronous TCP connection server.
1580 class TcpServer : SocketServer
1581 {
1582 protected:
1583 	override SocketConnection createConnection(Socket socket)
1584 	{
1585 		return new TcpConnection(socket);
1586 	}
1587 
1588 public:
1589 	this()
1590 	{
1591 	} ///
1592 
1593 	this(Socket[] sockets...)
1594 	{
1595 		super(sockets);
1596 	} /// Construct from the given sockets.
1597 
1598 	///
1599 	alias listen = SocketServer.listen; // raise overload
1600 
1601 	/// Start listening on this socket.
1602 	final ushort listen(ushort port, string addr = null)
1603 	{
1604 		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
1605 		//assert(!listening, "Attempting to listen on a listening socket");
1606 
1607 		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
1608 
1609 		debug (ASOCKETS)
1610 		{
1611 			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
1612 			foreach (ref addressInfo; addressInfos)
1613 				stderr.writefln("- %s", addressInfo);
1614 		}
1615 
1616 		// listen on random ports only on IPv4 for now
1617 		if (port == 0)
1618 		{
1619 			foreach_reverse (i, ref addressInfo; addressInfos)
1620 				if (addressInfo.family != AddressFamily.INET)
1621 					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
1622 		}
1623 
1624 		listen(addressInfos);
1625 
1626 		foreach (listener; listeners)
1627 		{
1628 			auto address = listener.conn.localAddress();
1629 			if (address.addressFamily == AddressFamily.INET)
1630 				port = to!ushort(address.toPortString());
1631 		}
1632 
1633 		return port;
1634 	}
1635 
1636 	deprecated("Use SocketServer.fromStdin")
1637 	static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; }
1638 
1639 	/// Delegate to be called when a connection is accepted.
1640 	@property final void handleAccept(void delegate(TcpConnection incoming) value) { super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c)); }
1641 }
1642 
1643 // ***************************************************************************
1644 
1645 /// An asynchronous UDP stream.
1646 /// UDP does not have connections, so this class encapsulates a socket
1647 /// with a fixed destination (sendto) address, and optionally bound to
1648 /// a local address.
1649 /// Currently received packets' address is not exposed.
1650 class UdpConnection : Connection
1651 {
1652 protected:
1653 	this(Socket conn)
1654 	{
1655 		super(conn);
1656 	}
1657 
1658 	/// Called when a socket is writable.
1659 	override void onWritable()
1660 	{
1661 		//scope(success) updateFlags();
1662 		onWritableImpl();
1663 		updateFlags();
1664 	}
1665 
1666 	// Work around scope(success) breaking debugger stack traces
1667 	final private void onWritableImpl()
1668 	{
1669 		foreach (priority, ref queue; outQueue)
1670 			while (queue.length)
1671 			{
1672 				auto pdata = queue.ptr; // pointer to first data
1673 
1674 				auto sent = conn.sendTo(pdata.contents, remoteAddress);
1675 
1676 				if (sent == Socket.ERROR)
1677 				{
1678 					if (wouldHaveBlocked())
1679 						return;
1680 					else
1681 						return onError("send() error: " ~ lastSocketError);
1682 				}
1683 				else
1684 				if (sent < pdata.length)
1685 				{
1686 					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
1687 				}
1688 				else
1689 				{
1690 					assert(sent == pdata.length);
1691 					//debug writefln("[%s] Sent data:", remoteAddressStr);
1692 					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1693 					pdata.clear();
1694 					queue = queue[1..$];
1695 					if (queue.length == 0)
1696 						queue = null;
1697 				}
1698 			}
1699 
1700 		// outQueue is now empty
1701 		if (bufferFlushedHandler)
1702 			bufferFlushedHandler();
1703 		if (state == ConnectionState.disconnecting)
1704 		{
1705 			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1706 			close();
1707 		}
1708 	}
1709 
1710 	override sizediff_t doSend(in void[] buffer)
1711 	{
1712 		assert(false); // never called (called only from overridden methods)
1713 	}
1714 
1715 	override sizediff_t doReceive(void[] buffer)
1716 	{
1717 		return conn.receive(buffer);
1718 	}
1719 
1720 public:
1721 	/// Default constructor
1722 	this()
1723 	{
1724 		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
1725 	}
1726 
1727 	/// Initialize with the given `AddressFamily`, without binding to an address.
1728 	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
1729 	{
1730 		initializeImpl(family, type, protocol);
1731 		if (connectHandler)
1732 			connectHandler();
1733 	}
1734 
1735 	private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
1736 	{
1737 		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
1738 		assert(!conn);
1739 
1740 		conn = new Socket(family, type, protocol);
1741 		conn.blocking = false;
1742 		socketManager.register(this);
1743 		state = ConnectionState.connected;
1744 		updateFlags();
1745 	}
1746 
1747 	/// Bind to a local address in order to receive packets sent there.
1748 	final ushort bind(string host, ushort port)
1749 	{
1750 		assert(host.length, "Empty host");
1751 
1752 		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
1753 
1754 		state = ConnectionState.resolving;
1755 
1756 		AddressInfo addressInfo;
1757 		try
1758 		{
1759 			auto addresses = getAddress(host, port);
1760 			enforce(addresses.length, "No addresses found");
1761 			debug (ASOCKETS)
1762 			{
1763 				stderr.writefln("Resolved to %s addresses:", addresses.length);
1764 				foreach (address; addresses)
1765 					stderr.writefln("- %s", address.toString());
1766 			}
1767 
1768 			Address address;
1769 			if (addresses.length > 1)
1770 			{
1771 				import std.random : uniform;
1772 				address = addresses[uniform(0, $)];
1773 			}
1774 			else
1775 				address = addresses[0];
1776 			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
1777 		}
1778 		catch (SocketException e)
1779 		{
1780 			onError("Lookup error: " ~ e.msg);
1781 			return 0;
1782 		}
1783 
1784 		state = ConnectionState.disconnected;
1785 		return bind(addressInfo);
1786 	}
1787 
1788 	/// ditto
1789 	final ushort bind(AddressInfo addressInfo)
1790 	{
1791 		initialize(addressInfo.family, addressInfo.type, addressInfo.protocol);
1792 		conn.bind(addressInfo.address);
1793 
1794 		auto address = conn.localAddress();
1795 		auto port = to!ushort(address.toPortString());
1796 
1797 		if (connectHandler)
1798 			connectHandler();
1799 
1800 		return port;
1801 	}
1802 
1803 // public:
1804 	/// Where to send packets to.
1805 	Address remoteAddress;
1806 }
1807 
1808 ///
1809 unittest
1810 {
1811 	auto server = new UdpConnection();
1812 	server.bind("localhost", 0);
1813 
1814 	auto client = new UdpConnection();
1815 	client.initialize(server.localAddress.addressFamily);
1816 
1817 	string[] packets = ["Hello", "there"];
1818 	client.remoteAddress = server.localAddress;
1819 	client.send({
1820 		Data[] data;
1821 		foreach (packet; packets)
1822 			data ~= Data(packet);
1823 		return data;
1824 	}());
1825 
1826 	server.handleReadData = (Data data)
1827 	{
1828 		assert(data.contents == packets[0]);
1829 		packets = packets[1..$];
1830 		if (!packets.length)
1831 		{
1832 			server.close();
1833 			client.close();
1834 		}
1835 	};
1836 	socketManager.loop();
1837 	assert(!packets.length);
1838 }
1839 
1840 // ***************************************************************************
1841 
1842 /// Base class for a connection adapter.
1843 /// By itself, does nothing.
1844 class ConnectionAdapter : IConnection
1845 {
1846 	/// The next connection in the chain (towards the raw transport).
1847 	IConnection next;
1848 
1849 	this(IConnection next)
1850 	{
1851 		this.next = next;
1852 		next.handleConnect = &onConnect;
1853 		next.handleDisconnect = &onDisconnect;
1854 		next.handleBufferFlushed = &onBufferFlushed;
1855 	} ///
1856 
1857 	@property ConnectionState state() { return next.state; } ///
1858 
1859 	/// Queue Data for sending.
1860 	void send(Data[] data, int priority)
1861 	{
1862 		next.send(data, priority);
1863 	}
1864 
1865 	alias send = IConnection.send; /// ditto
1866 
1867 	/// Terminate the connection.
1868 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
1869 	{
1870 		next.disconnect(reason, type);
1871 	}
1872 
1873 	protected void onConnect()
1874 	{
1875 		if (connectHandler)
1876 			connectHandler();
1877 	}
1878 
1879 	protected void onReadData(Data data)
1880 	{
1881 		// onReadData should be fired only if readDataHandler is set
1882 		assert(readDataHandler, "onReadData caled with null readDataHandler");
1883 		readDataHandler(data);
1884 	}
1885 
1886 	protected void onDisconnect(string reason, DisconnectType type)
1887 	{
1888 		if (disconnectHandler)
1889 			disconnectHandler(reason, type);
1890 	}
1891 
1892 	protected void onBufferFlushed()
1893 	{
1894 		if (bufferFlushedHandler)
1895 			bufferFlushedHandler();
1896 	}
1897 
1898 	/// Callback for when a connection has been established.
1899 	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
1900 	protected ConnectHandler connectHandler;
1901 
1902 	/// Callback setter for when new data is read.
1903 	@property void handleReadData(ReadDataHandler value)
1904 	{
1905 		readDataHandler = value;
1906 		next.handleReadData = value ? &onReadData : null ;
1907 	}
1908 	protected ReadDataHandler readDataHandler;
1909 
1910 	/// Callback setter for when a connection was closed.
1911 	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
1912 	protected DisconnectHandler disconnectHandler;
1913 
1914 	/// Callback setter for when all queued data has been written.
1915 	@property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; }
1916 	protected BufferFlushedHandler bufferFlushedHandler;
1917 }
1918 
1919 // ***************************************************************************
1920 
1921 /// Adapter for connections with a line-based protocol.
1922 /// Splits data stream into delimiter-separated lines.
1923 class LineBufferedAdapter : ConnectionAdapter
1924 {
1925 	/// The protocol's line delimiter.
1926 	string delimiter = "\r\n";
1927 
1928 	/// Maximum line length (0 means unlimited).
1929 	size_t maxLength = 0;
1930 
1931 	this(IConnection next)
1932 	{
1933 		super(next);
1934 	} ///
1935 
1936 	/// Append a line to the send buffer.
1937 	void send(string line)
1938 	{
1939 		//super.send(Data(line ~ delimiter));
1940 		// https://issues.dlang.org/show_bug.cgi?id=13985
1941 		ConnectionAdapter ca = this;
1942 		ca.send(Data(line ~ delimiter));
1943 	}
1944 
1945 protected:
1946 	/// The receive buffer.
1947 	Data inBuffer;
1948 
1949 	/// Called when data has been received.
1950 	final override void onReadData(Data data)
1951 	{
1952 		import std.string : indexOf;
1953 		auto oldBufferLength = inBuffer.length;
1954 		if (oldBufferLength)
1955 			inBuffer ~= data;
1956 		else
1957 			inBuffer = data;
1958 
1959 		if (delimiter.length == 1)
1960 		{
1961 			import core.stdc.string : memchr;
1962 
1963 			char c = delimiter[0];
1964 			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
1965 			while (p)
1966 			{
1967 				sizediff_t index = p - inBuffer.ptr;
1968 				if (!processLine(index))
1969 					break;
1970 
1971 				p = memchr(inBuffer.ptr, c, inBuffer.length);
1972 			}
1973 		}
1974 		else
1975 		{
1976 			sizediff_t index;
1977 			// TODO: we can start the search at oldBufferLength-delimiter.length+1
1978 			while ((index = indexOf(cast(string)inBuffer.contents, delimiter)) >= 0
1979 				&& processLine(index))
1980 			{}
1981 		}
1982 
1983 		if (maxLength && inBuffer.length > maxLength)
1984 			disconnect("Line too long", DisconnectType.error);
1985 	}
1986 
1987 	final bool processLine(size_t index)
1988 	{
1989 		if (maxLength && index > maxLength)
1990 		{
1991 			disconnect("Line too long", DisconnectType.error);
1992 			return false;
1993 		}
1994 		auto line = inBuffer[0..index];
1995 		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
1996 		super.onReadData(line);
1997 		return true;
1998 	}
1999 
2000 	override void onDisconnect(string reason, DisconnectType type)
2001 	{
2002 		super.onDisconnect(reason, type);
2003 		inBuffer.clear();
2004 	}
2005 }
2006 
2007 // ***************************************************************************
2008 
2009 /// Fires an event handler or disconnects connections
2010 /// after a period of inactivity.
2011 class TimeoutAdapter : ConnectionAdapter
2012 {
2013 	this(IConnection next)
2014 	{
2015 		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
2016 		super(next);
2017 	} ///
2018 
2019 	/// Set the `Duration` indicating the period of inactivity after which to take action.
2020 	final void setIdleTimeout(Duration duration)
2021 	{
2022 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
2023 		assert(duration > Duration.zero);
2024 
2025 		// Configure idleTask
2026 		if (idleTask is null)
2027 		{
2028 			idleTask = new TimerTask(duration);
2029 			idleTask.handleTask = &onTask_Idle;
2030 		}
2031 		else
2032 		{
2033 			if (idleTask.isWaiting())
2034 				idleTask.cancel();
2035 			idleTask.delay = duration;
2036 		}
2037 
2038 		mainTimer.add(idleTask);
2039 	}
2040 
2041 	/// Manually mark this connection as non-idle, restarting the idle timer.
2042 	/// `handleNonIdle` will be called, if set.
2043 	void markNonIdle()
2044 	{
2045 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
2046 		if (handleNonIdle)
2047 			handleNonIdle();
2048 		if (idleTask && idleTask.isWaiting())
2049 			idleTask.restart();
2050 	}
2051 
2052 	/// Stop the idle timer.
2053 	void cancelIdleTimeout()
2054 	{
2055 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
2056 		assert(idleTask !is null);
2057 		assert(idleTask.isWaiting());
2058 		idleTask.cancel();
2059 	}
2060 
2061 	/// Restart the idle timer.
2062 	void resumeIdleTimeout()
2063 	{
2064 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
2065 		assert(idleTask !is null);
2066 		assert(!idleTask.isWaiting());
2067 		mainTimer.add(idleTask);
2068 	}
2069 
2070 	/// Callback for when a connection has stopped responding.
2071 	/// If unset, the connection will be disconnected.
2072 	void delegate() handleIdleTimeout;
2073 
2074 	/// Callback for when a connection is marked as non-idle
2075 	/// (when data is received).
2076 	void delegate() handleNonIdle;
2077 
2078 protected:
2079 	override void onConnect()
2080 	{
2081 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
2082 		markNonIdle();
2083 		super.onConnect();
2084 	}
2085 
2086 	override void onReadData(Data data)
2087 	{
2088 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
2089 		markNonIdle();
2090 		super.onReadData(data);
2091 	}
2092 
2093 	override void onDisconnect(string reason, DisconnectType type)
2094 	{
2095 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
2096 		if (idleTask && idleTask.isWaiting())
2097 			idleTask.cancel();
2098 		super.onDisconnect(reason, type);
2099 	}
2100 
2101 private:
2102 	TimerTask idleTask; // non-null if an idle timeout has been set
2103 
2104 	final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/)
2105 	{
2106 		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);
2107 		if (state == ConnectionState.disconnecting)
2108 			return disconnect("Delayed disconnect - time-out", DisconnectType.error);
2109 
2110 		if (state == ConnectionState.disconnected)
2111 			return;
2112 
2113 		if (handleIdleTimeout)
2114 		{
2115 			resumeIdleTimeout(); // reschedule (by default)
2116 			handleIdleTimeout();
2117 		}
2118 		else
2119 			disconnect("Time-out", DisconnectType.error);
2120 	}
2121 }
2122 
2123 // ***************************************************************************
2124 
2125 unittest
2126 {
2127 	void testTimer()
2128 	{
2129 		bool fired;
2130 		setTimeout({fired = true;}, 10.msecs);
2131 		socketManager.loop();
2132 		assert(fired);
2133 	}
2134 
2135 	testTimer();
2136 }