1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import core.stdc.stdint : int32_t;
24 
25 import std.exception;
26 import std.socket;
27 import std.string : format;
28 public import std.socket : Address, AddressInfo, Socket;
29 
30 version (Windows)
31     private import c_socks = core.sys.windows.winsock2;
32 else version (Posix)
33     private import c_socks = core.sys.posix.sys.socket;
34 
35 debug(ASOCKETS) import std.stdio : stderr;
36 debug(PRINTDATA) static import std.stdio;
37 debug(PRINTDATA) import ae.utils.text : hexDump;
38 private import std.conv : to;
39 
40 
41 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
42 static import ae.utils.array;
43 
44 version(LIBEV)
45 {
46 	import deimos.ev;
47 	pragma(lib, "ev");
48 }
49 
50 version(Windows)
51 {
52 	import core.sys.windows.windows : Sleep;
53 	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
54 }
55 else
56 	enum USE_SLEEP = false;
57 
/// Global event counter: incremented by every libev callback and once per iteration of the select-based loop.
59 int eventCounter;
60 
61 version(LIBEV)
62 {
63 	// Watchers are a GenericSocket field (as declared in SocketMixin).
64 	// Use one watcher per read and write event.
65 	// Start/stop those as higher-level code declares interest in those events.
66 	// Use the "data" ev_io field to store the parent GenericSocket address.
67 	// Also use the "data" field as a flag to indicate whether the watcher is active
68 	// (data is null when the watcher is stopped).
69 
70 	struct SocketManager
71 	{
72 	private:
73 		size_t count;
74 
75 		extern(C)
76 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
77 		{
78 			eventCounter++;
79 			auto socket = cast(GenericSocket)w.data;
80 			assert(socket, "libev event fired on stopped watcher");
81 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
82 
83 			if (revents & EV_READ)
84 				socket.onReadable();
85 			else
86 			if (revents & EV_WRITE)
87 				socket.onWritable();
88 			else
89 				assert(false, "Unknown event fired from libev");
90 
91 			// TODO? Need to get proper SocketManager instance to call updateTimer on
92 			socketManager.updateTimer(false);
93 		}
94 
95 		ev_timer evTimer;
96 		MonoTime lastNextEvent = MonoTime.max;
97 
98 		extern(C)
99 		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
100 		{
101 			eventCounter++;
102 			debug (ASOCKETS) writefln("Timer callback called.");
103 			mainTimer.prod();
104 
105 			socketManager.updateTimer(true);
106 			debug (ASOCKETS) writefln("Timer callback exiting.");
107 		}
108 
109 		void updateTimer(bool force)
110 		{
111 			auto nextEvent = mainTimer.getNextEvent();
112 			if (force || lastNextEvent != nextEvent)
113 			{
114 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
115 				if (nextEvent == MonoTime.max) // Stopping
116 				{
117 					if (lastNextEvent != MonoTime.max)
118 						ev_timer_stop(ev_default_loop(0), &evTimer);
119 				}
120 				else
121 				{
122 					auto remaining = mainTimer.getRemainingTime();
123 					while (remaining <= Duration.zero)
124 					{
125 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
126 						mainTimer.prod();
127 						remaining = mainTimer.getRemainingTime();
128 					}
129 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
130 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
131 					if (lastNextEvent == MonoTime.max) // Starting
132 					{
133 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
134 						ev_timer_start(ev_default_loop(0), &evTimer);
135 					}
136 					else // Adjusting
137 					{
138 						evTimer.repeat = tstamp;
139 						ev_timer_again(ev_default_loop(0), &evTimer);
140 					}
141 				}
142 				lastNextEvent = nextEvent;
143 			}
144 		}
145 
146 	public:
147 		/// Register a socket with the manager.
148 		void register(GenericSocket socket)
149 		{
150 			debug (ASOCKETS) writefln("Registering %s", socket);
151 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
152 			auto fd = socket.conn.handle;
153 			assert(fd, "Must have fd before socket registration");
154 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
155 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
156 			count++;
157 		}
158 
159 		/// Unregister a socket with the manager.
160 		void unregister(GenericSocket socket)
161 		{
162 			debug (ASOCKETS) writefln("Unregistering %s", socket);
163 			socket.notifyRead  = false;
164 			socket.notifyWrite = false;
165 			count--;
166 		}
167 
168 		size_t size()
169 		{
170 			return count;
171 		}
172 
173 		/// Loop continuously until no sockets are left.
174 		void loop()
175 		{
176 			auto evLoop = ev_default_loop(0);
177 			enforce(evLoop, "libev initialization failure");
178 
179 			updateTimer(true);
180 			debug (ASOCKETS) writeln("ev_run");
181 			ev_run(ev_default_loop(0), 0);
182 		}
183 	}
184 
185 	private mixin template SocketMixin()
186 	{
187 		private ev_io evRead, evWrite;
188 
189 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
190 		{
191 			if (!conn)
192 			{
193 				// Can happen when setting delegates before connecting.
194 				return;
195 			}
196 
197 			if (newValue && !ev.data)
198 			{
199 				// Start
200 				ev.data = cast(void*)this;
201 				ev_io_start(ev_default_loop(0), &ev);
202 			}
203 			else
204 			if (!newValue && ev.data)
205 			{
206 				// Stop
207 				assert(ev.data is cast(void*)this);
208 				ev.data = null;
209 				ev_io_stop(ev_default_loop(0), &ev);
210 			}
211 		}
212 
213 		/// Interested in read notifications (onReadable)?
214 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
215 		/// Interested in write notifications (onWritable)?
216 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
217 
218 		debug ~this()
219 		{
220 			// The LIBEV SocketManager holds no references to registered sockets.
221 			// TODO: Add a doubly-linked list?
222 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
223 		}
224 	}
225 }
226 else // Use select
227 {
228 	struct SocketManager
229 	{
230 	private:
231 		enum FD_SETSIZE = 1024;
232 
233 		/// List of all sockets to poll.
234 		GenericSocket[] sockets;
235 
236 		/// Debug AA to check for dangling socket references.
237 		debug GenericSocket[socket_t] socketHandles;
238 
239 		void delegate()[] idleHandlers;
240 
241 	public:
242 		/// Register a socket with the manager.
243 		void register(GenericSocket conn)
244 		{
245 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
246 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
247 			sockets ~= conn;
248 
249 			debug
250 			{
251 				auto handle = conn.socket.handle;
252 				assert(handle != socket_t.init, "Can't register a closed socket");
253 				assert(handle !in socketHandles, "This socket handle is already registered");
254 				socketHandles[handle] = conn;
255 			}
256 		}
257 
258 		/// Unregister a socket with the manager.
259 		void unregister(GenericSocket conn)
260 		{
261 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
262 
263 			debug
264 			{
265 				auto handle = conn.socket.handle;
266 				assert(handle != socket_t.init, "Can't unregister a closed socket");
267 				auto pconn = handle in socketHandles;
268 				assert(pconn, "This socket handle is not registered");
269 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
270 				socketHandles.remove(handle);
271 			}
272 
273 			foreach (size_t i, GenericSocket j; sockets)
274 				if (j is conn)
275 				{
276 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
277 					return;
278 				}
279 			assert(false, "Socket not registered");
280 		}
281 
282 		size_t size()
283 		{
284 			return sockets.length;
285 		}
286 
287 		/// Loop continuously until no sockets are left.
288 		void loop()
289 		{
290 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
291 
292 			SocketSet readset, writeset, errorset;
293 			size_t sockcount;
294 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
295 			readset  = new SocketSet(setSize);
296 			writeset = new SocketSet(setSize);
297 			errorset = new SocketSet(setSize);
298 			while (true)
299 			{
300 				uint minSize = 0;
301 				version(Windows)
302 					minSize = cast(uint)sockets.length;
303 				else
304 				{
305 					foreach (s; sockets)
306 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
307 							minSize = s.socket.handle;
308 				}
309 				minSize++;
310 
311 				if (setSize < minSize)
312 				{
313 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
314 					setSize = minSize * 2;
315 					readset  = new SocketSet(setSize);
316 					writeset = new SocketSet(setSize);
317 					errorset = new SocketSet(setSize);
318 				}
319 				else
320 				{
321 					readset.reset();
322 					writeset.reset();
323 					errorset.reset();
324 				}
325 
326 				sockcount = 0;
327 				bool haveActive;
328 				debug (ASOCKETS) stderr.writeln("Populating sets");
329 				foreach (GenericSocket conn; sockets)
330 				{
331 					if (!conn.socket)
332 						continue;
333 					sockcount++;
334 					if (!conn.daemon)
335 						haveActive = true;
336 
337 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
338 					if (conn.notifyRead)
339 					{
340 						readset.add(conn.socket);
341 						debug (ASOCKETS) stderr.write(" READ");
342 					}
343 					if (conn.notifyWrite)
344 					{
345 						writeset.add(conn.socket);
346 						debug (ASOCKETS) stderr.write(" WRITE");
347 					}
348 					errorset.add(conn.socket);
349 					debug (ASOCKETS) stderr.writeln();
350 				}
351 				debug (ASOCKETS)
352 				{
353 					stderr.writefln("Sets populated as follows:");
354 					printSets(readset, writeset, errorset);
355 				}
356 
357 				debug (ASOCKETS) stderr.writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
358 					sockcount,
359 					mainTimer.isWaiting() ? "with" : "no",
360 					idleHandlers.length,
361 				);
362 				if (!haveActive && !mainTimer.isWaiting())
363 				{
364 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
365 					break;
366 				}
367 
368 				debug (ASOCKETS) stderr.flush();
369 
370 				int events;
371 				if (idleHandlers.length)
372 				{
373 					if (sockcount==0)
374 						events = 0;
375 					else
376 						events = Socket.select(readset, writeset, errorset, 0.seconds);
377 				}
378 				else
379 				if (USE_SLEEP && sockcount==0)
380 				{
381 					version(Windows)
382 					{
383 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
384 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
385 						if (duration <= 0)
386 							duration = 1; // Avoid busywait
387 						else
388 						if (duration > int.max)
389 							duration = int.max;
390 						Sleep(cast(int)duration);
391 						events = 0;
392 					}
393 					else
394 						assert(0);
395 				}
396 				else
397 				if (mainTimer.isWaiting())
398 					events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
399 				else
400 					events = Socket.select(readset, writeset, errorset);
401 
402 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
403 
404 				if (events > 0)
405 				{
406 					// Handle just one event at a time, as the first
407 					// handler might invalidate select()'s results.
408 					handleEvent(readset, writeset, errorset);
409 				}
410 				else
411 				if (idleHandlers.length)
412 				{
413 					import ae.utils.array;
414 					auto handler = idleHandlers.shift();
415 
416 					// Rotate the idle handler queue before running it,
417 					// in case the handler unregisters itself.
418 					idleHandlers ~= handler;
419 
420 					handler();
421 				}
422 
423 				// Timers may invalidate our select results, so fire them after processing the latter
424 				mainTimer.prod();
425 
426 				eventCounter++;
427 			}
428 		}
429 
430 		debug (ASOCKETS)
431 		void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
432 		{
433 			foreach (GenericSocket conn; sockets)
434 			{
435 				if (!conn.socket)
436 					stderr.writefln("\t\t%s is unset", conn);
437 				else
438 				{
439 					if (readset.isSet(conn.socket))
440 						stderr.writefln("\t\t%s is readable", conn);
441 					if (writeset.isSet(conn.socket))
442 						stderr.writefln("\t\t%s is writable", conn);
443 					if (errorset.isSet(conn.socket))
444 						stderr.writefln("\t\t%s is errored", conn);
445 				}
446 			}
447 		}
448 
449 		void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
450 		{
451 			debug (ASOCKETS)
452 			{
453 				stderr.writefln("\tSelect results:");
454 				printSets(readset, writeset, errorset);
455 			}
456 
457 			foreach (GenericSocket conn; sockets)
458 			{
459 				if (!conn.socket)
460 					continue;
461 
462 				if (readset.isSet(conn.socket))
463 				{
464 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
465 					return conn.onReadable();
466 				}
467 				else
468 				if (writeset.isSet(conn.socket))
469 				{
470 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
471 					return conn.onWritable();
472 				}
473 				else
474 				if (errorset.isSet(conn.socket))
475 				{
476 					debug (ASOCKETS) stderr.writefln("\t%s - calling onError", conn);
477 					return conn.onError("select() error: " ~ conn.socket.getErrorText());
478 				}
479 			}
480 
481 			assert(false, "select() reported events available, but no registered sockets are set");
482 		}
483 	}
484 
485 	// Use UFCS to allow removeIdleHandler to have a predicate with context
486 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
487 	{
488 		foreach (i, idleHandler; socketManager.idleHandlers)
489 			assert(handler !is idleHandler);
490 
491 		socketManager.idleHandlers ~= handler;
492 	}
493 
494 	static bool isFun(T)(T a, T b) { return a is b; }
495 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
496 	{
497 		foreach (i, idleHandler; socketManager.idleHandlers)
498 			if (pred(idleHandler, args))
499 			{
500 				import std.algorithm;
501 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
502 				return;
503 			}
504 		assert(false, "No such idle handler");
505 	}
506 
	/// select() flavour of the per-socket state mixed into GenericSocket:
	/// plain flags, read by SocketManager.loop when populating the select sets.
	private mixin template SocketMixin()
	{
		/// Interested in read notifications (onReadable)?
		bool notifyRead;
		/// Interested in write notifications (onWritable)?
		bool notifyWrite;
	}
