1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <ae@cy.md>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import core.stdc.stdint : int32_t;
24 
25 import std.exception;
26 import std.socket;
27 import std..string : format;
28 public import std.socket : Address, AddressInfo, Socket;
29 
30 version (Windows)
31     private import c_socks = core.sys.windows.winsock2;
32 else version (Posix)
33     private import c_socks = core.sys.posix.sys.socket;
34 
35 debug(ASOCKETS) import std.stdio : stderr;
36 debug(PRINTDATA) import std.stdio : stderr;
37 debug(PRINTDATA) import ae.utils.text : hexDump;
38 private import std.conv : to;
39 
40 
41 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
42 static import ae.utils.array;
43 
44 version(LIBEV)
45 {
46 	import deimos.ev;
47 	pragma(lib, "ev");
48 }
49 
50 version(Windows)
51 {
52 	import core.sys.windows.windows : Sleep;
53 	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
54 }
55 else
56 	enum USE_SLEEP = false;
57 
/// Counts event loop activity: incremented once per libev I/O or timer
/// callback, or once per completed iteration of the `select` loop.
59 int eventCounter;
60 
61 version(LIBEV)
62 {
63 	// Watchers are a GenericSocket field (as declared in SocketMixin).
64 	// Use one watcher per read and write event.
65 	// Start/stop those as higher-level code declares interest in those events.
66 	// Use the "data" ev_io field to store the parent GenericSocket address.
67 	// Also use the "data" field as a flag to indicate whether the watcher is active
68 	// (data is null when the watcher is stopped).
69 
70 	/// `libev`-based event loop implementation.
71 	struct SocketManager
72 	{
73 	private:
74 		size_t count;
75 
76 		extern(C)
77 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
78 		{
79 			eventCounter++;
80 			auto socket = cast(GenericSocket)w.data;
81 			assert(socket, "libev event fired on stopped watcher");
82 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
83 
84 			if (revents & EV_READ)
85 				socket.onReadable();
86 			else
87 			if (revents & EV_WRITE)
88 				socket.onWritable();
89 			else
90 				assert(false, "Unknown event fired from libev");
91 
92 			// TODO? Need to get proper SocketManager instance to call updateTimer on
93 			socketManager.updateTimer(false);
94 		}
95 
96 		ev_timer evTimer;
97 		MonoTime lastNextEvent = MonoTime.max;
98 
99 		extern(C)
100 		static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/)
101 		{
102 			eventCounter++;
103 			debug (ASOCKETS) writefln("Timer callback called.");
104 			mainTimer.prod();
105 
106 			socketManager.updateTimer(true);
107 			debug (ASOCKETS) writefln("Timer callback exiting.");
108 		}
109 
110 		void updateTimer(bool force)
111 		{
112 			auto nextEvent = mainTimer.getNextEvent();
113 			if (force || lastNextEvent != nextEvent)
114 			{
115 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
116 				if (nextEvent == MonoTime.max) // Stopping
117 				{
118 					if (lastNextEvent != MonoTime.max)
119 						ev_timer_stop(ev_default_loop(0), &evTimer);
120 				}
121 				else
122 				{
123 					auto remaining = mainTimer.getRemainingTime();
124 					while (remaining <= Duration.zero)
125 					{
126 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
127 						mainTimer.prod();
128 						remaining = mainTimer.getRemainingTime();
129 					}
130 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
131 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
132 					if (lastNextEvent == MonoTime.max) // Starting
133 					{
134 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
135 						ev_timer_start(ev_default_loop(0), &evTimer);
136 					}
137 					else // Adjusting
138 					{
139 						evTimer.repeat = tstamp;
140 						ev_timer_again(ev_default_loop(0), &evTimer);
141 					}
142 				}
143 				lastNextEvent = nextEvent;
144 			}
145 		}
146 
147 	public:
148 		/// Register a socket with the manager.
149 		void register(GenericSocket socket)
150 		{
151 			debug (ASOCKETS) writefln("Registering %s", socket);
152 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
153 			auto fd = socket.conn.handle;
154 			assert(fd, "Must have fd before socket registration");
155 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
156 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
157 			count++;
158 		}
159 
160 		/// Unregister a socket with the manager.
161 		void unregister(GenericSocket socket)
162 		{
163 			debug (ASOCKETS) writefln("Unregistering %s", socket);
164 			socket.notifyRead  = false;
165 			socket.notifyWrite = false;
166 			count--;
167 		}
168 
169 		/// Returns the number of registered sockets.
170 		size_t size()
171 		{
172 			return count;
173 		}
174 
175 		/// Loop continuously until no sockets are left.
176 		void loop()
177 		{
178 			auto evLoop = ev_default_loop(0);
179 			enforce(evLoop, "libev initialization failure");
180 
181 			updateTimer(true);
182 			debug (ASOCKETS) writeln("ev_run");
183 			ev_run(ev_default_loop(0), 0);
184 		}
185 	}
186 
187 	private mixin template SocketMixin()
188 	{
189 		private ev_io evRead, evWrite;
190 
191 		private void setWatcherState(ref ev_io ev, bool newValue, int /*event*/)
192 		{
193 			if (!conn)
194 			{
195 				// Can happen when setting delegates before connecting.
196 				return;
197 			}
198 
199 			if (newValue && !ev.data)
200 			{
201 				// Start
202 				ev.data = cast(void*)this;
203 				ev_io_start(ev_default_loop(0), &ev);
204 			}
205 			else
206 			if (!newValue && ev.data)
207 			{
208 				// Stop
209 				assert(ev.data is cast(void*)this);
210 				ev.data = null;
211 				ev_io_stop(ev_default_loop(0), &ev);
212 			}
213 		}
214 
215 		/// Interested in read notifications (onReadable)?
216 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
217 		/// Interested in write notifications (onWritable)?
218 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
219 
220 		debug ~this()
221 		{
222 			// The LIBEV SocketManager holds no references to registered sockets.
223 			// TODO: Add a doubly-linked list?
224 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
225 		}
226 	}
227 }
228 else // Use select
229 {
230 	/// `select`-based event loop implementation.
231 	struct SocketManager
232 	{
233 	private:
234 		enum FD_SETSIZE = 1024;
235 
236 		/// List of all sockets to poll.
237 		GenericSocket[] sockets;
238 
239 		/// Debug AA to check for dangling socket references.
240 		debug GenericSocket[socket_t] socketHandles;
241 
242 		void delegate()[] nextTickHandlers, idleHandlers;
243 
244 	public:
245 		/// Register a socket with the manager.
246 		void register(GenericSocket conn)
247 		{
248 			debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
249 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
250 			sockets ~= conn;
251 
252 			debug
253 			{
254 				auto handle = conn.socket.handle;
255 				assert(handle != socket_t.init, "Can't register a closed socket");
256 				assert(handle !in socketHandles, "This socket handle is already registered");
257 				socketHandles[handle] = conn;
258 			}
259 		}
260 
261 		/// Unregister a socket with the manager.
262 		void unregister(GenericSocket conn)
263 		{
264 			debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
265 
266 			debug
267 			{
268 				auto handle = conn.socket.handle;
269 				assert(handle != socket_t.init, "Can't unregister a closed socket");
270 				auto pconn = handle in socketHandles;
271 				assert(pconn, "This socket handle is not registered");
272 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
273 				socketHandles.remove(handle);
274 			}
275 
276 			foreach (size_t i, GenericSocket j; sockets)
277 				if (j is conn)
278 				{
279 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
280 					return;
281 				}
282 			assert(false, "Socket not registered");
283 		}
284 
285 		/// Returns the number of registered sockets.
286 		size_t size()
287 		{
288 			return sockets.length;
289 		}
290 
291 		/// Loop continuously until no sockets are left.
292 		void loop()
293 		{
294 			debug (ASOCKETS) stderr.writeln("Starting event loop.");
295 
296 			SocketSet readset, writeset;
297 			size_t sockcount;
298 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
299 			readset  = new SocketSet(setSize);
300 			writeset = new SocketSet(setSize);
301 			while (true)
302 			{
303 				if (nextTickHandlers.length)
304 				{
305 					auto thisTickHandlers = nextTickHandlers;
306 					nextTickHandlers = null;
307 
308 					foreach (handler; thisTickHandlers)
309 						handler();
310 
311 					continue;
312 				}
313 
314 				uint minSize = 0;
315 				version(Windows)
316 					minSize = cast(uint)sockets.length;
317 				else
318 				{
319 					foreach (s; sockets)
320 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
321 							minSize = s.socket.handle;
322 				}
323 				minSize++;
324 
325 				if (setSize < minSize)
326 				{
327 					debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
328 					setSize = minSize * 2;
329 					readset  = new SocketSet(setSize);
330 					writeset = new SocketSet(setSize);
331 				}
332 				else
333 				{
334 					readset.reset();
335 					writeset.reset();
336 				}
337 
338 				sockcount = 0;
339 				bool haveActive;
340 				debug (ASOCKETS) stderr.writeln("Populating sets");
341 				foreach (GenericSocket conn; sockets)
342 				{
343 					if (!conn.socket)
344 						continue;
345 					sockcount++;
346 
347 					debug (ASOCKETS) stderr.writef("\t%s:", conn);
348 					if (conn.notifyRead)
349 					{
350 						readset.add(conn.socket);
351 						if (!conn.daemonRead)
352 							haveActive = true;
353 						debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : "");
354 					}
355 					if (conn.notifyWrite)
356 					{
357 						writeset.add(conn.socket);
358 						if (!conn.daemonWrite)
359 							haveActive = true;
360 						debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : "");
361 					}
362 					debug (ASOCKETS) stderr.writeln();
363 				}
364 				debug (ASOCKETS)
365 				{
366 					stderr.writefln("Sets populated as follows:");
367 					printSets(readset, writeset);
368 				}
369 
370 				debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...",
371 					haveActive ? "" : "not ",
372 					sockcount,
373 					mainTimer.isWaiting() ? "with" : "no",
374 					idleHandlers.length,
375 				);
376 				if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length)
377 				{
378 					debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
379 					break;
380 				}
381 
382 				debug (ASOCKETS) stderr.flush();
383 
384 				int events;
385 				if (idleHandlers.length)
386 				{
387 					if (sockcount==0)
388 						events = 0;
389 					else
390 						events = Socket.select(readset, writeset, null, 0.seconds);
391 				}
392 				else
393 				if (USE_SLEEP && sockcount==0)
394 				{
395 					version(Windows)
396 					{
397 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
398 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
399 						if (duration <= 0)
400 							duration = 1; // Avoid busywait
401 						else
402 						if (duration > int.max)
403 							duration = int.max;
404 						Sleep(cast(int)duration);
405 						events = 0;
406 					}
407 					else
408 						assert(0);
409 				}
410 				else
411 				if (mainTimer.isWaiting())
412 					events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime());
413 				else
414 					events = Socket.select(readset, writeset, null);
415 
416 				debug (ASOCKETS) stderr.writefln("%d events fired.", events);
417 
418 				if (events > 0)
419 				{
420 					// Handle just one event at a time, as the first
421 					// handler might invalidate select()'s results.
422 					handleEvent(readset, writeset);
423 				}
424 				else
425 				if (idleHandlers.length)
426 				{
427 					import ae.utils.array : shift;
428 					auto handler = idleHandlers.shift();
429 
430 					// Rotate the idle handler queue before running it,
431 					// in case the handler unregisters itself.
432 					idleHandlers ~= handler;
433 
434 					handler();
435 				}
436 
437 				// Timers may invalidate our select results, so fire them after processing the latter
438 				mainTimer.prod();
439 
440 				eventCounter++;
441 			}
442 		}
443 
444 		debug (ASOCKETS)
445 		void printSets(SocketSet readset, SocketSet writeset)
446 		{
447 			foreach (GenericSocket conn; sockets)
448 			{
449 				if (!conn.socket)
450 					stderr.writefln("\t\t%s is unset", conn);
451 				else
452 				{
453 					if (readset.isSet(conn.socket))
454 						stderr.writefln("\t\t%s is readable", conn);
455 					if (writeset.isSet(conn.socket))
456 						stderr.writefln("\t\t%s is writable", conn);
457 				}
458 			}
459 		}
460 
461 		void handleEvent(SocketSet readset, SocketSet writeset)
462 		{
463 			debug (ASOCKETS)
464 			{
465 				stderr.writefln("\tSelect results:");
466 				printSets(readset, writeset);
467 			}
468 
469 			foreach (GenericSocket conn; sockets)
470 			{
471 				if (!conn.socket)
472 					continue;
473 
474 				if (readset.isSet(conn.socket))
475 				{
476 					debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
477 					return conn.onReadable();
478 				}
479 				else
480 				if (writeset.isSet(conn.socket))
481 				{
482 					debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
483 					return conn.onWritable();
484 				}
485 			}
486 
487 			assert(false, "select() reported events available, but no registered sockets are set");
488 		}
489 	}
490 
	/// Schedule a function to run on the next event loop iteration.
	/// Can be used to queue logic to run once all current execution frames exit.
	/// Similar to e.g. process.nextTick in Node.
	/// Params:
	///   socketManager = the manager whose next-tick queue receives the delegate
	///   dg            = the delegate to queue
	void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow
	{
		// Handlers are appended, so they run in the order they were queued.
		socketManager.nextTickHandlers ~= dg;
	}
