1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import std.exception;
24 import std.socket;
25 import std.string : format;
26 public import std.socket : Address, Socket;
27 
28 debug(ASOCKETS) import std.stdio;
29 debug(PRINTDATA) import ae.utils.text : hexDump;
30 private import std.conv : to;
31 
32 
33 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
34 static import ae.utils.array;
35 
36 version(LIBEV)
37 {
38 	import deimos.ev;
39 	pragma(lib, "ev");
40 }
41 
// USE_SLEEP selects how the select-based event loop waits when it has no
// sockets to poll: when true it calls Sleep() instead of Socket.select().
// It is only true on Windows, where Sleep is imported.
version(Windows)
{
	import core.sys.windows.windows : Sleep;
	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
}
else
	enum USE_SLEEP = false;
49 
/// Counter incremented by the event loop: once per libev callback, or once per select() loop iteration.
51 int eventCounter;
52 
53 version(LIBEV)
54 {
55 	// Watchers are a GenericSocket field (as declared in SocketMixin).
56 	// Use one watcher per read and write event.
57 	// Start/stop those as higher-level code declares interest in those events.
58 	// Use the "data" ev_io field to store the parent GenericSocket address.
59 	// Also use the "data" field as a flag to indicate whether the watcher is active
60 	// (data is null when the watcher is stopped).
61 
62 	struct SocketManager
63 	{
64 	private:
65 		size_t count;
66 
67 		extern(C)
68 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
69 		{
70 			eventCounter++;
71 			auto socket = cast(GenericSocket)w.data;
72 			assert(socket, "libev event fired on stopped watcher");
73 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
74 
75 			if (revents & EV_READ)
76 				socket.onReadable();
77 			else
78 			if (revents & EV_WRITE)
79 				socket.onWritable();
80 			else
81 				assert(false, "Unknown event fired from libev");
82 
83 			// TODO? Need to get proper SocketManager instance to call updateTimer on
84 			socketManager.updateTimer(false);
85 		}
86 
87 		ev_timer evTimer;
88 		MonoTime lastNextEvent = MonoTime.max;
89 
		/// libev timer callback. Prods mainTimer and then unconditionally
		/// reschedules the libev timer via updateTimer(true).
		extern(C)
		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
		{
			eventCounter++;
			debug (ASOCKETS) writefln("Timer callback called.");
			// Let the main timer process whatever is due (semantics defined in ae.sys.timing).
			mainTimer.prod();

			// force=true: reschedule even if the next event time appears unchanged.
			socketManager.updateTimer(true);
			debug (ASOCKETS) writefln("Timer callback exiting.");
		}
100 
		/// Synchronize the libev timer (evTimer) with mainTimer's next event.
		/// Params:
		///   force = reschedule even when the next event time equals the one
		///           recorded in lastNextEvent by the previous call.
		void updateTimer(bool force)
		{
			auto nextEvent = mainTimer.getNextEvent();
			if (force || lastNextEvent != nextEvent)
			{
				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
				// MonoTime.max is used as the "no pending event" marker.
				if (nextEvent == MonoTime.max) // Stopping
				{
					// Only stop the libev timer if it was previously started.
					if (lastNextEvent != MonoTime.max)
						ev_timer_stop(ev_default_loop(0), &evTimer);
				}
				else
				{
					auto remaining = mainTimer.getRemainingTime();
					// Prod the main timer until the remaining time is strictly positive,
					// so that a positive delay is handed to libev below.
					while (remaining <= Duration.zero)
					{
						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
						mainTimer.prod();
						remaining = mainTimer.getRemainingTime();
					}
					// Convert the remaining Duration to fractional seconds (hnsecs / hnsecs-per-second).
					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
					if (lastNextEvent == MonoTime.max) // Starting
					{
						// The delay is passed as the timer's repeat value (the initial "after" value is 0).
						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
						ev_timer_start(ev_default_loop(0), &evTimer);
					}
					else // Adjusting
					{
						evTimer.repeat = tstamp;
						ev_timer_again(ev_default_loop(0), &evTimer);
					}
				}
				// Remember what we scheduled for, to detect changes on the next call.
				lastNextEvent = nextEvent;
			}
		}
137 
138 		/// Register a socket with the manager.
139 		void register(GenericSocket socket)
140 		{
141 			debug (ASOCKETS) writefln("Registering %s", socket);
142 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
143 			auto fd = socket.conn.handle;
144 			assert(fd, "Must have fd before socket registration");
145 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
146 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
147 			count++;
148 		}
149 
150 		/// Unregister a socket with the manager.
151 		void unregister(GenericSocket socket)
152 		{
153 			debug (ASOCKETS) writefln("Unregistering %s", socket);
154 			socket.notifyRead  = false;
155 			socket.notifyWrite = false;
156 			count--;
157 		}
158 
159 	public:
160 		size_t size()
161 		{
162 			return count;
163 		}
164 
165 		/// Loop continuously until no sockets are left.
166 		void loop()
167 		{
168 			auto evLoop = ev_default_loop(0);
169 			enforce(evLoop, "libev initialization failure");
170 
171 			updateTimer(true);
172 			debug (ASOCKETS) writeln("ev_run");
173 			ev_run(ev_default_loop(0), 0);
174 		}
175 	}
176 
177 	private mixin template SocketMixin()
178 	{
179 		private ev_io evRead, evWrite;
180 
181 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
182 		{
183 			if (!conn)
184 			{
185 				// Can happen when setting delegates before connecting.
186 				return;
187 			}
188 
189 			if (newValue && !ev.data)
190 			{
191 				// Start
192 				ev.data = cast(void*)this;
193 				ev_io_start(ev_default_loop(0), &ev);
194 			}
195 			else
196 			if (!newValue && ev.data)
197 			{
198 				// Stop
199 				assert(ev.data is cast(void*)this);
200 				ev.data = null;
201 				ev_io_stop(ev_default_loop(0), &ev);
202 			}
203 		}
204 
205 		/// Interested in read notifications (onReadable)?
206 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
207 		/// Interested in write notifications (onWritable)?
208 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
209 
210 		debug ~this()
211 		{
212 			// The LIBEV SocketManager holds no references to registered sockets.
213 			// TODO: Add a doubly-linked list?
214 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
215 		}
216 	}
217 }
218 else // Use select
219 {
220 	struct SocketManager
221 	{
222 	private:
223 		enum FD_SETSIZE = 1024;
224 
225 		/// List of all sockets to poll.
226 		GenericSocket[] sockets;
227 
228 		/// Debug AA to check for dangling socket references.
229 		debug GenericSocket[socket_t] socketHandles;
230 
231 		/// Register a socket with the manager.
232 		void register(GenericSocket conn)
233 		{
234 			debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1);
235 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
236 			sockets ~= conn;
237 
238 			debug
239 			{
240 				auto handle = conn.socket.handle;
241 				assert(handle != socket_t.init, "Can't register a closed socket");
242 				assert(handle !in socketHandles, "This socket handle is already registered");
243 				socketHandles[handle] = conn;
244 			}
245 		}
246 
247 		/// Unregister a socket with the manager.
248 		void unregister(GenericSocket conn)
249 		{
250 			debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
251 
252 			debug
253 			{
254 				auto handle = conn.socket.handle;
255 				assert(handle != socket_t.init, "Can't unregister a closed socket");
256 				auto pconn = handle in socketHandles;
257 				assert(pconn, "This socket handle is not registered");
258 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
259 				socketHandles.remove(handle);
260 			}
261 
262 			foreach (size_t i, GenericSocket j; sockets)
263 				if (j is conn)
264 				{
265 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
266 					return;
267 				}
268 			assert(false, "Socket not registered");
269 		}
270 
271 		void delegate()[] idleHandlers;
272 
273 	public:
274 		size_t size()
275 		{
276 			return sockets.length;
277 		}
278 
		/// Run the select()-based event loop.
		/// Each iteration polls the registered sockets, handles at most one
		/// socket event (or runs one idle handler when nothing fired), then
		/// prods mainTimer. The loop exits when there are no non-daemon
		/// sockets left and mainTimer is not waiting on anything.
		void loop()
		{
			debug (ASOCKETS) writeln("Starting event loop.");

			SocketSet readset, writeset, errorset;
			size_t sockcount;
			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
			readset  = new SocketSet(setSize);
			writeset = new SocketSet(setSize);
			errorset = new SocketSet(setSize);
			while (true)
			{
				// Work out how large the SocketSets need to be for this iteration:
				// the socket count on Windows, the highest handle value elsewhere (plus one).
				uint minSize = 0;
				version(Windows)
					minSize = cast(uint)sockets.length;
				else
				{
					foreach (s; sockets)
						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
							minSize = s.socket.handle;
				}
				minSize++;

				if (setSize < minSize)
				{
					// Too small: reallocate with headroom (twice the required size).
					debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
					setSize = minSize * 2;
					readset  = new SocketSet(setSize);
					writeset = new SocketSet(setSize);
					errorset = new SocketSet(setSize);
				}
				else
				{
					// Large enough: reuse the existing sets.
					readset.reset();
					writeset.reset();
					errorset.reset();
				}

				sockcount = 0;
				// Set when at least one non-daemon socket exists; keeps the loop alive.
				bool haveActive;
				debug (ASOCKETS) writeln("Populating sets");
				foreach (GenericSocket conn; sockets)
				{
					if (!conn.socket)
						continue;
					sockcount++;
					if (!conn.daemon)
						haveActive = true;

					debug (ASOCKETS) writef("\t%s:", conn);
					if (conn.notifyRead)
					{
						readset.add(conn.socket);
						debug (ASOCKETS) write(" READ");
					}
					if (conn.notifyWrite)
					{
						writeset.add(conn.socket);
						debug (ASOCKETS) write(" WRITE");
					}
					// Errors are always watched, regardless of the notify flags.
					errorset.add(conn.socket);
					debug (ASOCKETS) writeln();
				}
				debug (ASOCKETS)
				{
					writefln("Sets populated as follows:");
					printSets(readset, writeset, errorset);
				}

				debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
					sockcount,
					mainTimer.isWaiting() ? "with" : "no",
					idleHandlers.length,
				);
				if (!haveActive && !mainTimer.isWaiting())
				{
					debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
					break;
				}

				debug (ASOCKETS) { stdout.flush(); stderr.flush(); }

				int events;
				if (idleHandlers.length)
				{
					// With idle handlers pending we must not block:
					// poll with a zero timeout (or skip select entirely without sockets).
					if (sockcount==0)
						events = 0;
					else
						events = Socket.select(readset, writeset, errorset, 0.seconds);
				}
				else
				if (USE_SLEEP && sockcount==0)
				{
					// No sockets to poll: wait for the next timer event with Sleep() instead of select().
					version(Windows)
					{
						auto duration = mainTimer.getRemainingTime().total!"msecs"();
						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
						if (duration <= 0)
							duration = 1; // Avoid busywait
						else
						if (duration > int.max)
							duration = int.max;
						Sleep(cast(int)duration);
						events = 0;
					}
					else
						assert(0); // USE_SLEEP is false on other platforms
				}
				else
				if (mainTimer.isWaiting())
					// Block until a socket event or until the next timer event is due.
					events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
				else
					// No timers pending: block until a socket event.
					events = Socket.select(readset, writeset, errorset);

				debug (ASOCKETS) writefln("%d events fired.", events);

				if (events > 0)
				{
					// Handle just one event at a time, as the first
					// handler might invalidate select()'s results.
					handleEvent(readset, writeset, errorset);
				}
				else
				if (idleHandlers.length)
				{
					import ae.utils.array;
					auto handler = idleHandlers.shift();

					// Rotate the idle handler queue before running it,
					// in case the handler unregisters itself.
					idleHandlers ~= handler;

					handler();
				}

				// Timers may invalidate our select results, so fire them after processing the latter
				mainTimer.prod();

				eventCounter++;
			}
		}