514 }
515 
516 /// The default socket manager.
517 SocketManager socketManager;
518 
519 // ***************************************************************************
520 
521 /// General methods for an asynchronous socket.
522 abstract class GenericSocket
523 {
524 	/// Declares notifyRead and notifyWrite.
525 	mixin SocketMixin;
526 
527 protected:
528 	/// The socket this class wraps.
529 	Socket conn;
530 
531 protected:
532 	/// Retrieve the socket class this class wraps.
533 	@property final Socket socket()
534 	{
535 		return conn;
536 	}
537 
	/// Called by the socket manager when the socket is reported readable.
	/// Does nothing by default; meant to be overridden.
	void onReadable()
	{
	}

	/// Called by the socket manager when the socket is reported writable.
	/// Does nothing by default; meant to be overridden.
	void onWritable()
	{
	}

	/// Called when an error is detected on the socket.
	/// Params:
	///   reason = human-readable description of the error
	/// Does nothing by default; meant to be overridden.
	void onError(string reason)
	{
	}
549 
550 public:
551 	/// allow getting the address of connections that are already disconnected
552 	private Address[2] cachedAddress;
553 
554 	/*private*/ final @property Address address(bool local)()
555 	{
556 		if (cachedAddress[local] !is null)
557 			return cachedAddress[local];
558 		else
559 		if (conn is null)
560 			return null;
561 		else
562 		{
563 			Address a;
564 			if (conn.addressFamily == AddressFamily.UNSPEC)
565 			{
566 				// Socket will attempt to construct an UnknownAddress,
567 				// which will almost certainly not match the real address length.
568 				static if (local)
569 					alias getname = c_socks.getsockname;
570 				else
571 					alias getname = c_socks.getpeername;
572 
573 				c_socks.socklen_t nameLen = 0;
574 				if (getname(conn.handle, null, &nameLen) < 0)
575 					throw new SocketOSException("Unable to obtain socket address");
576 
577 				auto buf = new ubyte[nameLen];
578 				auto sa = cast(c_socks.sockaddr*)buf.ptr;
579 				if (getname(conn.handle, sa, &nameLen) < 0)
580 					throw new SocketOSException("Unable to obtain socket address");
581 				a = new UnknownAddressReference(sa, nameLen);
582 			}
583 			else
584 				a = local ? conn.localAddress() : conn.remoteAddress();
585 			return cachedAddress[local] = a;
586 		}
587 	}
588 
589 	alias localAddress = address!true;
590 	alias remoteAddress = address!false;
591 
592 	/*private*/ final @property string addressStr(bool local)() nothrow
593 	{
594 		try
595 		{
596 			auto a = address!local;
597 			if (a is null)
598 				return "[null address]";
599 			string host = a.toAddrString();
600 			import std.string : indexOf;
601 			if (host.indexOf(':') >= 0)
602 				host = "[" ~ host ~ "]";
603 			try
604 			{
605 				string port = a.toPortString();
606 				return host ~ ":" ~ port;
607 			}
608 			catch (Exception e)
609 				return host;
610 		}
611 		catch (Exception e)
612 			return "[error: " ~ e.msg ~ "]";
613 	}
614 
615 	alias localAddressStr = addressStr!true;
616 	alias remoteAddressStr = addressStr!false;
617 
618 	/// Don't block the process from exiting.
619 	/// TODO: Not implemented with libev
620 	bool daemon;
621 
622 	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
623 	{
624 		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
625 		if (enabled)
626 		{
627 			try
628 				conn.setKeepAlive(time, interval);
629 			catch (SocketException)
630 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
631 		}
632 		else
633 			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
634 	}
635 
636 	override string toString() const
637 	{
638 		import std.string;
639 		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
640 	}
641 }
642 
643 // ***************************************************************************
644 
/// Describes how / why a connection was terminated.
enum DisconnectType
{
	requested, /// initiated by the application
	graceful,  /// peer gracefully closed the connection
	error      /// abnormal network condition
}
651 
/// Life cycle states of a connection.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}