498 
499 	// Use UFCS to allow removeIdleHandler to have a predicate with context
500 	/// Register a function to be called when the event loop is idle,
501 	/// and would otherwise sleep.
502 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
503 	{
504 		foreach (i, idleHandler; socketManager.idleHandlers)
505 			assert(handler !is idleHandler);
506 
507 		socketManager.idleHandlers ~= handler;
508 	}
509 
510 	/// Unregister a function previously registered with `addIdleHandler`.
511 	static bool isFun(T)(T a, T b) { return a is b; }
512 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
513 	{
514 		foreach (i, idleHandler; socketManager.idleHandlers)
515 			if (pred(idleHandler, args))
516 			{
517 				import std.algorithm : remove;
518 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
519 				return;
520 			}
521 		assert(false, "No such idle handler");
522 	}
523 
524 	private mixin template SocketMixin()
525 	{
526 		/// Interested in read notifications (onReadable)?
527 		bool notifyRead;
528 		/// Interested in write notifications (onWritable)?
529 		bool notifyWrite;
530 	}
531 }
532 
533 /// The default socket manager.
534 SocketManager socketManager;
535 
536 // ***************************************************************************
537 
/// General methods for an asynchronous socket.
/// Wraps a `Socket` and exposes the event hooks called by `SocketManager`.
abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

// protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Called by the socket manager when the socket is readable.
	/// Does nothing by default.
	void onReadable()
	{
	}

	/// Called by the socket manager when the socket is writable.
	/// Does nothing by default.
	void onWritable()
	{
	}

	/// Error hook, taking a description of the error.
	/// Does nothing by default.
	void onError(string /*reason*/)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	/// (index 0 holds the remote address, index 1 the local address)
	private Address[2] cachedAddress;

	/// Returns the local (`local == true`) or remote address of the socket.
	/// The result is cached after the first successful lookup.
	/// Returns `null` if no address is cached and there is no socket.
	/// Throws: `SocketOSException` if the address of a socket with an
	/// unspecified address family cannot be obtained.
	/*private*/ final @property Address address(bool local)()
	{
		if (cachedAddress[local] !is null)
			return cachedAddress[local];
		else
		if (conn is null)
			return null;
		else
		{
			Address a;
			if (conn.addressFamily == AddressFamily.UNSPEC)
			{
				// Socket will attempt to construct an UnknownAddress,
				// which will almost certainly not match the real address length.
				static if (local)
					alias getname = c_socks.getsockname;
				else
					alias getname = c_socks.getpeername;

				// First call: query the required length only.
				c_socks.socklen_t nameLen = 0;
				if (getname(conn.handle, null, &nameLen) < 0)
					throw new SocketOSException("Unable to obtain socket address");

				// Second call: fetch the address into a buffer of that length.
				auto buf = new ubyte[nameLen];
				auto sa = cast(c_socks.sockaddr*)buf.ptr;
				if (getname(conn.handle, sa, &nameLen) < 0)
					throw new SocketOSException("Unable to obtain socket address");
				a = new UnknownAddressReference(sa, nameLen);
			}
			else
				a = local ? conn.localAddress() : conn.remoteAddress();
			return cachedAddress[local] = a;
		}
	}

	alias localAddress = address!true;   /// The socket's local address.
	alias remoteAddress = address!false; /// The socket's remote address.

	/// Returns the local or remote address formatted as `host:port`
	/// (or just `host` if the port cannot be formatted).
	/// Hosts containing a colon are wrapped in square brackets.
	/// Never throws; failures are reported in the returned string.
	/*private*/ final @property string addressStr(bool local)() nothrow
	{
		try
		{
			auto a = address!local;
			if (a is null)
				return "[null address]";
			string host = a.toAddrString();
			import std.string : indexOf;
			if (host.indexOf(':') >= 0)
				host = "[" ~ host ~ "]";
			try
			{
				string port = a.toPortString();
				return host ~ ":" ~ port;
			}
			catch (Exception e)
				return host;
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	alias localAddressStr = addressStr!true;   /// The local address, as a string.
	alias remoteAddressStr = addressStr!false; /// The remote address, as a string.

	/// Don't block the process from exiting, even if the socket is ready to receive data.
	/// TODO: Not implemented with libev
	bool daemonRead;

	/// Don't block the process from exiting, even if the socket is ready to send data.
	/// TODO: Not implemented with libev
	bool daemonWrite;

	deprecated alias daemon = daemonRead;

	/// Enable TCP keep-alive on the socket with the given settings.
	/// If setting `time` / `interval` fails with a `SocketException`,
	/// falls back to just enabling the KEEPALIVE socket option.
	/// Params:
	///   enabled  = whether keep-alive should be on or off
	///   time     = passed to `Socket.setKeepAlive` as its first argument
	///   interval = passed to `Socket.setKeepAlive` as its second argument
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
		if (enabled)
		{
			try
				conn.setKeepAlive(time, interval);
			catch (SocketException)
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
		}
		else
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
	}

	/// Returns a string containing the (unqualified) class name,
	/// the object address, and the file descriptor (-1 if there is no socket).
	override string toString() const
	{
		import std.string : format, split;
		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
	}
}
667 
668 // ***************************************************************************
669 
/// Classifies the cause of the disconnect.
/// Can be used to decide e.g. when it makes sense to reconnect.
enum DisconnectType
{
	/// Initiated by the application.
	requested,