421 
422 		debug (ASOCKETS)
423 		void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
424 		{
425 			foreach (GenericSocket conn; sockets)
426 			{
427 				if (!conn.socket)
428 					writefln("\t\t%s is unset", conn);
429 				else
430 				if (readset.isSet(conn.socket))
431 					writefln("\t\t%s is readable", conn);
432 				else
433 				if (writeset.isSet(conn.socket))
434 					writefln("\t\t%s is writable", conn);
435 				else
436 				if (errorset.isSet(conn.socket))
437 					writefln("\t\t%s is errored", conn);
438 			}
439 		}
440 
441 		void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
442 		{
443 			debug (ASOCKETS)
444 			{
445 				writefln("\tSelect results:");
446 				printSets(readset, writeset, errorset);
447 			}
448 
449 			foreach (GenericSocket conn; sockets)
450 			{
451 				if (!conn.socket)
452 					continue;
453 
454 				if (readset.isSet(conn.socket))
455 				{
456 					debug (ASOCKETS) writefln("\t%s - calling onReadable", conn);
457 					return conn.onReadable();
458 				}
459 				else
460 				if (writeset.isSet(conn.socket))
461 				{
462 					debug (ASOCKETS) writefln("\t%s - calling onWritable", conn);
463 					return conn.onWritable();
464 				}
465 				else
466 				if (errorset.isSet(conn.socket))
467 				{
468 					debug (ASOCKETS) writefln("\t%s - calling onError", conn);
469 					return conn.onError("select() error: " ~ conn.socket.getErrorText());
470 				}
471 			}
472 
473 			assert(false, "select() reported events available, but no registered sockets are set");
474 		}
475 	}
476 
477 	// Use UFCS to allow removeIdleHandler to have a predicate with context
478 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
479 	{
480 		foreach (i, idleHandler; socketManager.idleHandlers)
481 			assert(handler !is idleHandler);
482 
483 		socketManager.idleHandlers ~= handler;
484 	}
485 
486 	static bool isFun(T)(T a, T b) { return a is b; }
487 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
488 	{
489 		foreach (i, idleHandler; socketManager.idleHandlers)
490 			if (pred(idleHandler, args))
491 			{
492 				import std.algorithm;
493 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
494 				return;
495 			}
496 		assert(false, "No such idle handler");
497 	}
498 
	// select() flavour of the per-socket state: plain flags which the
	// SocketManager event loop reads when populating its SocketSets.
	private mixin template SocketMixin()
	{
		/// Interested in read notifications (onReadable)?
		bool notifyRead;
		/// Interested in write notifications (onWritable)?
		bool notifyWrite;
	}