/// Returns true if this is a connection state for which disconnecting is valid
/// (resolving, connecting or connected).
/// Generally, applications should be aware of the life cycle of their sockets,
/// so checking the state of a connection is unnecessary (and a code smell).
/// However, unconditionally disconnecting some connected sockets can be useful
/// when it needs to occur "out-of-band" (not tied to the application's normal
/// life cycle), such as in response to a signal.
bool disconnectable(ConnectionState state)
{
	switch (state)
	{
		case ConnectionState.resolving:
		case ConnectionState.connecting:
		case ConnectionState.connected:
			return true;
		default:
			return false;
	}
}
678 
/// Common interface for connections and adapters.
interface IConnection
{
	/// Highest valid send priority value.
	enum MAX_PRIORITY = 4;
	/// Priority used by send when none is specified.
	enum DEFAULT_PRIORITY = 2;

	/// Reason reported when disconnect is called without one.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected()
	{
		return state == ConnectionState.connected;
	}

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting()
	{
		return state == ConnectionState.disconnecting;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Wrap the single datum in a temporary one-element array,
		// and reset that array once it has been queued.
		Data[1] wrapper;
		wrapper[0] = datum;
		this.send(wrapper, priority);
		wrapper[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias BufferFlushedHandler = void delegate();
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}
728 
729 // ***************************************************************************
730 
731 class Connection : GenericSocket, IConnection
732 {
733 private:
734 	/// Blocks of data larger than this value are passed as unmanaged memory
735 	/// (in Data objects). Blocks smaller than this value will be reallocated
736 	/// on the managed heap. The disadvantage of placing large objects on the
737 	/// managed heap is false pointers; the disadvantage of using Data for
738 	/// small objects is wasted slack space due to the page size alignment
739 	/// requirement.
740 	enum UNMANAGED_THRESHOLD = 256;
741 
	/// Backing field for the `state` property.
	ConnectionState _state;
	/// Private setter: stores the new connection state and returns it.
	final @property ConnectionState state(ConnectionState value) { return _state = value; }

public:
	/// Get connection state.
	override @property ConnectionState state() { return _state; }
748 
749 protected:
	/// Transport-specific send primitive, implemented by subclasses.
	/// The caller compares the result against Socket.ERROR and otherwise
	/// treats it as the number of bytes sent.
	abstract sizediff_t doSend(in void[] buffer);
	/// Transport-specific receive primitive, implemented by subclasses.
	/// onReadable treats a result of 0 as "connection closed", Socket.ERROR as
	/// a failure, and any other value as the number of bytes placed in buffer.
	abstract sizediff_t doReceive(void[] buffer);
752 
753 	/// The send buffers.
754 	Data[][MAX_PRIORITY+1] outQueue;
755 	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
756 	int partiallySent = -1;
757 
758 	/// Constructor used by a ServerSocket for new connections
759 	this(Socket conn)
760 	{
761 		this();
762 		this.conn = conn;
763 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
764 		if (conn)
765 			socketManager.register(this);
766 		updateFlags();
767 	}
768 
769 	final void updateFlags()
770 	{
771 		if (state == ConnectionState.connecting)
772 			notifyWrite = true;
773 		else
774 			notifyWrite = writePending;
775 
776 		notifyRead = state == ConnectionState.connected && readDataHandler;
777 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
778 	}
779 
780 	/// Called when a socket is readable.
781 	override void onReadable()
782 	{
783 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
784 		static ubyte[0x10000] inBuffer = void;
785 		auto received = doReceive(inBuffer);
786 
787 		if (received == 0)
788 			return disconnect("Connection closed", DisconnectType.graceful);
789 
790 		if (received == Socket.ERROR)
791 		{
792 		//	if (wouldHaveBlocked)
793 		//	{
794 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
795 		//		return;
796 		//	}
797 		//	else
798 				onError("recv() error: " ~ lastSocketError);
799 		}
800 		else
801 		{
802 			debug (PRINTDATA)
803 			{
804 				std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
805 				std.stdio.write(hexDump(inBuffer[0 .. received]));
806 				std.stdio.stdout.flush();
807 			}
808 
809 			if (state == ConnectionState.disconnecting)
810 			{
811 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
812 			}
813 			else
814 			if (!readDataHandler)
815 			{
816 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
817 			}
818 			else
819 			{
820 				// Currently, unlike the D1 version of this module,
821 				// we will always reallocate read network data.
822 				// This disfavours code which doesn't need to store
823 				// read data after processing it, but otherwise
824 				// makes things simpler and safer all around.
825 
826 				if (received < UNMANAGED_THRESHOLD)
827 				{
828 					// Copy to the managed heap
829 					readDataHandler(Data(inBuffer[0 .. received].dup));
830 				}
831 				else
832 				{
833 					// Copy to unmanaged memory
834 					readDataHandler(Data(inBuffer[0 .. received], true));
835 				}
836 			}
837 		}
838 	}
839 
840 	/// Called when an error occurs on the socket.
841 	override void onError(string reason)
842 	{
843 		if (state == ConnectionState.disconnecting)
844 		{
845 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
846 			return close();
847 		}
848 
849 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
850 		disconnect("Socket error: " ~ reason, DisconnectType.error);
851 	}
852 
	/// Default constructor: performs no initialization of its own
	/// (no socket is attached or registered here).
	this()
	{
	}
856 
857 public:
858 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
859 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
860 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
861 	{
862 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
863 		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));
864 
865 		if (writePending)
866 		{
867 			if (type==DisconnectType.requested)
868 			{
869 				assert(conn, "Attempting to disconnect on an uninitialized socket");
870 				// queue disconnect after all data is sent
871 				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
872 				state = ConnectionState.disconnecting;
873 				//setIdleTimeout(30.seconds);
874 				if (disconnectHandler)
875 					disconnectHandler(reason, type);
876 				updateFlags();
877 				return;
878 			}
879 			else
880 				discardQueues();
881 		}
882 
883 		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
884 
885 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
886 			close();
887 		else
888 		{
889 			assert(conn is null, "Registered but %s socket".format(state));
890 			if (state == ConnectionState.resolving)
891 				state = ConnectionState.disconnected;
892 		}
893 
894 		if (disconnectHandler)
895 			disconnectHandler(reason, type);
896 		updateFlags();
897 	}
898 
899 	private final void close()
900 	{
901 		assert(conn, "Attempting to close an unregistered socket");
902 		socketManager.unregister(this);
903 		conn.close();
904 		conn = null;
905 		outQueue[] = null;
906 		state = ConnectionState.disconnected;
907 	}
908 
909 	/// Append data to the send buffer.
910 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
911 	{
912 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
913 		outQueue[priority] ~= data;
914 		notifyWrite = true; // Fast updateFlags()
915 
916 		debug (PRINTDATA)
917 		{
918 			std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
919 			foreach (datum; data)
920 				if (datum.length)
921 					std.stdio.write(hexDump(datum.contents));
922 				else
923 					std.stdio.writeln("(empty Data)");
924 			std.stdio.stdout.flush();
925 		}
926 	}
927 
928 	/// ditto
929 	alias send = IConnection.send;
930 
931 	final void clearQueue(int priority)
932 	{
933 		if (priority == partiallySent)
934 		{
935 			assert(outQueue[priority].length > 0);
936 			outQueue[priority] = outQueue[priority][0..1];
937 		}
938 		else
939 			outQueue[priority] = null;
940 		updateFlags();
941 	}
942 
943 	/// Clears all queues, even partially sent content.
944 	private final void discardQueues()
945 	{
946 		foreach (priority; 0..MAX_PRIORITY+1)
947 			outQueue[priority] = null;
948 		partiallySent = -1;
949 		updateFlags();
950 	}
951 
952 	@property
953 	final bool writePending()
954 	{
955 		foreach (queue; outQueue)
956 			if (queue.length)
957 				return true;
958 		return false;
959 	}
960 
961 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
962 	{
963 		if (priority == partiallySent)
964 		{
965 			assert(outQueue[priority].length > 0);
966 			return outQueue[priority].length > 1;
967 		}
968 		else
969 			return outQueue[priority].length > 0;
970 	}
971 
972 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
973 	{
974 		return outQueue[priority].length;
975 	}
976 
977 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
978 	{
979 		size_t bytes;
980 		foreach (datum; outQueue[priority])
981 			bytes += datum.length;
982 		return bytes;
983 	}
984 
985 public:
	/// Stored connect callback (may be null).
	private ConnectHandler connectHandler;
	/// Callback for when a connection has been established.
	/// Setting it also refreshes the notification flags.
	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }

	/// Stored read callback (may be null).
	private ReadDataHandler readDataHandler;
	/// Callback for incoming data.
	/// Data will not be received unless this handler is set.
	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }

	/// Stored disconnect callback (may be null).
	private DisconnectHandler disconnectHandler;
	/// Callback for when a connection was closed.
	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }

	/// Stored buffer-flushed callback (may be null).
	private BufferFlushedHandler bufferFlushedHandler;
	/// Callback setter for when all queued data has been sent.
	@property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); }