	/// The peer gracefully closed the connection.
	graceful,

	/// Some abnormal network condition.
	error,
}
678 
/// Used to indicate the state of a connection throughout its lifecycle.
enum ConnectionState
{
	disconnected,  /// The initial state, or the state after a disconnect was fully processed.
	resolving,     /// Name resolution. Currently done synchronously.
	connecting,    /// A connection attempt is in progress.
	connected,     /// A connection is established.

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}
698 
/// Returns true if this is a connection state for which disconnecting is valid,
/// i.e. `resolving`, `connecting` or `connected`.
/// Generally, applications should be aware of the life cycle of their sockets,
/// so checking the state of a connection is unnecessary (and a code smell).
/// However, unconditionally disconnecting some connected sockets can be useful
/// when it needs to occur "out-of-bound" (not tied to the application normal life cycle),
/// such as in response to a signal.
bool disconnectable(ConnectionState state)
{
	if (state < ConnectionState.resolving)
		return false;
	if (state > ConnectionState.connected)
		return false;
	return true;
}
706 
/// Common interface for connections and adapters.
interface IConnection
{
	/// `send` queues data for sending in one of five queues, indexed
	/// by a numeric priority.
	/// `MAX_PRIORITY` is the highest (least urgent) priority index.
	/// `DEFAULT_PRIORITY` is the default priority
	enum MAX_PRIORITY = 4;
	enum DEFAULT_PRIORITY = 2; /// ditto