506 }
507 
508 /// The default socket manager.
509 SocketManager socketManager;
510 
511 // ***************************************************************************
512 
/// General methods for an asynchronous socket.
/// Wraps a std.socket.Socket and exposes the event hooks which the
/// SocketManager calls (onReadable, onWritable, onError).
private abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Hook called when the socket is readable. Does nothing by default.
	void onReadable()
	{
	}

	/// Hook called when the socket is writable. Does nothing by default.
	void onWritable()
	{
	}

	/// Hook called when an error occurred on the socket. Does nothing by default.
	void onError(string reason)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	private Address cachedLocalAddress, cachedRemoteAddress;

	/// Don't block the process from exiting.
	/// TODO: Not implemented with libev
	bool daemon;

	/// The local address of the wrapped socket, cached after the first
	/// successful query. Returns null if there is neither a cached
	/// address nor a socket.
	final @property Address localAddress()
	{
		if (cachedLocalAddress is null && conn !is null)
			cachedLocalAddress = conn.localAddress();
		return cachedLocalAddress;
	}

	/// localAddress as a string; never throws (errors are rendered into the result).
	final @property string localAddressStr() nothrow
	{
		try
		{
			auto address = localAddress;
			if (address is null)
				return "[null address]";
			return address.toString();
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// The remote address of the wrapped socket, cached after the first
	/// successful query. Returns null if there is neither a cached
	/// address nor a socket.
	final @property Address remoteAddress()
	{
		if (cachedRemoteAddress is null && conn !is null)
			cachedRemoteAddress = conn.remoteAddress();
		return cachedRemoteAddress;
	}

	/// remoteAddress as a string; never throws (errors are rendered into the result).
	final @property string remoteAddressStr() nothrow
	{
		try
		{
			auto address = remoteAddress;
			if (address is null)
				return "[null address]";
			return address.toString();
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Enable or disable keep-alive on the wrapped socket.
	/// When enabling, time and interval are passed to Socket.setKeepAlive;
	/// if that raises SocketFeatureException, only the plain KEEPALIVE
	/// socket option is switched on instead.
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");

		if (!enabled)
		{
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
			return;
		}

		try
			conn.setKeepAlive(time, interval);
		catch (SocketFeatureException)
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
	}

	/// Short description: unqualified class name, object address and socket handle (-1 if none).
	override string toString() const
	{
		import std.string;
		auto shortName = this.classinfo.name.split(".")[$-1];
		return "%s {this=%s, fd=%s}".format(shortName, cast(void*)this, conn ? conn.handle : -1);
	}
}
614 
615 // ***************************************************************************
616 
/// Indicates what caused a disconnect. Passed to disconnect() and to
/// disconnect handlers.
enum DisconnectType
{
	requested, // initiated by the application
	graceful,  // peer gracefully closed the connection
	error      // abnormal network condition
}
623 
/// The states a connection can be in, as reported by IConnection.state.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}
642 
/// Common interface for connections and adapters.
interface IConnection
{
	/// Highest valid send priority value.
	enum MAX_PRIORITY = 4;
	/// Priority used by send() when none is specified.
	enum DEFAULT_PRIORITY = 2;

	/// Reason used by disconnect() when none is specified.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	/// Params:
	///   data     = the blocks to queue, in order
	///   priority = send queue to append to
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		Data[1] data;
		data[0] = datum;
		// Forward the caller's priority; previously it was dropped and the
		// datum was always queued with DEFAULT_PRIORITY.
		this.send(data, priority);
		// Reset the temporary slot so this stack array no longer holds the Data
		// (presumably to release its reference promptly - see ae.sys.data).
		data[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias void delegate() ConnectHandler;
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias void delegate(Data data) ReadDataHandler;
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
	@property void handleDisconnect(DisconnectHandler value); /// ditto
}
688 
689 // ***************************************************************************
690 
691 /// An asynchronous TCP connection.
692 class TcpConnection : GenericSocket, IConnection
693 {
694 private:
695 	/// Blocks of data larger than this value are passed as unmanaged memory
696 	/// (in Data objects). Blocks smaller than this value will be reallocated
697 	/// on the managed heap. The disadvantage of placing large objects on the
698 	/// managed heap is false pointers; the disadvantage of using Data for
699 	/// small objects is wasted slack space due to the page size alignment
700 	/// requirement.
701 	enum UNMANAGED_THRESHOLD = 256;
702 
703 	/// Queue of addresses to try connecting to.
704 	Address[] addressQueue;
705 
	/// Backing field for the state property.
	ConnectionState _state;
	/// Private setter: only this class changes the connection state.
	final @property ConnectionState state(ConnectionState value) { return _state = value; }

public:
	/// Get connection state.
	override @property ConnectionState state() { return _state; }
712 
713 protected:
714 	/// The send buffers.
715 	Data[][MAX_PRIORITY+1] outQueue;
716 	/// Whether the first item from each queue has been partially sent (and thus can't be cancelled).
717 	bool[MAX_PRIORITY+1] partiallySent;
718 
719 	/// Constructor used by a ServerSocket for new connections
720 	this(Socket conn)
721 	{
722 		this();
723 		this.conn = conn;
724 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
725 		if (conn)
726 			socketManager.register(this);
727 		updateFlags();
728 	}
729 
730 	final void updateFlags()
731 	{
732 		if (state == ConnectionState.connecting)
733 			notifyWrite = true;
734 		else
735 			notifyWrite = writePending;
736 
737 		notifyRead = state == ConnectionState.connected && readDataHandler;
738 	}
739 
	/// Called when a socket is readable.
	/// Performs one receive() into a shared static buffer. A zero-length read is
	/// treated as a graceful close, an error is reported through onError, and
	/// received bytes are copied into a Data and passed to the read handler
	/// (or discarded when disconnecting or when no handler is set).
	override void onReadable()
	{
		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
		static ubyte[0x10000] inBuffer;
		auto received = conn.receive(inBuffer);

		// receive() returning 0 means the peer closed the connection.
		if (received == 0)
			return disconnect("Connection closed", DisconnectType.graceful);

		if (received == Socket.ERROR)
		{
		//	if (wouldHaveBlocked)
		//	{
		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
		//		return;
		//	}
		//	else
				onError("recv() error: " ~ lastSocketError);
		}
		else
		{
			debug (PRINTDATA)
			{
				std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
				std.stdio.write(hexDump(inBuffer[0 .. received]));
				std.stdio.stdout.flush();
			}

			if (state == ConnectionState.disconnecting)
			{
				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this);
			}
			else
			if (!readDataHandler)
			{
				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this);
			}
			else
			{
				// Currently, unlike the D1 version of this module,
				// we will always reallocate read network data.
				// This disfavours code which doesn't need to store
				// read data after processing it, but otherwise
				// makes things simpler and safer all around.

				if (received < UNMANAGED_THRESHOLD)
				{
					// Copy to the managed heap
					readDataHandler(Data(inBuffer[0 .. received].dup));
				}
				else
				{
					// Copy to unmanaged memory
					readDataHandler(Data(inBuffer[0 .. received], true));
				}
			}
		}
	}
799 
	/// Called when a socket is writable.
	/// Delegates to onWritableImpl, then refreshes the notification flags
	/// if the implementation returned normally.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}
807 
	// Work around scope(success) breaking debugger stack traces

	/// Body of onWritable.
	/// While connecting, writability means the connection attempt completed:
	/// switch to connected, enable keep-alive and call the connect handler.
	/// Otherwise, send as much queued data as the socket accepts, queue by
	/// queue in array order (index 0 first). Once every queue is empty, call
	/// handleBufferFlushed and, if a disconnect was pending, close the socket.
	final private void onWritableImpl()
	{
		if (state == ConnectionState.connecting)
		{
			state = ConnectionState.connected;

			//debug writefln("[%s] Connected", remoteAddress);
			try
				setKeepAlive();
			catch (Exception e)
				return disconnect(e.msg, DisconnectType.error);
			if (connectHandler)
				connectHandler();
			return;
		}
		//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				ptrdiff_t sent = 0;
				if (pdata.length)
				{
					sent = conn.send(pdata.contents);
					debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
				}
				else
				{
					// Nothing to transmit; sent stays 0 and the item is dequeued below.
					debug (ASOCKETS) writefln("\t\t%s: empty Data object", this);
				}

				if (sent == Socket.ERROR)
				{
					// Would-block is not an error: try again on the next writable event.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// Partial send: keep the unsent tail at the head of the queue and
					// mark the queue so the head item is not cancelled by clearQueue.
					if (sent > 0)
					{
						*pdata = (*pdata)[sent..pdata.length];
						partiallySent[priority] = true;
					}
					return;
				}
				else
				{
					// Whole item sent: release it and advance the queue.
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					partiallySent[priority] = false;
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (handleBufferFlushed)
			handleBufferFlushed();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}
881 
	/// Called when an error occurs on the socket.
	/// While connecting with more addresses left to try, the current socket is
	/// dropped and the next address is attempted. While disconnecting, the
	/// socket is simply closed. Otherwise the connection is disconnected with
	/// DisconnectType.error.
	override void onError(string reason)
	{
		if (state == ConnectionState.connecting && addressQueue.length)
		{
			// NOTE(review): this branch uses conn unconditionally. If tryNextAddress
			// failed before a new socket was created and registered, conn may be
			// null or unregistered here - confirm whether that can happen.
			socketManager.unregister(this);
			conn.close();
			conn = null;

			return tryNextAddress();
		}

		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
			return close();
		}

		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
		disconnect("Socket error: " ~ reason, DisconnectType.error);
	}
903 
	/// Take the next address off addressQueue and start a non-blocking
	/// connection attempt to it. A SocketException at any step is routed
	/// to onError.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);
		// Pop the head of the queue (the caller must ensure it is non-empty).
		auto address = addressQueue[0];
		addressQueue = addressQueue[1..$];

		try
		{
			conn = new Socket(address.addressFamily(), SocketType.STREAM, ProtocolType.TCP);
			conn.blocking = false;

			// Register and request notifications before connecting; completion of
			// the attempt is handled in onWritableImpl while the state is connecting.
			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) writefln("Attempting connection to %s", address.toString());
			conn.connect(address);
		}
		catch (SocketException e)
			return onError("Connect error: " ~ e.msg);
	}