1002 }
1003 
1004 class StreamConnection : Connection
1005 {
1006 protected:
	/// Default constructor; only chains to the Connection constructor.
	this()
	{
		super();
	}
1011 
1012 	/// Called when a socket is writable.
1013 	override void onWritable()
1014 	{
1015 		//scope(success) updateFlags();
1016 		onWritableImpl();
1017 		updateFlags();
1018 	}
1019 
1020 	// Work around scope(success) breaking debugger stack traces
1021 	final private void onWritableImpl()
1022 	{
1023 		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
1024 		if (state == ConnectionState.connecting)
1025 		{
1026 			int32_t error;
1027 			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
1028 			if (error)
1029 				return disconnect(formatSocketError(error), DisconnectType.error);
1030 
1031 			state = ConnectionState.connected;
1032 
1033 			//debug writefln("[%s] Connected", remoteAddress);
1034 			try
1035 				setKeepAlive();
1036 			catch (Exception e)
1037 				return disconnect(e.msg, DisconnectType.error);
1038 			if (connectHandler)
1039 				connectHandler();
1040 			return;
1041 		}
1042 		//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);
1043 
1044 		foreach (sendPartial; [true, false])
1045 			foreach (int priority, ref queue; outQueue)
1046 				while (queue.length && (!sendPartial || priority == partiallySent))
1047 				{
1048 					assert(partiallySent == -1 || partiallySent == priority);
1049 
1050 					auto pdata = queue.ptr; // pointer to first data
1051 
1052 					ptrdiff_t sent = 0;
1053 					if (pdata.length)
1054 					{
1055 						sent = doSend(pdata.contents);
1056 						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
1057 					}
1058 					else
1059 					{
1060 						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
1061 					}
1062 
1063 					if (sent == Socket.ERROR)
1064 					{
1065 						if (wouldHaveBlocked())
1066 							return;
1067 						else
1068 							return onError("send() error: " ~ lastSocketError);
1069 					}
1070 					else
1071 					if (sent < pdata.length)
1072 					{
1073 						if (sent > 0)
1074 						{
1075 							*pdata = (*pdata)[sent..pdata.length];
1076 							partiallySent = priority;
1077 						}
1078 						return;
1079 					}
1080 					else
1081 					{
1082 						assert(sent == pdata.length);
1083 						//debug writefln("[%s] Sent data:", remoteAddress);
1084 						//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1085 						pdata.clear();
1086 						queue = queue[1..$];
1087 						partiallySent = -1;
1088 						if (queue.length == 0)
1089 							queue = null;
1090 					}
1091 				}
1092 
1093 		// outQueue is now empty
1094 		if (bufferFlushedHandler)
1095 			bufferFlushedHandler();
1096 		if (state == ConnectionState.disconnecting)
1097 		{
1098 			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1099 			close();
1100 		}
1101 	}
1102 
1103 public:
1104 	this(Socket conn)
1105 	{
1106 		super(conn);
1107 	}
1108 }
1109 
1110 // ***************************************************************************
1111 
1112 /// A POSIX file stream.
1113 /// Allows adding a file (e.g. stdin/stdout) to the socket manager.
1114 /// Does not dup the given file descriptor, so "disconnecting" this connection
1115 /// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	/// Wraps the given, already open, file descriptor.
	/// The descriptor is switched to non-blocking mode.
	this(int fileno)
	{
		auto wrapped = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		wrapped.blocking = false;
		super(wrapped);
	}