	/// This is the default value for the `disconnect` `reason` string parameter.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	/// Deprecated: compare `state` against `ConnectionState.connected` instead.
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	/// Deprecated: compare `state` against `ConnectionState.disconnecting` instead.
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	/// Params:
	///   data     = the blocks to queue
	///   priority = index of the queue to append to (see `MAX_PRIORITY`)
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Wrap the single datum in a one-element stack array,
		// so that the array overload can be reused.
		Data[1] data;
		data[0] = datum;
		this.send(data, priority);
		// Reset the temporary slot once the array overload has returned.
		data[] = Data.init;
	}

	/// Terminate the connection.
	/// Params:
	///   reason = description of why the connection is being closed
	///   type   = classification of the disconnect cause
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias BufferFlushedHandler = void delegate();
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}
761 
762 // ***************************************************************************
763 
764 /// Implementation of `IConnection` using a socket.
765 /// Implements receiving data when readable and sending queued data
766 /// when writable.
767 class Connection : GenericSocket, IConnection
768 {
769 private:
770 	/// Blocks of data larger than this value are passed as unmanaged memory
771 	/// (in Data objects). Blocks smaller than this value will be reallocated
772 	/// on the managed heap. The disadvantage of placing large objects on the
773 	/// managed heap is false pointers; the disadvantage of using Data for
774 	/// small objects is wasted slack space due to the page size alignment
775 	/// requirement.
776 	enum UNMANAGED_THRESHOLD = 256;
777 
778 	ConnectionState _state;
779 	final @property ConnectionState state(ConnectionState value) { return _state = value; }
780 
781 public:
782 	/// Get connection state.
783 	override @property ConnectionState state() { return _state; }
784 
785 protected:
786 	abstract sizediff_t doSend(in void[] buffer);
787 	abstract sizediff_t doReceive(void[] buffer);
788 
789 	/// The send buffers.
790 	Data[][MAX_PRIORITY+1] outQueue;
791 	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
792 	int partiallySent = -1;
793 
794 	/// Constructor used by a ServerSocket for new connections
795 	this(Socket conn)
796 	{
797 		this();
798 		this.conn = conn;
799 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
800 		if (conn)
801 			socketManager.register(this);
802 		updateFlags();
803 	}
804 
805 	final void updateFlags()
806 	{
807 		if (state == ConnectionState.connecting)
808 			notifyWrite = true;
809 		else
810 			notifyWrite = writePending;
811 
812 		notifyRead = state == ConnectionState.connected && readDataHandler;
813 		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
814 	}
815 
816 	/// Called when a socket is readable.
817 	override void onReadable()
818 	{
819 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
820 		static ubyte[0x10000] inBuffer = void;
821 		auto received = doReceive(inBuffer);
822 
823 		if (received == 0)
824 			return disconnect("Connection closed", DisconnectType.graceful);
825 
826 		if (received == Socket.ERROR)
827 		{
828 		//	if (wouldHaveBlocked)
829 		//	{
830 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
831 		//		return;
832 		//	}
833 		//	else
834 				onError("recv() error: " ~ lastSocketError);
835 		}
836 		else
837 		{
838 			debug (PRINTDATA)
839 			{
840 				stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr);
841 				stderr.write(hexDump(inBuffer[0 .. received]));
842 				stderr.flush();
843 			}
844 
845 			if (state == ConnectionState.disconnecting)
846 			{
847 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
848 			}
849 			else
850 			if (!readDataHandler)
851 			{
852 				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
853 			}
854 			else
855 			{
856 				// Currently, unlike the D1 version of this module,
857 				// we will always reallocate read network data.
858 				// This disfavours code which doesn't need to store
859 				// read data after processing it, but otherwise
860 				// makes things simpler and safer all around.
861 
862 				if (received < UNMANAGED_THRESHOLD)
863 				{
864 					// Copy to the managed heap
865 					readDataHandler(Data(inBuffer[0 .. received].dup));
866 				}
867 				else
868 				{
869 					// Copy to unmanaged memory
870 					readDataHandler(Data(inBuffer[0 .. received], true));
871 				}
872 			}
873 		}
874 	}
875 
876 	/// Called when an error occurs on the socket.
877 	override void onError(string reason)
878 	{
879 		if (state == ConnectionState.disconnecting)
880 		{
881 			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
882 			return close();
883 		}
884 
885 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
886 		disconnect("Socket error: " ~ reason, DisconnectType.error);
887 	}
888 
889 	this()
890 	{
891 	}
892 
893 public:
894 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
895 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
896 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
897 	{
898 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
899 		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));
900 
901 		if (writePending)
902 		{
903 			if (type==DisconnectType.requested)
904 			{
905 				assert(conn, "Attempting to disconnect on an uninitialized socket");
906 				// queue disconnect after all data is sent
907 				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
908 				state = ConnectionState.disconnecting;
909 				//setIdleTimeout(30.seconds);
910 				if (disconnectHandler)
911 					disconnectHandler(reason, type);
912 				updateFlags();
913 				return;
914 			}
915 			else
916 				discardQueues();
917 		}
918 
919 		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
920 
921 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
922 			close();
923 		else
924 		{
925 			assert(conn is null, "Registered but %s socket".format(state));
926 			if (state == ConnectionState.resolving)
927 				state = ConnectionState.disconnected;
928 		}
929 
930 		if (disconnectHandler)
931 			disconnectHandler(reason, type);
932 		updateFlags();
933 	}
934 
935 	private final void close()
936 	{
937 		assert(conn, "Attempting to close an unregistered socket");
938 		socketManager.unregister(this);
939 		conn.close();
940 		conn = null;
941 		outQueue[] = null;
942 		state = ConnectionState.disconnected;
943 	}
944 
945 	/// Append data to the send buffer.
946 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
947 	{
948 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
949 		outQueue[priority] ~= data;
950 		notifyWrite = true; // Fast updateFlags()
951 
952 		debug (PRINTDATA)
953 		{
954 			stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr);
955 			foreach (datum; data)
956 				if (datum.length)
957 					stderr.write(hexDump(datum.contents));
958 				else
959 					stderr.writeln("(empty Data)");
960 			stderr.flush();
961 		}
962 	}
963 
964 	/// ditto
965 	alias send = IConnection.send;
966 
967 	/// Cancel all queued `Data` packets with the given priority.
968 	/// Does not cancel any partially-sent `Data`.
969 	final void clearQueue(int priority)
970 	{
971 		if (priority == partiallySent)
972 		{
973 			assert(outQueue[priority].length > 0);
974 			outQueue[priority] = outQueue[priority][0..1];
975 		}
976 		else
977 			outQueue[priority] = null;
978 		updateFlags();
979 	}
980 
981 	/// Clears all queues, even partially sent content.
982 	private final void discardQueues()
983 	{
984 		foreach (priority; 0..MAX_PRIORITY+1)
985 			outQueue[priority] = null;
986 		partiallySent = -1;
987 		updateFlags();
988 	}
989 
990 	/// Returns true if any queues have pending data.
991 	@property
992 	final bool writePending()
993 	{
994 		foreach (queue; outQueue)
995 			if (queue.length)
996 				return true;
997 		return false;
998 	}
999 
1000 	/// Returns true if there are any queued `Data` which have not yet
1001 	/// begun to be sent.
1002 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
1003 	{
1004 		if (priority == partiallySent)
1005 		{
1006 			assert(outQueue[priority].length > 0);
1007 			return outQueue[priority].length > 1;
1008 		}
1009 		else
1010 			return outQueue[priority].length > 0;
1011 	}
1012 
1013 	/// Returns the number of queued `Data` at the given priority.
1014 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
1015 	{
1016 		return outQueue[priority].length;
1017 	}
1018 
1019 	/// Returns the number of queued bytes at the given priority.
1020 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
1021 	{
1022 		size_t bytes;
1023 		foreach (datum; outQueue[priority])
1024 			bytes += datum.length;
1025 		return bytes;
1026 	}
1027 
// public:
	private ConnectHandler connectHandler;
	/// Set the callback invoked once the connection has been established.
	@property final void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
		updateFlags();
	}

	private ReadDataHandler readDataHandler;
	/// Set the callback invoked with incoming data.
	/// Data will not be received unless this handler is set.
	@property final void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		updateFlags();
	}

	private DisconnectHandler disconnectHandler;
	/// Set the callback invoked when the connection was closed.
	@property final void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
		updateFlags();
	}

	private BufferFlushedHandler bufferFlushedHandler;
	/// Set the callback invoked when all queued data has been sent.
	@property final void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
		updateFlags();
	}
1045 }
1046 
/// A connection over a byte stream.
/// A queued `Data` may be sent in several pieces.
class StreamConnection : Connection
{
protected:
	this()
	{
		super();
	}

	/// Called when a socket is writable.
	override void onWritable()
	{
		// scope(success) updateFlags() would break debugger stack traces,
		// so the work is done in onWritableImpl and the flags are
		// refreshed explicitly afterwards.
		onWritableImpl();
		updateFlags();
	}

	/// Body of `onWritable`: completes a pending connection attempt, or
	/// otherwise sends queued data until the queues are empty, the
	/// socket would block, or an error occurs.
	final private void onWritableImpl()
	{
		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);

		if (state == ConnectionState.connecting)
		{
			// Writability while connecting means the attempt has finished;
			// the socket's error option tells whether it succeeded.
			int32_t error;
			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
			if (error)
			{
				disconnect(formatSocketError(error), DisconnectType.error);
				return;
			}

			state = ConnectionState.connected;

			try
				setKeepAlive();
			catch (Exception e)
			{
				disconnect(e.msg, DisconnectType.error);
				return;
			}

			if (connectHandler)
				connectHandler();
			return;
		}