923 
924 public:
	/// Default constructor. Creates a connection with no socket;
	/// use connect() to establish one.
	this()
	{
		debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this);
	}
930 
	/// Start establishing a connection.
	/// Resolves host synchronously, shuffles the resulting addresses when there
	/// is more than one, and starts connecting to the first of them.
	/// Params:
	///   host = host name or address to connect to (must not be empty)
	///   port = TCP port (must not be 0)
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		state = ConnectionState.resolving;

		try
		{
			addressQueue = getAddress(host, port);
			// NOTE(review): enforce throws a plain Exception, which the
			// SocketException handler below does not catch - confirm this is intended.
			enforce(addressQueue.length, "No addresses found");
			debug (ASOCKETS)
			{
				writefln("Resolved to %s addresses:", addressQueue.length);
				foreach (address; addressQueue)
					writefln("- %s", address.toString());
			}

			state = ConnectionState.connecting;
			if (addressQueue.length > 1)
			{
				// Randomize the order in which the resolved addresses are tried.
				import std.random : randomShuffle;
				randomShuffle(addressQueue);
			}
		}
		catch (SocketException e)
			return onError("Lookup error: " ~ e.msg);

		tryNextAddress();
	}
966 
967 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
968 	/// The disconnect handler will be called when all data has been flushed.
969 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
970 	{
971 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
972 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));
973 
974 		if (writePending)
975 		{
976 			if (type==DisconnectType.requested)
977 			{
978 				assert(conn, "Attempting to disconnect on an uninitialized socket");
979 				// queue disconnect after all data is sent
980 				debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddress, reason);
981 				state = ConnectionState.disconnecting;
982 				//setIdleTimeout(30.seconds);
983 				if (disconnectHandler)
984 					disconnectHandler(reason, type);
985 				updateFlags();
986 				return;
987 			}
988 			else
989 				discardQueues();
990 		}
991 
992 		debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
993 
994 		if (state == ConnectionState.connecting || state == ConnectionState.connected)
995 			close();
996 		else
997 			assert(conn is null, "Registered but %s socket".format(state));
998 
999 		if (disconnectHandler)
1000 			disconnectHandler(reason, type);
1001 		updateFlags();
1002 	}
1003 
1004 	private final void close()
1005 	{
1006 		assert(conn, "Attempting to close an unregistered socket");
1007 		socketManager.unregister(this);
1008 		conn.close();
1009 		conn = null;
1010 		outQueue[] = null;
1011 		state = ConnectionState.disconnected;
1012 	}
1013 
1014 	/// Append data to the send buffer.
1015 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
1016 	{
1017 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
1018 		outQueue[priority] ~= data;
1019 		notifyWrite = true; // Fast updateFlags()
1020 
1021 		debug (PRINTDATA)
1022 		{
1023 			std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
1024 			foreach (datum; data)
1025 				if (datum.length)
1026 					std.stdio.write(hexDump(datum.contents));
1027 				else
1028 					std.stdio.writeln("(empty Data)");
1029 			std.stdio.stdout.flush();
1030 		}
1031 	}
1032 
1033 	/// ditto
1034 	alias send = IConnection.send;
1035 
1036 	final void clearQueue(int priority)
1037 	{
1038 		if (partiallySent[priority])
1039 		{
1040 			assert(outQueue[priority].length > 0);
1041 			outQueue[priority] = outQueue[priority][0..1];
1042 		}
1043 		else
1044 			outQueue[priority] = null;
1045 		updateFlags();
1046 	}
1047 
1048 	/// Clears all queues, even partially sent content.
1049 	private final void discardQueues()
1050 	{
1051 		foreach (priority; 0..MAX_PRIORITY+1)
1052 		{
1053 			outQueue[priority] = null;
1054 			partiallySent[priority] = false;
1055 		}
1056 		updateFlags();
1057 	}
1058 
1059 	@property
1060 	final bool writePending()
1061 	{
1062 		foreach (queue; outQueue)
1063 			if (queue.length)
1064 				return true;
1065 		return false;
1066 	}
1067 
1068 	final bool queuePresent(int priority)
1069 	{
1070 		if (partiallySent[priority])
1071 		{
1072 			assert(outQueue[priority].length > 0);
1073 			return outQueue[priority].length > 1;
1074 		}
1075 		else
1076 			return outQueue[priority].length > 0;
1077 	}
1078 
1079 public:
1080 	private ConnectHandler connectHandler;
1081 	/// Callback for when a connection has been established.
1082 	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }
1083 
	/// Callback for when the send buffer has been flushed.
	/// This is a plain delegate field: assigning it does not go through a
	/// setter, so updateFlags() is not called.
	void delegate() handleBufferFlushed;