protected:
	import core.sys.posix.unistd : read, write;

	/// Sends data using `write` on the wrapped descriptor.
	override sizediff_t doSend(in void[] buffer)
	{
		auto fd = socket.handle;
		return write(fd, buffer.ptr, buffer.length);
	}

	/// Receives data using `read` on the wrapped descriptor.
	override sizediff_t doReceive(void[] buffer)
	{
		auto fd = socket.handle;
		return read(fd, buffer.ptr, buffer.length);
	}
}
1139 
1140 /// Separates reading and writing, e.g. for stdin/stdout.
class Duplex : IConnection
{
	IConnection reader, writer;

	/// Combines `reader` (incoming data) and `writer` (outgoing data) into
	/// one connection, taking over both connections' connect and
	/// disconnect handlers.
	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;

		reader.handleConnect = &onConnect;
		reader.handleDisconnect = &onDisconnect;

		writer.handleConnect = &onConnect;
		writer.handleDisconnect = &onDisconnect;
	}

	/// Combined state: `disconnecting` if either side is disconnecting,
	/// otherwise the lesser of the two sides' states.
	@property ConnectionState state()
	{
		if (reader.state == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;
		if (writer.state == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;

		if (reader.state < writer.state)
			return reader.state;
		return writer.state;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		// A side is disconnected here only if its state lies strictly
		// between `disconnected` and `disconnecting`.
		static bool canDisconnect(IConnection side)
		{
			auto current = side.state;
			return current > ConnectionState.disconnected
				&& current < ConnectionState.disconnecting;
		}

		if (canDisconnect(reader))
			reader.disconnect(reason, type);
		if (canDisconnect(writer))
			writer.disconnect(reason, type);
		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Fires the user's connect handler once both sides are connected.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		if (reader.state != ConnectionState.connected)
			return;
		if (writer.state != ConnectionState.connected)
			return;
		connectHandler();
	}

	/// Reports the disconnect to the user (at most once), then
	/// disconnects whichever side is still open.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
		auto notify = disconnectHandler;
		if (notify)
		{
			notify(reason, type);
			// don't call it twice for the other connection
			disconnectHandler = null;
		}
		// It is our responsibility to disconnect the other connection
		// Use DisconnectType.requested to ensure that any written data is flushed
		auto forwardedReason = "Other side of Duplex connection closed (" ~ reason ~ ")";
		disconnect(forwardedReason, DisconnectType.requested);
	}

	private ConnectHandler connectHandler;
	private DisconnectHandler disconnectHandler;

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value)
	{
		reader.handleReadData = value;
	}

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		writer.handleBufferFlushed = value;
	}
}
1217 
// Compile-only check that Duplex can be instantiated; the `if (false)`
// guard means no object is actually constructed when the test runs.
unittest { if (false) new Duplex(null, null); }
1219 
1220 // ***************************************************************************
1221 
1222 /// An asynchronous socket-based connection.
class SocketConnection : StreamConnection
{
protected:
	/// Addresses which have not been tried yet during the current
	/// connection attempt.
	AddressInfo[] addressQueue;