		// Two passes over the queues: the first only finishes a packet
		// that was left partially sent, the second sends everything else
		// in priority order.
		foreach (pass; 0 .. 2)
		{
			immutable partialOnly = pass == 0;

			foreach (int priority, ref queue; outQueue)
			{
				while (queue.length)
				{
					if (partialOnly && priority != partiallySent)
						break;

					assert(partiallySent == -1 || partiallySent == priority);

					auto head = queue.ptr; // first Data in this queue

					ptrdiff_t sent = 0;
					if (head.length == 0)
					{
						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
					}
					else
					{
						sent = doSend(head.contents);
						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, head.length);
					}

					if (sent == Socket.ERROR)
					{
						// Not an error if the socket merely cannot take
						// more data right now.
						if (!wouldHaveBlocked())
							onError("send() error: " ~ lastSocketError);
						return;
					}

					if (sent < head.length)
					{
						// Short write: keep the unsent tail at the head of
						// the queue and remember which priority it is in.
						if (sent > 0)
						{
							*head = (*head)[sent..head.length];
							partiallySent = priority;
						}
						return;
					}

					// The whole packet went out.
					assert(sent == head.length);
					head.clear();
					queue = queue[1..$];
					partiallySent = -1;
					if (queue.length == 0)
						queue = null;
				}
			}
		}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();

		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

public:
	this(Socket conn)
	{
		super(conn);
	} ///
}
1154 
1155 // ***************************************************************************
1156 
/// A stream connection over a POSIX file descriptor, which allows a
/// file (e.g. stdin/stdout) to be added to the socket manager.
/// The descriptor is used as given, not duplicated, so "disconnecting"
/// this connection will close it.
version (Posix)
class FileConnection : StreamConnection
{
	this(int fileno)
	{
		auto fileSocket = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		fileSocket.blocking = false;
		super(fileSocket);
	} ///

protected:
	import core.sys.posix.unistd : read, write;

	/// Writes with `write(2)` rather than a socket send call.
	override sizediff_t doSend(in void[] buffer)
	{
		auto written = write(socket.handle, buffer.ptr, buffer.length);
		return written;
	}

	/// Reads with `read(2)` rather than a socket receive call.
	override sizediff_t doReceive(void[] buffer)
	{
		auto received = read(socket.handle, buffer.ptr, buffer.length);
		return received;
	}
}
1184 
/// Combines two connections into one `IConnection`, using one for
/// reading and the other for writing (e.g. stdin/stdout).
class Duplex : IConnection
{
	///
	IConnection reader, writer;

	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;
		reader.handleConnect = &onConnect;
		writer.handleConnect = &onConnect;
		reader.handleDisconnect = &onDisconnect;
		writer.handleDisconnect = &onDisconnect;
	} ///

	/// Combined state of both halves: `disconnecting` if either half is
	/// disconnecting, otherwise the lower of the two states.
	@property ConnectionState state()
	{
		if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;
		else
			return reader.state < writer.state ? reader.state : writer.state;
	} ///

	/// Queue Data for sending (on the writer).
	void send(Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Each half is disconnected only if its state lies strictly between
	/// `disconnected` and `disconnecting`.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting)
			reader.disconnect(reason, type);
		if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting)
			writer.disconnect(reason, type);
		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Forwards the connect event once both halves are connected.
	protected void onConnect()
	{
		if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
			connectHandler();
	}

	/// Called when either half disconnects. Reports the event to the
	/// user's handler at most once, then disconnects the other half.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
		auto handler = disconnectHandler;
		if (handler)
		{
			// Clear the handler BEFORE calling it. If the handler itself
			// calls disconnect(), the other half's disconnect re-enters
			// this method, and the handler must not run a second time.
			disconnectHandler = null;
			handler(reason, type);
			// As before, the handler stays cleared after the call, even if
			// the callback installed a new one.
			disconnectHandler = null; // don't call it twice for the other connection
		}
		// It is our responsibility to disconnect the other connection
		// Use DisconnectType.requested to ensure that any written data is flushed
		disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read (forwarded to the reader).
	@property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; }

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written
	/// (forwarded to the writer).
	@property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; }
}
1263 
// Compile-only check that Duplex can be instantiated; the branch never runs.
unittest
{
	if (false)
		new Duplex(null, null);
}
1265 
1266 // ***************************************************************************
1267 
/// An asynchronous socket-based connection.
/// When connecting to a list of addresses, an error during the
/// connection attempt makes it move on to the next address in the list.
class SocketConnection : StreamConnection
{
protected:
	/// Addresses that have not been tried yet.
	AddressInfo[] addressQueue;

	this(Socket conn)
	{
		super(conn);
	}

	override sizediff_t doSend(in void[] buffer)
	{
		return conn.send(buffer);
	}

	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

	/// Takes the next address off `addressQueue`, creates a non-blocking
	/// socket for it and starts connecting. Failures are reported
	/// through `onError`.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);
		auto addressInfo = addressQueue[0];
		addressQueue = addressQueue[1..$];

		// Set the socket up in a local first. `conn` is only assigned
		// immediately before registration, so a failure here leaves `conn`
		// null and nothing registered. onError relies on that invariant.
		Socket sock;
		try
		{
			sock = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
			sock.blocking = false;
		}
		catch (SocketException e)
		{
			if (sock)
				sock.close();
			return onError("Connect error: " ~ e.msg);
		}