1086 
1087 	private ReadDataHandler readDataHandler;
1088 	/// Callback for incoming data.
1089 	/// Data will not be received unless this handler is set.
1090 	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }
1091 
1092 	private DisconnectHandler disconnectHandler;
1093 	/// Callback for when a connection was closed.
1094 	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
1095 }
1096 
1097 // ***************************************************************************
1098 
/// An asynchronous TCP connection server.
///
/// listen() creates one listening socket per usable address. Each accepted
/// connection is wrapped in a TcpConnection and passed to the handleAccept
/// delegate.
final class TcpServer
{
private:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Adopts the given socket and registers this listener with the
		/// socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one connection and makes it non-blocking. If an accept
		/// handler is set, the socket is wrapped in a TcpConnection,
		/// setKeepAlive() is called on it and it is passed to the handler;
		/// otherwise the accepted socket is closed right away.
		override void onReadable()
		{
			debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				TcpConnection connection = new TcpConnection(acceptSocket);
				debug (ASOCKETS) writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		/// Listeners never write, so this is intentionally empty.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregisters this listener from the socket manager, then closes
		/// and forgets its socket. The socket must still be set.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Requests read notifications on every listener only while an accept
	/// handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	///
	/// Resolves addr/port with getAddressInfo (using AddressInfoFlags.PASSIVE)
	/// and tries to create a listener for every result. Results that fail
	/// with a SocketException are skipped. When port is 0, only IPv4 results
	/// are used.
	///
	/// Params:
	///   port = port to listen on
	///   addr = node passed through to getAddressInfo
	/// Returns: port, replaced by the port the bound socket reports for
	///          IPv4 listeners.
	/// Throws: Exception if there is no listener after trying all results.
	ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		foreach (ref addressInfo; addressInfos)
		{
			if (addressInfo.family != AddressFamily.INET && port == 0)
				continue;  // listen on random ports only on IPv4 for now

			try
			{
				Socket conn = new Socket(addressInfo);
				// If configuring, binding or listening throws, close the socket
				// now instead of leaving the descriptor open until the garbage
				// collector finalizes the object.
				scope(failure) conn.close();

				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				// Pick up the actual port from the bound IPv4 socket.
				if (addressInfo.family == AddressFamily.INET)
					port = to!ushort(conn.localAddress().toPortString());

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				// Best effort: this address is skipped, others may still succeed.
				debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();

		return port;
	}

	/// Local addresses of all current listeners.
	@property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Whether listen() has succeeded and close() has not been called since.
	@property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes every listener, clears the listener list and then calls
	/// handleClose, if set.
	void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

public:
	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(TcpConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
}
1245 
1246 // ***************************************************************************
1247 
/// Base class for a connection adapter.
/// Wraps another IConnection and forwards calls and events unchanged;
/// subclasses override the on* hooks to add behaviour.
class ConnectionAdapter : IConnection
{
	/// The wrapped connection.
	IConnection next;

	/// Wraps the given connection and takes over its connect and
	/// disconnect handlers.
	this(IConnection next)
	{
		this.next = next;
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
	}

	/// State of the wrapped connection.
	@property ConnectionState state()
	{
		return next.state;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Hook for the wrapped connection's connect event; forwards to the
	/// connect handler when one is set.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	/// Hook for incoming data; forwards to the read-data handler.
	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		readDataHandler(data);
	}

	/// Hook for the wrapped connection's disconnect event; forwards to the
	/// disconnect handler when one is set.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	/// The wrapped connection is subscribed to only while a handler is set.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}
	private ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	private DisconnectHandler disconnectHandler;
}
1311 
1312 // ***************************************************************************
1313 
/// Adapter for connections with a line-based protocol.
/// Buffers incoming data and passes it on one delimiter-separated line
/// at a time, with the delimiter removed.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	string delimiter = "\r\n";

	this(IConnection next)
	{
		super(next);
	}

	/// Append a line to the send buffer.
	/// The delimiter is appended to the line before it is queued.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter base = this;
		base.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends the data to the receive buffer and emits every complete
	/// line found in it.
	final override void onReadData(Data data)
	{
		import std.string;

		immutable previousLength = inBuffer.length;
		if (previousLength == 0)
			inBuffer = data;
		else
			inBuffer ~= data;

		if (delimiter.length != 1)
		{
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			for (;;)
			{
				sizediff_t position = indexOf(cast(string)inBuffer.contents, delimiter);
				if (position < 0)
					break;
				processLine(position);
			}
		}
		else
		{
			import core.stdc.string; // memchr

			char separator = delimiter[0];
			// The first search covers only the newly received bytes; after a
			// line is consumed, the remaining buffer is searched from its start.
			for (auto hit = memchr(inBuffer.ptr + previousLength, separator, data.length);
				hit;
				hit = memchr(inBuffer.ptr, separator, inBuffer.length))
			{
				sizediff_t position = hit - inBuffer.ptr;
				processLine(position);
			}
		}
	}

	/// Cuts the line ending at the given buffer offset (where a delimiter
	/// starts) out of the receive buffer and passes it on.
	final void processLine(size_t index)
	{
		immutable restStart = index + delimiter.length;
		auto line = inBuffer[0..index];
		inBuffer = inBuffer[restStart..inBuffer.length];
		super.onReadData(line);
	}

	/// Forwards the disconnect event, then discards any buffered data.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
1385 
1386 // ***************************************************************************
1387 
/// Fires an event handler or disconnects connections
/// after a period of inactivity.
///
/// The idle timer is created by setIdleTimeout(). Until that is called,
/// the adapter forwards data and events without any time-out.
class TimeoutAdapter : ConnectionAdapter
{
	this(IConnection next)
	{
		debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stop the idle timer.
	/// An idle timeout must have been set and must currently be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Start the idle timer again.
	/// The connection must be connected, and an idle timeout must have
	/// been set and must not currently be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(state == ConnectionState.connected);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Set or change the idle timeout.
	/// Creates the timer task on first use; otherwise cancels a waiting
	/// task and updates its delay. The task is added to the main timer
	/// only if the connection is currently connected.
	///
	/// Params:
	///   duration = inactivity period; must be greater than zero
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);
		if (idleTask is null)
		{
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}
		else
		{
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		if (state == ConnectionState.connected)
			mainTimer.add(idleTask);
	}

	/// Record activity on the connection.
	/// Calls handleNonIdle, if set, and restarts the idle timer if it is
	/// waiting. Does not touch the timer when no idle timeout has been set.
	void markNonIdle()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();
		// onReadData calls this for every chunk of data, including when
		// setIdleTimeout() was never called, so idleTask may be null here.
		if (idleTask && idleTask.isWaiting())
			idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Forwards the connect event, then starts the idle timer if an idle
	/// timeout has been set.
	override void onConnect()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		super.onConnect();
		if (idleTask)
			resumeIdleTimeout();
	}

	/// Marks the connection as non-idle, then forwards the data.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Forwards the disconnect event, then cancels a waiting idle timer.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		super.onDisconnect(reason, type);
		if (idleTask && idleTask.isWaiting())
			idleTask.cancel();
	}

private:
	/// Timer task for the idle timeout; null until setIdleTimeout is called.
	TimerTask idleTask;

	/// Timer callback for the idle timeout.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		// A disconnect was already requested but has not completed: force it.
		if (state == ConnectionState.disconnecting)
			return disconnect("Delayed disconnect - time-out", DisconnectType.error);

		if (state != ConnectionState.connected)
			return;

		if (handleIdleTimeout)
		{
			handleIdleTimeout();
			// Re-arm the timer if the handler left the connection open.
			if (state == ConnectionState.connected)
			{
				assert(!idleTask.isWaiting());
				mainTimer.add(idleTask);
			}
		}
		else
			disconnect("Time-out", DisconnectType.error);
	}
}
1500 
1501 // ***************************************************************************
1502 
unittest
{
	// Schedule a 10 ms timeout, run socketManager.loop(), and check that
	// the timeout callback has run by the time loop() returns.
	bool fired;
	setTimeout({fired = true;}, 10.msecs);
	socketManager.loop();
	assert(fired);
}