	/// Wraps an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

	/// Sends data using the socket's `send`.
	override sizediff_t doSend(in void[] buffer)
	{
		return conn.send(buffer);
	}

	/// Receives data using the socket's `receive`.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

	/// Removes the first address from `addressQueue` and starts a
	/// non-blocking connection attempt to it.
	/// Any `SocketException` is routed to `onError`, which moves on to
	/// the next queued address, if there is one.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);
		auto addressInfo = addressQueue[0];
		addressQueue = addressQueue[1..$];

		try
		{
			// Configure the socket before publishing it in `conn`, so that
			// `conn` is non-null only once it is about to be registered.
			// `onError` relies on this to decide whether there is anything
			// to unregister and close.
			auto candidate = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
			candidate.blocking = false;
			conn = candidate;

			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
			conn.connect(addressInfo.address);
		}
		catch (SocketException e)
			return onError("Connect error: " ~ e.msg);
	}

	/// Called when an error occurs on the socket.
	/// While connecting with untried addresses left, the failed socket
	/// (if one was created) is discarded and the next address is tried;
	/// otherwise the error is passed on to the base class.
	override void onError(string reason)
	{
		if (state == ConnectionState.connecting && addressQueue.length)
		{
			// `conn` is null when socket creation itself failed (e.g. an
			// unsupported address family); there is then nothing to
			// unregister or close.
			if (conn)
			{
				socketManager.unregister(this);
				conn.close();
				conn = null;
			}

			return tryNextAddress();
		}

		// NOTE(review): if creating the socket for the last address fails,
		// this runs with `conn` still null - confirm the base class
		// implementation tolerates that.
		super.onError(reason);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection.
	/// The addresses are tried in order until one attempt does not fail.
	/// Must be called on a disconnected connection which has no socket.
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		addressQueue = addresses;
		state = ConnectionState.connecting;
		tryNextAddress();
	}
}
1298 
1299 /// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
	/// Wraps an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
	}

	alias connect = SocketConnection.connect; // raise overload

	/// Start establishing a connection.
	/// Resolves `host`, shuffles the results if there are several, and
	/// tries the resulting addresses as TCP stream endpoints.
	/// Lookup failures (`SocketException`) are reported through `onError`.
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		state = ConnectionState.resolving;

		AddressInfo[] candidates;
		try
		{
			auto resolved = getAddress(host, port);
			enforce(resolved.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", resolved.length);
				foreach (address; resolved)
					stderr.writefln("- %s", address.toString());
			}

			if (resolved.length > 1)
			{
				import std.random : randomShuffle;
				randomShuffle(resolved);
			}

			foreach (i; 0 .. resolved.length)
			{
				auto address = resolved[i];
				candidates ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
			}
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return;
		}

		// The AddressInfo[] overload expects a disconnected connection.
		state = ConnectionState.disconnected;
		connect(candidates);
	}
}
1356 
1357 // ***************************************************************************
1358 
1359 /// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Wraps the given listening socket and registers it with the
		/// socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one pending connection and passes it to the accept
		/// handler; if no handler is set, the connection is closed.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				auto connection = createConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregisters and closes the listening socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Creates the connection object for a newly accepted socket.
	/// Overridden by subclasses to create more specific connection types.
	SocketConnection createConnection(Socket socket)
	{
		return new SocketConnection(socket);
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Makes every listener request read notifications only while an
	/// accept handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// A listening socket is created for each given address; addresses
	/// for which this fails with a `SocketException` are skipped.
	/// Throws: `Exception` if there are no listeners afterwards. The
	/// last per-address `SocketException`, if any, is chained to it.
	final void listen(AddressInfo[] addressInfos)
	{
		// Most recent per-address failure, kept so that it can be chained
		// to the final exception instead of being silently discarded.
		SocketException lastError;

		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				// Don't leak the socket if configuring or binding it fails.
				scope(failure) conn.close();
				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				lastError = e;
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service", lastError);

		listening = true;

		updateFlags();
	}

	/// Creates a server which is not listening yet.
	this()
	{
	}

	/// Creates a Server with the given sockets.
	/// The sockets must have already had `bind` and `listen` called on them.
	this(Socket[] sockets...)
	{
		foreach (socket; sockets)
			listeners ~= new Listener(socket);

		// The sockets are required to be listening already, so make
		// `isListening` report that.
		if (listeners.length)
			listening = true;
	}

	/// The local addresses of all listeners.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Whether the server is currently listening.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes all listeners, then calls `handleClose` if it is set.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

	/// Create a SocketServer using the handle passed on standard input,
	/// for which `listen` had already been called. Used by
	/// e.g. FastCGI and systemd sockets with "Listen = yes".
	static SocketServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0;

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new SocketServer(s);
	}

	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(SocketConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
}
1527 
1528 /// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
	/// Accepted sockets are wrapped in `TcpConnection` objects.
	override SocketConnection createConnection(Socket socket)
	{
		return new TcpConnection(socket);
	}