		try
		{
			conn = sock;
			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
			conn.connect(addressInfo.address);
		}
		catch (SocketException e)
			return onError("Connect error: " ~ e.msg);
	}

	/// Called when an error occurs on the socket.
	/// While connecting with addresses left to try, the current socket
	/// (if any) is discarded and the next address is attempted;
	/// otherwise the error is passed on to the base class.
	override void onError(string reason)
	{
		if (state == ConnectionState.connecting && addressQueue.length)
		{
			// conn is null when the socket for the failed address could
			// not even be created. Previously this path unregistered and
			// dereferenced the null socket.
			if (conn)
			{
				socketManager.unregister(this);
				conn.close();
				conn = null;
			}

			return tryNextAddress();
		}

		// NOTE(review): if socket creation failed for the LAST address,
		// conn is null here while state is still `connecting`; confirm the
		// base-class onError/disconnect copes with that.
		super.onError(reason);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection, trying the given addresses in order.
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		addressQueue = addresses;
		state = ConnectionState.connecting;
		tryNextAddress();
	}
}
1344 
/// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
	}

	///
	alias connect = SocketConnection.connect; // raise overload

	/// Start establishing a connection.
	/// Resolves `host`, shuffles the resulting addresses, and tries
	/// them one after another. Lookup failures are reported through
	/// `onError`.
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		state = ConnectionState.resolving;

		AddressInfo[] addressInfos;
		try
		{
			auto addresses = getAddress(host, port);
			// Throw a SocketException so the handler below catches it.
			// enforce() throws a plain Exception, which escaped to the
			// caller and left the state stuck at `resolving`.
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			if (addresses.length > 1)
			{
				import std.random : randomShuffle;
				randomShuffle(addresses);
			}

			foreach (address; addresses)
				addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
		}
		catch (SocketException e)
			return onError("Lookup error: " ~ e.msg);

		// connect(AddressInfo[]) expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		connect(addressInfos);
	}
}
1403 
1404 // ***************************************************************************
1405 
/// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one connection and hands it to the accept handler; if
		/// no handler is set, the accepted socket is closed right away.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			// NOTE(review): accept() and setKeepAlive() can throw; nothing
			// here catches that - confirm the event loop's caller does.
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				auto connection = createConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			// presumably resolves to the enclosing SocketServer.close()
			close(); // call parent
		}

		/// Unregisters and closes this listener's socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Creates the connection object for an accepted socket.
	/// Subclasses override this to produce more specific connection types.
	SocketConnection createConnection(Socket socket)
	{
		return new SocketConnection(socket);
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Listeners only ask for read events while an accept handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// Addresses that cannot be bound are skipped.
	/// Throws: `Exception` if there is no listener at all afterwards.
	final void listen(AddressInfo[] addressInfos)
	{
		foreach (ref addressInfo; addressInfos)
		{
			Socket conn;
			bool adopted; // true once a Listener is about to own the socket
			try
			{
				conn = new Socket(addressInfo);
				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				adopted = true;
				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				// Close a socket that failed during setup so its handle
				// is not leaked.
				if (conn && !adopted)
					conn.close();
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	this()
	{
	} ///

	/// Creates a Server with the given sockets.
	/// The sockets must have already had `bind` and `listen` called on them.
	this(Socket[] sockets...)
	{
		foreach (socket; sockets)
			listeners ~= new Listener(socket);
		// The sockets are documented to be listening already, so
		// isListening must report that.
		listening = listeners.length != 0;
	}

	/// Returns all listening addresses.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Returns `true` if the server is listening for incoming connections.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes every listener, then calls `handleClose` if it is set.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

	/// Create a SocketServer using the handle passed on standard input,
	/// for which `listen` had already been called. Used by
	/// e.g. FastCGI and systemd sockets with "Listen = yes".
	static SocketServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0;

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new SocketServer(s);
	}

	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(SocketConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
}
1576 
/// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
	/// Accepted sockets are wrapped in `TcpConnection` objects.
	override SocketConnection createConnection(Socket socket)
	{
		return new TcpConnection(socket);
	}

public:
	this()
	{
	} ///

	this(Socket[] sockets...)
	{
		super(sockets);
	} /// Construct from the given sockets.

	///
	alias listen = SocketServer.listen; // raise overload

	/// Start listening on this socket.
	/// Params:
	///   port = port to listen on; 0 requests a random port, in which
	///          case only IPv4 addresses are used
	///   addr = address to listen on, passed to `getAddressInfo`
	/// Returns: the port of an IPv4 listener if there is one (the last
	///   one found), otherwise `port` unchanged.
	final ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Single pass that keeps the IPv4 entries in their original
			// order, instead of re-concatenating the array per removal.
			AddressInfo[] ipv4Only;
			foreach (ref addressInfo; addressInfos)
				if (addressInfo.family == AddressFamily.INET)
					ipv4Only ~= addressInfo;
			addressInfos = ipv4Only;
		}

		listen(addressInfos);

		// Report the port actually in use (relevant when 0 was requested).
		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	// NOTE(review): this reinterprets the SocketServer instance as a
	// TcpServer without constructing one; TcpServer's overrides are
	// presumably not in effect on the result. Deprecated - prefer
	// SocketServer.fromStdin.
	deprecated("Use SocketServer.fromStdin")
	static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; }

	/// Delegate to be called when a connection is accepted.
	/// Passing `null` clears the handler.
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		// A null `value` must reach the base class as null. Wrapping it
		// would install a non-null handler that enables accepting and then
		// calls a null delegate on the first incoming connection.
		if (value is null)
			super.handleAccept(null);
		else
			super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c));
	}
}
1640 
1641 // ***************************************************************************
1642 
/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	/// Sends every queued `Data` as one datagram to `remoteAddress`, in
	/// queue order, until the queues are empty, the socket would block,
	/// or an error occurs.
	final private void onWritableImpl()
	{
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				auto sent = conn.sendTo(pdata.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// A short write would truncate the datagram, so it is
					// treated as an error.
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddressStr);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given `AddressFamily`, without binding to an address.
	/// Calls the connect handler, if set, once the socket is ready.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Creates and registers the non-blocking socket and marks the
	/// connection as connected, without notifying the connect handler.
	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// If `host` resolves to several addresses, one is picked at random.
	/// Returns: the local port that was bound, or 0 if the lookup
	///   failed (the failure is also reported through `onError`).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			// Throw a SocketException so the handler below catches it.
			// enforce() throws a plain Exception, which escaped to the
			// caller and left the state stuck at `resolving`.
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// initializeImpl expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl, not initialize: initialize already calls the
		// connect handler, which made it fire twice per bind - once before
		// the socket was bound and once after.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

// public:
	/// Where to send packets to.
	Address remoteAddress;
}
1805 
///
unittest
{
	string[] packets = ["Hello", "there"];

	// Receiving side, bound to a random local port.
	auto server = new UdpConnection();
	server.bind("localhost", 0);

	// Sending side, addressed at the server.
	auto client = new UdpConnection();
	client.initialize(server.localAddress.addressFamily);
	client.remoteAddress = server.localAddress;

	Data[] outgoing;
	foreach (packet; packets)
		outgoing ~= Data(packet);
	client.send(outgoing);

	// Each received datagram must match the next expected packet;
	// once all have arrived, close both ends.
	server.handleReadData = (Data data)
	{
		assert(data.contents == packets[0]);
		packets = packets[1..$];
		if (packets.length)
			return;
		server.close();
		client.close();
	};

	socketManager.loop();
	assert(!packets.length);
}
1837 
1838 // ***************************************************************************
1839 
/// Base class for connection adapters: wraps another `IConnection`
/// and forwards everything to it.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
	/// The next connection in the chain (towards the raw transport).
	IConnection next;

	this(IConnection next)
	{
		this.next = next;
		// Route the wrapped connection's events through this adapter.
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	} ///

	@property ConnectionState state()
	{
		return next.state;
	} ///

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Passes the connect event on to the user's handler, if any.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	/// Passes received data on to the user's handler.
	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		readDataHandler(data);
	}

	/// Passes the disconnect event on to the user's handler, if any.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	/// Passes the buffer-flushed event on to the user's handler, if any.
	protected void onBufferFlushed()
	{
		if (!bufferFlushedHandler)
			return;
		bufferFlushedHandler();
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	/// The wrapped connection is given a read handler only while one
	/// is set here.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}
	private ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
	private BufferFlushedHandler bufferFlushedHandler;
}
1915 
1916 // ***************************************************************************
1917 
/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	string delimiter = "\r\n";

	/// Maximum line length (0 means unlimited).
	size_t maxLength = 0;

	this(IConnection next)
	{
		super(next);
	} ///

	/// Append a line to the send buffer.
	/// The delimiter is added automatically.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter ca = this;
		ca.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends `data` to the receive buffer and passes every complete
	/// line on to the read handler. Disconnects with an error if a
	/// line, or the unterminated remainder, exceeds `maxLength`.
	final override void onReadData(Data data)
	{
		// Fixed: the module path was malformed ("std..string").
		import std.string : indexOf;
		auto oldBufferLength = inBuffer.length;
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			// Fixed: the module path was malformed ("core.stdc..string").
			import core.stdc.string : memchr;

			char c = delimiter[0];
			// The old buffer contents held no delimiter (or they would
			// have been consumed), so the first search only needs to
			// look at the newly added bytes.
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				sizediff_t index = p - inBuffer.ptr;
				if (!processLine(index))
					break;

				// processLine consumed the front of the buffer;
				// search what is left from its start.
				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			sizediff_t index;
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while ((index = indexOf(cast(string)inBuffer.contents, delimiter)) >= 0
				&& processLine(index))
			{}
		}

		if (maxLength && inBuffer.length > maxLength)
			disconnect("Line too long", DisconnectType.error);
	}

	/// Removes the line ending at `index` (plus its delimiter) from the
	/// receive buffer and passes it to the read handler.
	/// Returns: `false` if the line exceeded `maxLength` and the
	///   connection was disconnected instead, `true` otherwise.
	final bool processLine(size_t index)
	{
		if (maxLength && index > maxLength)
		{
			disconnect("Line too long", DisconnectType.error);
			return false;
		}
		auto line = inBuffer[0..index];
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
		return true;
	}

	/// Forwards the disconnect event, then discards any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
2003 
2004 // ***************************************************************************
2005 
/// Connection adapter which watches for inactivity.
/// When the idle timer expires, either a user-supplied
/// handler is invoked, or the connection is disconnected.
class TimeoutAdapter : ConnectionAdapter
{
	/// Create a timeout adapter on top of the connection `next`.
	/// No timer is armed until `setIdleTimeout` is called.
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Arm (or re-arm) the idle timer.
	///
	/// Params:
	///   duration = period of inactivity after which to take action;
	///              must be greater than zero
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		if (idleTask !is null)
		{
			// A task already exists: take it off the timer if it is
			// pending, then give it the new delay.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			// First call: create the task and hook up our handler.
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}

		mainTimer.add(idleTask);
	}

	/// Declare activity on this connection.
	/// Calls `handleNonIdle` (if set), then restarts the idle timer
	/// if it is currently pending.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();

		// Nothing to restart if no timeout was ever set,
		// or if the timer is not pending right now.
		if (idleTask is null || !idleTask.isWaiting())
			return;
		idleTask.restart();
	}

	/// Suspend the idle timer.
	/// An idle timeout must have been set, and the timer must be pending.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Put the idle timer back on the main timer.
	/// An idle timeout must have been set, and the timer must not be pending.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Invoked when the idle timer expires.
	/// When left unset, expiry disconnects the connection instead.
	void delegate() handleIdleTimeout;

	/// Invoked whenever the connection is marked as non-idle
	/// (on connect, on received data, or via `markNonIdle`).
	void delegate() handleNonIdle;

protected:
	/// A new connection counts as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Incoming data counts as activity; the data itself
	/// is passed along unchanged.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Take a pending idle timer off the main timer before
	/// propagating the disconnect.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		const timerPending = idleTask !is null && idleTask.isWaiting();
		if (timerPending)
			idleTask.cancel();
		super.onDisconnect(reason, type);
	}

private:
	/// The idle timer task; `null` until `setIdleTimeout` is first called.
	TimerTask idleTask;

	/// Timer callback: the idle period has elapsed.
	final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);

		// Timed out while a disconnect was already in progress:
		// request an error-type disconnect.
		if (state == ConnectionState.disconnecting)
		{
			disconnect("Delayed disconnect - time-out", DisconnectType.error);
			return;
		}

		// Already disconnected - nothing left to do.
		if (state == ConnectionState.disconnected)
			return;

		// No user handler: the default action is to drop the connection.
		if (!handleIdleTimeout)
		{
			disconnect("Time-out", DisconnectType.error);
			return;
		}

		// Re-arm before calling the handler, so that the timer is
		// rescheduled by default when the handler runs.
		resumeIdleTimeout();
		handleIdleTimeout();
	}
}
2119 
2120 // ***************************************************************************
2121 
unittest
{
	// Smoke test: schedule a 10 ms timeout, run the event loop,
	// and verify that the callback ran before the loop returned.
	bool timerFired;
	setTimeout({ timerFired = true; }, 10.msecs);
	socketManager.loop();
	assert(timerFired);
}