public:
	/// Creates a server which is not listening yet.
	this()
	{
	}

	/// Creates a server with the given sockets, which must have already
	/// had `bind` and `listen` called on them.
	this(Socket[] sockets...)
	{
		super(sockets);
	}

	alias listen = SocketServer.listen; // raise overload

	/// Start listening on this socket.
	/// Params:
	///   port = port to listen on; 0 requests a random port (IPv4 only)
	///   addr = local address to listen on (passed to `getAddressInfo`)
	/// Returns: the port of an IPv4 listener if there is one,
	///          otherwise `port` as given.
	final ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// The loop iterates over the original slice in reverse, so
			// indices not visited yet are unaffected by removing a later
			// element from the (re-assigned) array.
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		// Report the port actually bound, as seen on an IPv4 listener.
		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	/// Create a TcpServer using the handle passed on standard input,
	/// for which `listen` had already been called.
	/// A real `TcpServer` is constructed here: reinterpreting the
	/// `SocketServer` returned by `SocketServer.fromStdin` would leave it
	/// creating plain `SocketConnection` objects, which the accept handler
	/// wrapper below would then cast to a null `TcpConnection`.
	deprecated("Use SocketServer.fromStdin")
	static TcpServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0;

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new TcpServer(s);
	}

	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	/// Passing null clears the handler, rather than installing a wrapper
	/// which would call a null delegate.
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		if (value)
			super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c));
		else
			super.handleAccept(null);
	}
}
1589 
1590 // ***************************************************************************
1591 
1592 /// An asynchronous UDP stream.
1593 /// UDP does not have connections, so this class encapsulates a socket
1594 /// with a fixed destination (sendto) address, and optionally bound to
1595 /// a local address.
/// Currently, the source address of received packets is not exposed.
class UdpConnection : Connection
{
protected:
	/// Wraps an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	/// Runs `onWritableImpl`, then calls `updateFlags`.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	/// Sends queued packets to `remoteAddress`, one datagram per packet,
	/// until the socket would block, an error occurs, or all queues are
	/// empty. In the last case `bufferFlushedHandler` is fired and a
	/// delayed disconnect (state `disconnecting`) is completed.
	final private void onWritableImpl()
	{
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				auto sent = conn.sendTo(pdata.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					// Would-block is not an error: wait for the next
					// writable event, leaving the queues untouched.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// A partially sent datagram is reported as an error;
					// the remainder is not retried.
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	/// Not used: sending is done with `sendTo` in `onWritableImpl`.
	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	/// Receives data using the socket's `receive`.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given AddressFamily, without binding to an address.
	/// Fires the connect handler, if set, once the socket is ready.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Creates and registers the non-blocking socket and moves to the
	/// `connected` state, without firing the connect handler.
	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// If `host` resolves to several addresses, one is picked at random.
	/// Returns: the local port which was bound, or 0 if the lookup
	///          failed (the failure is reported through `onError`).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			enforce(addresses.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// The AddressInfo overload expects a disconnected connection.
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl rather than initialize: the latter fires the
		// connect handler itself, which would notify the user before the
		// socket is bound, and then a second time below.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

public:
	/// Where to send packets to.
	Address remoteAddress;
}
1754 
1755 ///
unittest
{
	// Receiving side: bound to a port on localhost chosen by the system.
	auto receiver = new UdpConnection();
	auto receiverPort = receiver.bind("localhost", 0);

	// Sending side: same address family, not bound to any address.
	auto sender = new UdpConnection();
	sender.initialize(receiver.localAddress.addressFamily);

	string[] expected = ["Hello", "there"];
	sender.remoteAddress = receiver.localAddress;

	// Queue one datagram per string.
	Data[] outgoing;
	foreach (text; expected)
		outgoing ~= Data(text);
	sender.send(outgoing);

	// Each datagram must arrive intact and in order; once the last one
	// has been seen, close both ends so that the event loop exits.
	receiver.handleReadData = (Data data)
	{
		assert(data.contents == expected[0]);
		expected = expected[1..$];
		if (expected.length == 0)
		{
			receiver.close();
			sender.close();
		}
	};
	socketManager.loop();
	assert(expected.length == 0);
}
1786 
1787 // ***************************************************************************
1788 
1789 /// Base class for a connection adapter.
1790 /// By itself, does nothing.
class ConnectionAdapter : IConnection
{
	IConnection next;

	/// Wraps `next`, taking over its connect, disconnect and
	/// buffer-flushed handlers. The read-data handler of `next` is
	/// installed only when a read-data handler is set on the adapter.
	this(IConnection next)
	{
		this.next = next;
		next.handleBufferFlushed = &onBufferFlushed;
		next.handleDisconnect = &onDisconnect;
		next.handleConnect = &onConnect;
	}

	/// The state of the wrapped connection.
	@property ConnectionState state()
	{
		return next.state;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Forwards the connect event to the user's handler, if set.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	/// Forwards received data to the user's handler.
	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		readDataHandler(data);
	}

	/// Forwards the disconnect event to the user's handler, if set.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	/// Forwards the buffer-flushed event to the user's handler, if set.
	protected void onBufferFlushed()
	{
		if (!bufferFlushedHandler)
			return;
		bufferFlushedHandler();
	}

	private ConnectHandler connectHandler;
	private ReadDataHandler readDataHandler;
	private DisconnectHandler disconnectHandler;
	private BufferFlushedHandler bufferFlushedHandler;

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}

	/// Callback setter for when new data is read.
	/// The wrapped connection gets a read handler only while one is set here.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
}
1863 
1864 // ***************************************************************************
1865 
1866 /// Adapter for connections with a line-based protocol.
1867 /// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	string delimiter = "\r\n";

	/// Wraps the given connection.
	this(IConnection next)
	{
		super(next);
	}

	/// Append a line to the send buffer.
	/// The delimiter is appended to `line` before it is queued.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		// Workaround for the issue above: call send through a base-class
		// typed reference instead of `super`.
		ConnectionAdapter ca = this;
		ca.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends `data` to `inBuffer`, then passes every complete line now
	/// in the buffer to `processLine`. Data after the last delimiter
	/// stays in `inBuffer` until more data arrives.
	final override void onReadData(Data data)
	{
		import std.string;
		auto oldBufferLength = inBuffer.length;
		// With an empty buffer, take `data` as the buffer instead of appending.
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			import core.stdc.string; // memchr

			char c = delimiter[0];
			// The first search covers only the newly added bytes, which start
			// at offset oldBufferLength.
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				// Offset of the delimiter from the start of the buffer.
				sizediff_t index = p - inBuffer.ptr;
				processLine(index);

				// processLine has replaced inBuffer with the remainder, so
				// further searches scan the whole remaining buffer.
				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			sizediff_t index;
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
				processLine(index);
		}
	}

	/// Removes one line from the start of `inBuffer` and passes it on.
	/// `index` is the offset of the delimiter which ends the line; the
	/// line is passed to the base class's `onReadData` without the delimiter.
	final void processLine(size_t index)
	{
		auto line = inBuffer[0..index];
		// Keep only what follows the delimiter.
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
	}

	/// Forwards the disconnect event, then clears the receive buffer.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
1937 
1938 // ***************************************************************************
1939 
1940 /// Fires an event handler or disconnects connections
1941 /// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
	/// Wraps the given connection. No timeout is active until
	/// `setIdleTimeout` is called.
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stops the idle timer. An idle timeout must have been set, and the
	/// timer must currently be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Starts the idle timer again. An idle timeout must have been set,
	/// and the timer must not currently be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Sets (or changes) the idle timeout and starts the idle timer.
	/// `duration` must be positive.
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		if (idleTask !is null)
		{
			// Re-use the existing task with the new delay.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			// First call: create the task.
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}

		mainTimer.add(idleTask);
	}

	/// Signals activity: fires `handleNonIdle` if set, and restarts the
	/// idle timer if it is waiting.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();

		if (!idleTask || !idleTask.isWaiting())
			return;
		idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Counts connection establishment as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Counts received data as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Stops the idle timer, if it is waiting, before forwarding the event.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		immutable timerActive = idleTask && idleTask.isWaiting();
		if (timerActive)
			idleTask.cancel();
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set

	/// Timer callback, called when the idle timeout has elapsed.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);

		auto current = state;
		if (current == ConnectionState.disconnecting)
		{
			// A delayed disconnect did not complete in time.
			disconnect("Delayed disconnect - time-out", DisconnectType.error);
			return;
		}
		if (current == ConnectionState.disconnected)
			return;

		if (!handleIdleTimeout)
		{
			disconnect("Time-out", DisconnectType.error);
			return;
		}

		resumeIdleTimeout(); // reschedule (by default)
		handleIdleTimeout();
	}
}
2048 
2049 // ***************************************************************************
2050 
unittest
{
	// A timeout scheduled with setTimeout must have fired by the time the
	// event loop returns.
	bool timerFired;
	setTimeout({timerFired = true;}, 10.msecs);
	socketManager.loop();
	assert(timerFired);
}