1 /**
2  * Asynchronous socket abstraction.
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Stéphan Kochen <stephan@kochen.nl>
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  *   Vincent Povirk <madewokherd@gmail.com>
14  *   Simon Arlott
15  */
16 
17 module ae.net.asockets;
18 
19 import ae.sys.timing;
20 import ae.utils.math;
21 public import ae.sys.data;
22 
23 import std.exception;
24 import std.socket;
25 import std.string : format;
26 public import std.socket : Address, AddressInfo, Socket;
27 
28 debug(ASOCKETS) import std.stdio;
29 debug(PRINTDATA) static import std.stdio;
30 debug(PRINTDATA) import ae.utils.text : hexDump;
31 private import std.conv : to;
32 
33 
34 // http://d.puremagic.com/issues/show_bug.cgi?id=7016
35 static import ae.utils.array;
36 
37 version(LIBEV)
38 {
39 	import deimos.ev;
40 	pragma(lib, "ev");
41 }
42 
43 version(Windows)
44 {
45 	import core.sys.windows.windows : Sleep;
46 	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
47 }
48 else
49 	enum USE_SLEEP = false;
50 
51 /// Flags that determine socket wake-up events.
52 int eventCounter;
53 
54 version(LIBEV)
55 {
56 	// Watchers are a GenericSocket field (as declared in SocketMixin).
57 	// Use one watcher per read and write event.
58 	// Start/stop those as higher-level code declares interest in those events.
59 	// Use the "data" ev_io field to store the parent GenericSocket address.
60 	// Also use the "data" field as a flag to indicate whether the watcher is active
61 	// (data is null when the watcher is stopped).
62 
63 	struct SocketManager
64 	{
65 	private:
66 		size_t count;
67 
68 		extern(C)
69 		static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
70 		{
71 			eventCounter++;
72 			auto socket = cast(GenericSocket)w.data;
73 			assert(socket, "libev event fired on stopped watcher");
74 			debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents);
75 
76 			if (revents & EV_READ)
77 				socket.onReadable();
78 			else
79 			if (revents & EV_WRITE)
80 				socket.onWritable();
81 			else
82 				assert(false, "Unknown event fired from libev");
83 
84 			// TODO? Need to get proper SocketManager instance to call updateTimer on
85 			socketManager.updateTimer(false);
86 		}
87 
88 		ev_timer evTimer;
89 		MonoTime lastNextEvent = MonoTime.max;
90 
91 		extern(C)
92 		static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
93 		{
94 			eventCounter++;
95 			debug (ASOCKETS) writefln("Timer callback called.");
96 			mainTimer.prod();
97 
98 			socketManager.updateTimer(true);
99 			debug (ASOCKETS) writefln("Timer callback exiting.");
100 		}
101 
102 		void updateTimer(bool force)
103 		{
104 			auto nextEvent = mainTimer.getNextEvent();
105 			if (force || lastNextEvent != nextEvent)
106 			{
107 				debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
108 				if (nextEvent == MonoTime.max) // Stopping
109 				{
110 					if (lastNextEvent != MonoTime.max)
111 						ev_timer_stop(ev_default_loop(0), &evTimer);
112 				}
113 				else
114 				{
115 					auto remaining = mainTimer.getRemainingTime();
116 					while (remaining <= Duration.zero)
117 					{
118 						debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
119 						mainTimer.prod();
120 						remaining = mainTimer.getRemainingTime();
121 					}
122 					ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
123 					debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
124 					if (lastNextEvent == MonoTime.max) // Starting
125 					{
126 						ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
127 						ev_timer_start(ev_default_loop(0), &evTimer);
128 					}
129 					else // Adjusting
130 					{
131 						evTimer.repeat = tstamp;
132 						ev_timer_again(ev_default_loop(0), &evTimer);
133 					}
134 				}
135 				lastNextEvent = nextEvent;
136 			}
137 		}
138 
139 		/// Register a socket with the manager.
140 		void register(GenericSocket socket)
141 		{
142 			debug (ASOCKETS) writefln("Registering %s", socket);
143 			debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
144 			auto fd = socket.conn.handle;
145 			assert(fd, "Must have fd before socket registration");
146 			ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
147 			ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
148 			count++;
149 		}
150 
151 		/// Unregister a socket with the manager.
152 		void unregister(GenericSocket socket)
153 		{
154 			debug (ASOCKETS) writefln("Unregistering %s", socket);
155 			socket.notifyRead  = false;
156 			socket.notifyWrite = false;
157 			count--;
158 		}
159 
160 	public:
161 		size_t size()
162 		{
163 			return count;
164 		}
165 
166 		/// Loop continuously until no sockets are left.
167 		void loop()
168 		{
169 			auto evLoop = ev_default_loop(0);
170 			enforce(evLoop, "libev initialization failure");
171 
172 			updateTimer(true);
173 			debug (ASOCKETS) writeln("ev_run");
174 			ev_run(ev_default_loop(0), 0);
175 		}
176 	}
177 
178 	private mixin template SocketMixin()
179 	{
180 		private ev_io evRead, evWrite;
181 
182 		private void setWatcherState(ref ev_io ev, bool newValue, int event)
183 		{
184 			if (!conn)
185 			{
186 				// Can happen when setting delegates before connecting.
187 				return;
188 			}
189 
190 			if (newValue && !ev.data)
191 			{
192 				// Start
193 				ev.data = cast(void*)this;
194 				ev_io_start(ev_default_loop(0), &ev);
195 			}
196 			else
197 			if (!newValue && ev.data)
198 			{
199 				// Stop
200 				assert(ev.data is cast(void*)this);
201 				ev.data = null;
202 				ev_io_stop(ev_default_loop(0), &ev);
203 			}
204 		}
205 
206 		/// Interested in read notifications (onReadable)?
207 		@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
208 		/// Interested in write notifications (onWritable)?
209 		@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }
210 
211 		debug ~this()
212 		{
213 			// The LIBEV SocketManager holds no references to registered sockets.
214 			// TODO: Add a doubly-linked list?
215 			assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
216 		}
217 	}
218 }
219 else // Use select
220 {
221 	struct SocketManager
222 	{
223 	private:
224 		enum FD_SETSIZE = 1024;
225 
226 		/// List of all sockets to poll.
227 		GenericSocket[] sockets;
228 
229 		/// Debug AA to check for dangling socket references.
230 		debug GenericSocket[socket_t] socketHandles;
231 
232 		/// Register a socket with the manager.
233 		void register(GenericSocket conn)
234 		{
235 			debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1);
236 			assert(!conn.socket.blocking, "Trying to register a blocking socket");
237 			sockets ~= conn;
238 
239 			debug
240 			{
241 				auto handle = conn.socket.handle;
242 				assert(handle != socket_t.init, "Can't register a closed socket");
243 				assert(handle !in socketHandles, "This socket handle is already registered");
244 				socketHandles[handle] = conn;
245 			}
246 		}
247 
248 		/// Unregister a socket with the manager.
249 		void unregister(GenericSocket conn)
250 		{
251 			debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1);
252 
253 			debug
254 			{
255 				auto handle = conn.socket.handle;
256 				assert(handle != socket_t.init, "Can't unregister a closed socket");
257 				auto pconn = handle in socketHandles;
258 				assert(pconn, "This socket handle is not registered");
259 				assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
260 				socketHandles.remove(handle);
261 			}
262 
263 			foreach (size_t i, GenericSocket j; sockets)
264 				if (j is conn)
265 				{
266 					sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
267 					return;
268 				}
269 			assert(false, "Socket not registered");
270 		}
271 
272 		void delegate()[] idleHandlers;
273 
274 	public:
275 		size_t size()
276 		{
277 			return sockets.length;
278 		}
279 
280 		/// Loop continuously until no sockets are left.
281 		void loop()
282 		{
283 			debug (ASOCKETS) writeln("Starting event loop.");
284 
285 			SocketSet readset, writeset, errorset;
286 			size_t sockcount;
287 			uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
288 			readset  = new SocketSet(setSize);
289 			writeset = new SocketSet(setSize);
290 			errorset = new SocketSet(setSize);
291 			while (true)
292 			{
293 				uint minSize = 0;
294 				version(Windows)
295 					minSize = cast(uint)sockets.length;
296 				else
297 				{
298 					foreach (s; sockets)
299 						if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
300 							minSize = s.socket.handle;
301 				}
302 				minSize++;
303 
304 				if (setSize < minSize)
305 				{
306 					debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
307 					setSize = minSize * 2;
308 					readset  = new SocketSet(setSize);
309 					writeset = new SocketSet(setSize);
310 					errorset = new SocketSet(setSize);
311 				}
312 				else
313 				{
314 					readset.reset();
315 					writeset.reset();
316 					errorset.reset();
317 				}
318 
319 				sockcount = 0;
320 				bool haveActive;
321 				debug (ASOCKETS) writeln("Populating sets");
322 				foreach (GenericSocket conn; sockets)
323 				{
324 					if (!conn.socket)
325 						continue;
326 					sockcount++;
327 					if (!conn.daemon)
328 						haveActive = true;
329 
330 					debug (ASOCKETS) writef("\t%s:", conn);
331 					if (conn.notifyRead)
332 					{
333 						readset.add(conn.socket);
334 						debug (ASOCKETS) write(" READ");
335 					}
336 					if (conn.notifyWrite)
337 					{
338 						writeset.add(conn.socket);
339 						debug (ASOCKETS) write(" WRITE");
340 					}
341 					errorset.add(conn.socket);
342 					debug (ASOCKETS) writeln();
343 				}
344 				debug (ASOCKETS)
345 				{
346 					writefln("Sets populated as follows:");
347 					printSets(readset, writeset, errorset);
348 				}
349 
350 				debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
351 					sockcount,
352 					mainTimer.isWaiting() ? "with" : "no",
353 					idleHandlers.length,
354 				);
355 				if (!haveActive && !mainTimer.isWaiting())
356 				{
357 					debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
358 					break;
359 				}
360 
361 				debug (ASOCKETS) { stdout.flush(); stderr.flush(); }
362 
363 				int events;
364 				if (idleHandlers.length)
365 				{
366 					if (sockcount==0)
367 						events = 0;
368 					else
369 						events = Socket.select(readset, writeset, errorset, 0.seconds);
370 				}
371 				else
372 				if (USE_SLEEP && sockcount==0)
373 				{
374 					version(Windows)
375 					{
376 						auto duration = mainTimer.getRemainingTime().total!"msecs"();
377 						debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
378 						if (duration <= 0)
379 							duration = 1; // Avoid busywait
380 						else
381 						if (duration > int.max)
382 							duration = int.max;
383 						Sleep(cast(int)duration);
384 						events = 0;
385 					}
386 					else
387 						assert(0);
388 				}
389 				else
390 				if (mainTimer.isWaiting())
391 					events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
392 				else
393 					events = Socket.select(readset, writeset, errorset);
394 
395 				debug (ASOCKETS) writefln("%d events fired.", events);
396 
397 				if (events > 0)
398 				{
399 					// Handle just one event at a time, as the first
400 					// handler might invalidate select()'s results.
401 					handleEvent(readset, writeset, errorset);
402 				}
403 				else
404 				if (idleHandlers.length)
405 				{
406 					import ae.utils.array;
407 					auto handler = idleHandlers.shift();
408 
409 					// Rotate the idle handler queue before running it,
410 					// in case the handler unregisters itself.
411 					idleHandlers ~= handler;
412 
413 					handler();
414 				}
415 
416 				// Timers may invalidate our select results, so fire them after processing the latter
417 				mainTimer.prod();
418 
419 				eventCounter++;
420 			}
421 		}
422 
423 		debug (ASOCKETS)
424 		void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
425 		{
426 			foreach (GenericSocket conn; sockets)
427 			{
428 				if (!conn.socket)
429 					writefln("\t\t%s is unset", conn);
430 				else
431 				if (readset.isSet(conn.socket))
432 					writefln("\t\t%s is readable", conn);
433 				else
434 				if (writeset.isSet(conn.socket))
435 					writefln("\t\t%s is writable", conn);
436 				else
437 				if (errorset.isSet(conn.socket))
438 					writefln("\t\t%s is errored", conn);
439 			}
440 		}
441 
442 		void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
443 		{
444 			debug (ASOCKETS)
445 			{
446 				writefln("\tSelect results:");
447 				printSets(readset, writeset, errorset);
448 			}
449 
450 			foreach (GenericSocket conn; sockets)
451 			{
452 				if (!conn.socket)
453 					continue;
454 
455 				if (readset.isSet(conn.socket))
456 				{
457 					debug (ASOCKETS) writefln("\t%s - calling onReadable", conn);
458 					return conn.onReadable();
459 				}
460 				else
461 				if (writeset.isSet(conn.socket))
462 				{
463 					debug (ASOCKETS) writefln("\t%s - calling onWritable", conn);
464 					return conn.onWritable();
465 				}
466 				else
467 				if (errorset.isSet(conn.socket))
468 				{
469 					debug (ASOCKETS) writefln("\t%s - calling onError", conn);
470 					return conn.onError("select() error: " ~ conn.socket.getErrorText());
471 				}
472 			}
473 
474 			assert(false, "select() reported events available, but no registered sockets are set");
475 		}
476 	}
477 
478 	// Use UFCS to allow removeIdleHandler to have a predicate with context
479 	void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
480 	{
481 		foreach (i, idleHandler; socketManager.idleHandlers)
482 			assert(handler !is idleHandler);
483 
484 		socketManager.idleHandlers ~= handler;
485 	}
486 
487 	static bool isFun(T)(T a, T b) { return a is b; }
488 	void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
489 	{
490 		foreach (i, idleHandler; socketManager.idleHandlers)
491 			if (pred(idleHandler, args))
492 			{
493 				import std.algorithm;
494 				socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
495 				return;
496 			}
497 		assert(false, "No such idle handler");
498 	}
499 
500 	private mixin template SocketMixin()
501 	{
502 		/// Interested in read notifications (onReadable)?
503 		bool notifyRead;
504 		/// Interested in write notifications (onWritable)?
505 		bool notifyWrite;
506 	}
507 }
508 
509 /// The default socket manager.
510 SocketManager socketManager;
511 
512 // ***************************************************************************
513 
514 /// General methods for an asynchronous socket.
515 private abstract class GenericSocket
516 {
517 	/// Declares notifyRead and notifyWrite.
518 	mixin SocketMixin;
519 
520 protected:
521 	/// The socket this class wraps.
522 	Socket conn;
523 
524 protected:
525 	/// Retrieve the socket class this class wraps.
526 	@property final Socket socket()
527 	{
528 		return conn;
529 	}
530 
531 	void onReadable()
532 	{
533 	}
534 
535 	void onWritable()
536 	{
537 	}
538 
539 	void onError(string reason)
540 	{
541 	}
542 
543 public:
544 	/// allow getting the address of connections that are already disconnected
545 	private Address cachedLocalAddress, cachedRemoteAddress;
546 
547 	/// Don't block the process from exiting.
548 	/// TODO: Not implemented with libev
549 	bool daemon;
550 
551 	final @property Address localAddress()
552 	{
553 		if (cachedLocalAddress !is null)
554 			return cachedLocalAddress;
555 		else
556 		if (conn is null)
557 			return null;
558 		else
559 			return cachedLocalAddress = conn.localAddress();
560 	}
561 
562 	final @property string localAddressStr() nothrow
563 	{
564 		try
565 		{
566 			auto a = localAddress;
567 			return a is null ? "[null address]" : a.toString();
568 		}
569 		catch (Exception e)
570 			return "[error: " ~ e.msg ~ "]";
571 	}
572 
573 	final @property Address remoteAddress()
574 	{
575 		if (cachedRemoteAddress !is null)
576 			return cachedRemoteAddress;
577 		else
578 		if (conn is null)
579 			return null;
580 		else
581 			return cachedRemoteAddress = conn.remoteAddress();
582 	}
583 
584 	final @property string remoteAddressStr() nothrow
585 	{
586 		try
587 		{
588 			auto a = remoteAddress;
589 			return a is null ? "[null address]" : a.toString();
590 		}
591 		catch (Exception e)
592 			return "[error: " ~ e.msg ~ "]";
593 	}
594 
595 	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
596 	{
597 		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
598 		if (enabled)
599 		{
600 			try
601 				conn.setKeepAlive(time, interval);
602 			catch (SocketException)
603 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
604 		}
605 		else
606 			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
607 	}
608 
609 	override string toString() const
610 	{
611 		import std.string;
612 		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
613 	}
614 }
615 
616 // ***************************************************************************
617 
618 enum DisconnectType
619 {
620 	requested, // initiated by the application
621 	graceful,  // peer gracefully closed the connection
622 	error      // abnormal network condition
623 }
624 
625 enum ConnectionState
626 {
627 	/// The initial state, or the state after a disconnect was fully processed.
628 	disconnected,
629 
630 	/// Name resolution. Currently done synchronously.
631 	resolving,
632 
633 	/// A connection attempt is in progress.
634 	connecting,
635 
636 	/// A connection is established.
637 	connected,
638 
639 	/// Disconnecting in progress. No data can be sent or received at this point.
640 	/// We are waiting for queued data to be actually sent before disconnecting.
641 	disconnecting,
642 }
643 
644 /// Common interface for connections and adapters.
645 interface IConnection
646 {
647 	enum MAX_PRIORITY = 4;
648 	enum DEFAULT_PRIORITY = 2;
649 
650 	static const defaultDisconnectReason = "Software closed the connection";
651 
652 	/// Get connection state.
653 	@property ConnectionState state();
654 
655 	/// Has a connection been established?
656 	deprecated final @property bool connected() { return state == ConnectionState.connected; }
657 
658 	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
659 	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }
660 
661 	/// Queue Data for sending.
662 	void send(Data[] data, int priority = DEFAULT_PRIORITY);
663 
664 	/// ditto
665 	final void send(Data datum, int priority = DEFAULT_PRIORITY)
666 	{
667 		Data[1] data;
668 		data[0] = datum;
669 		this.send(data);
670 		data[] = Data.init;
671 	}
672 
673 	/// Terminate the connection.
674 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);
675 
676 	/// Callback setter for when a connection has been established
677 	/// (if applicable).
678 	alias void delegate() ConnectHandler;
679 	@property void handleConnect(ConnectHandler value); /// ditto
680 
681 	/// Callback setter for when new data is read.
682 	alias void delegate(Data data) ReadDataHandler;
683 	@property void handleReadData(ReadDataHandler value); /// ditto
684 
685 	/// Callback setter for when a connection was closed.
686 	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
687 	@property void handleDisconnect(DisconnectHandler value); /// ditto
688 
689 	/// Callback setter for when all queued data has been sent.
690 	alias void delegate() BufferFlushedHandler;
691 	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
692 }
693 
694 // ***************************************************************************
695 
696 class Connection : GenericSocket, IConnection
697 {
698 private:
699 	/// Blocks of data larger than this value are passed as unmanaged memory
700 	/// (in Data objects). Blocks smaller than this value will be reallocated
701 	/// on the managed heap. The disadvantage of placing large objects on the
702 	/// managed heap is false pointers; the disadvantage of using Data for
703 	/// small objects is wasted slack space due to the page size alignment
704 	/// requirement.
705 	enum UNMANAGED_THRESHOLD = 256;
706 
707 	ConnectionState _state;
708 	final @property ConnectionState state(ConnectionState value) { return _state = value; }
709 
710 public:
711 	/// Get connection state.
712 	override @property ConnectionState state() { return _state; }
713 
714 protected:
715 	abstract sizediff_t doSend(in void[] buffer);
716 	abstract sizediff_t doReceive(void[] buffer);
717 
718 	/// The send buffers.
719 	Data[][MAX_PRIORITY+1] outQueue;
720 	/// Whether the first item from each queue has been partially sent (and thus can't be canceled).
721 	bool[MAX_PRIORITY+1] partiallySent;
722 
723 	/// Constructor used by a ServerSocket for new connections
724 	this(Socket conn)
725 	{
726 		this();
727 		this.conn = conn;
728 		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
729 		if (conn)
730 			socketManager.register(this);
731 		updateFlags();
732 	}
733 
734 	final void updateFlags()
735 	{
736 		if (state == ConnectionState.connecting)
737 			notifyWrite = true;
738 		else
739 			notifyWrite = writePending;
740 
741 		notifyRead = state == ConnectionState.connected && readDataHandler;
742 	}
743 
744 	/// Called when a socket is readable.
745 	override void onReadable()
746 	{
747 		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
748 		static ubyte[0x10000] inBuffer = void;
749 		auto received = doReceive(inBuffer);
750 
751 		if (received == 0)
752 			return disconnect("Connection closed", DisconnectType.graceful);
753 
754 		if (received == Socket.ERROR)
755 		{
756 		//	if (wouldHaveBlocked)
757 		//	{
758 		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
759 		//		return;
760 		//	}
761 		//	else
762 				onError("recv() error: " ~ lastSocketError);
763 		}
764 		else
765 		{
766 			debug (PRINTDATA)
767 			{
768 				std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
769 				std.stdio.write(hexDump(inBuffer[0 .. received]));
770 				std.stdio.stdout.flush();
771 			}
772 
773 			if (state == ConnectionState.disconnecting)
774 			{
775 				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this);
776 			}
777 			else
778 			if (!readDataHandler)
779 			{
780 				debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this);
781 			}
782 			else
783 			{
784 				// Currently, unlike the D1 version of this module,
785 				// we will always reallocate read network data.
786 				// This disfavours code which doesn't need to store
787 				// read data after processing it, but otherwise
788 				// makes things simpler and safer all around.
789 
790 				if (received < UNMANAGED_THRESHOLD)
791 				{
792 					// Copy to the managed heap
793 					readDataHandler(Data(inBuffer[0 .. received].dup));
794 				}
795 				else
796 				{
797 					// Copy to unmanaged memory
798 					readDataHandler(Data(inBuffer[0 .. received], true));
799 				}
800 			}
801 		}
802 	}
803 
804 	/// Called when an error occurs on the socket.
805 	override void onError(string reason)
806 	{
807 		if (state == ConnectionState.disconnecting)
808 		{
809 			debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason));
810 			return close();
811 		}
812 
813 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
814 		disconnect("Socket error: " ~ reason, DisconnectType.error);
815 	}
816 
817 	this()
818 	{
819 	}
820 
821 public:
822 	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
823 	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
824 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
825 	{
826 		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
827 		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));
828 
829 		if (writePending)
830 		{
831 			if (type==DisconnectType.requested)
832 			{
833 				assert(conn, "Attempting to disconnect on an uninitialized socket");
834 				// queue disconnect after all data is sent
835 				debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
836 				state = ConnectionState.disconnecting;
837 				//setIdleTimeout(30.seconds);
838 				if (disconnectHandler)
839 					disconnectHandler(reason, type);
840 				updateFlags();
841 				return;
842 			}
843 			else
844 				discardQueues();
845 		}
846 
847 		debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason);
848 
849 		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
850 			close();
851 		else
852 		{
853 			assert(conn is null, "Registered but %s socket".format(state));
854 			if (state == ConnectionState.resolving)
855 				state = ConnectionState.disconnected;
856 		}
857 
858 		if (disconnectHandler)
859 			disconnectHandler(reason, type);
860 		updateFlags();
861 	}
862 
863 	private final void close()
864 	{
865 		assert(conn, "Attempting to close an unregistered socket");
866 		socketManager.unregister(this);
867 		conn.close();
868 		conn = null;
869 		outQueue[] = null;
870 		state = ConnectionState.disconnected;
871 	}
872 
873 	/// Append data to the send buffer.
874 	void send(Data[] data, int priority = DEFAULT_PRIORITY)
875 	{
876 		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
877 		outQueue[priority] ~= data;
878 		notifyWrite = true; // Fast updateFlags()
879 
880 		debug (PRINTDATA)
881 		{
882 			std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
883 			foreach (datum; data)
884 				if (datum.length)
885 					std.stdio.write(hexDump(datum.contents));
886 				else
887 					std.stdio.writeln("(empty Data)");
888 			std.stdio.stdout.flush();
889 		}
890 	}
891 
892 	/// ditto
893 	alias send = IConnection.send;
894 
895 	final void clearQueue(int priority)
896 	{
897 		if (partiallySent[priority])
898 		{
899 			assert(outQueue[priority].length > 0);
900 			outQueue[priority] = outQueue[priority][0..1];
901 		}
902 		else
903 			outQueue[priority] = null;
904 		updateFlags();
905 	}
906 
907 	/// Clears all queues, even partially sent content.
908 	private final void discardQueues()
909 	{
910 		foreach (priority; 0..MAX_PRIORITY+1)
911 		{
912 			outQueue[priority] = null;
913 			partiallySent[priority] = false;
914 		}
915 		updateFlags();
916 	}
917 
918 	@property
919 	final bool writePending()
920 	{
921 		foreach (queue; outQueue)
922 			if (queue.length)
923 				return true;
924 		return false;
925 	}
926 
927 	final bool queuePresent(int priority = DEFAULT_PRIORITY)
928 	{
929 		if (partiallySent[priority])
930 		{
931 			assert(outQueue[priority].length > 0);
932 			return outQueue[priority].length > 1;
933 		}
934 		else
935 			return outQueue[priority].length > 0;
936 	}
937 
938 	final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
939 	{
940 		return outQueue[priority].length;
941 	}
942 
943 	final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
944 	{
945 		size_t bytes;
946 		foreach (datum; outQueue[priority])
947 			bytes += datum.length;
948 		return bytes;
949 	}
950 
951 public:
952 	private ConnectHandler connectHandler;
953 	/// Callback for when a connection has been established.
954 	@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }
955 
956 	private ReadDataHandler readDataHandler;
957 	/// Callback for incoming data.
958 	/// Data will not be received unless this handler is set.
959 	@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }
960 
961 	private DisconnectHandler disconnectHandler;
962 	/// Callback for when a connection was closed.
963 	@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
964 
965 	/// Callback setter for when all queued data has been sent.
966 	private BufferFlushedHandler bufferFlushedHandler;
967 	@property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); }
968 }
969 
970 class StreamConnection : Connection
971 {
972 protected:
973 	this()
974 	{
975 		super();
976 	}
977 
978 	/// Called when a socket is writable.
979 	override void onWritable()
980 	{
981 		//scope(success) updateFlags();
982 		onWritableImpl();
983 		updateFlags();
984 	}
985 
986 	// Work around scope(success) breaking debugger stack traces
987 	final private void onWritableImpl()
988 	{
989 		if (state == ConnectionState.connecting)
990 		{
991 			state = ConnectionState.connected;
992 
993 			//debug writefln("[%s] Connected", remoteAddress);
994 			try
995 				setKeepAlive();
996 			catch (Exception e)
997 				return disconnect(e.msg, DisconnectType.error);
998 			if (connectHandler)
999 				connectHandler();
1000 			return;
1001 		}
1002 		//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);
1003 
1004 		foreach (priority, ref queue; outQueue)
1005 			while (queue.length)
1006 			{
1007 				auto pdata = queue.ptr; // pointer to first data
1008 
1009 				ptrdiff_t sent = 0;
1010 				if (pdata.length)
1011 				{
1012 					sent = doSend(pdata.contents);
1013 					debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
1014 				}
1015 				else
1016 				{
1017 					debug (ASOCKETS) writefln("\t\t%s: empty Data object", this);
1018 				}
1019 
1020 				if (sent == Socket.ERROR)
1021 				{
1022 					if (wouldHaveBlocked())
1023 						return;
1024 					else
1025 						return onError("send() error: " ~ lastSocketError);
1026 				}
1027 				else
1028 				if (sent < pdata.length)
1029 				{
1030 					if (sent > 0)
1031 					{
1032 						*pdata = (*pdata)[sent..pdata.length];
1033 						partiallySent[priority] = true;
1034 					}
1035 					return;
1036 				}
1037 				else
1038 				{
1039 					assert(sent == pdata.length);
1040 					//debug writefln("[%s] Sent data:", remoteAddress);
1041 					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1042 					pdata.clear();
1043 					queue = queue[1..$];
1044 					partiallySent[priority] = false;
1045 					if (queue.length == 0)
1046 						queue = null;
1047 				}
1048 			}
1049 
1050 		// outQueue is now empty
1051 		if (bufferFlushedHandler)
1052 			bufferFlushedHandler();
1053 		if (state == ConnectionState.disconnecting)
1054 		{
1055 			debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1056 			close();
1057 		}
1058 	}
1059 
1060 public:
1061 	this(Socket conn)
1062 	{
1063 		super(conn);
1064 	}
1065 }
1066 
1067 // ***************************************************************************
1068 
1069 /// A POSIX file stream.
1070 /// Allows adding a file (e.g. stdin/stdout) to the socket manager.
1071 /// Does not dup the given file descriptor, so "disconnecting" this connection
1072 /// will close it.
1073 version (Posix)
1074 class FileConnection : StreamConnection
1075 {
1076 	this(int fileno)
1077 	{
1078 		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
1079 		conn.blocking = false;
1080 		super(conn);
1081 	}
1082 
1083 protected:
1084 	import core.sys.posix.unistd : read, write;
1085 
1086 	override sizediff_t doSend(in void[] buffer)
1087 	{
1088 		return write(socket.handle, buffer.ptr, buffer.length);
1089 	}
1090 
1091 	override sizediff_t doReceive(void[] buffer)
1092 	{
1093 		return read(socket.handle, buffer.ptr, buffer.length);
1094 	}
1095 }
1096 
1097 // ***************************************************************************
1098 
1099 /// An asynchronous TCP connection.
1100 class TcpConnection : StreamConnection
1101 {
1102 protected:
1103 	/// Queue of addresses to try connecting to.
1104 	AddressInfo[] addressQueue;
1105 
1106 	this(Socket conn)
1107 	{
1108 		super(conn);
1109 	}
1110 
1111 	override sizediff_t doSend(in void[] buffer)
1112 	{
1113 		return conn.send(buffer);
1114 	}
1115 
1116 	override sizediff_t doReceive(void[] buffer)
1117 	{
1118 		return conn.receive(buffer);
1119 	}
1120 
1121 	final void tryNextAddress()
1122 	{
1123 		assert(state == ConnectionState.connecting);
1124 		auto addressInfo = addressQueue[0];
1125 		addressQueue = addressQueue[1..$];
1126 
1127 		try
1128 		{
1129 			conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
1130 			conn.blocking = false;
1131 
1132 			socketManager.register(this);
1133 			updateFlags();
1134 			debug (ASOCKETS) writefln("Attempting connection to %s", addressInfo.address.toString());
1135 			conn.connect(addressInfo.address);
1136 		}
1137 		catch (SocketException e)
1138 			return onError("Connect error: " ~ e.msg);
1139 	}
1140 
1141 	/// Called when an error occurs on the socket.
1142 	override void onError(string reason)
1143 	{
1144 		if (state == ConnectionState.connecting && addressQueue.length)
1145 		{
1146 			socketManager.unregister(this);
1147 			conn.close();
1148 			conn = null;
1149 
1150 			return tryNextAddress();
1151 		}
1152 
1153 		super.onError(reason);
1154 	}
1155 
1156 public:
1157 	/// Default constructor
1158 	this()
1159 	{
1160 		debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this);
1161 	}
1162 
1163 	/// Start establishing a connection.
1164 	final void connect(string host, ushort port)
1165 	{
1166 		assert(host.length, "Empty host");
1167 		assert(port, "No port specified");
1168 
1169 		debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
1170 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1171 
1172 		state = ConnectionState.resolving;
1173 
1174 		AddressInfo[] addressInfos;
1175 		try
1176 		{
1177 			auto addresses = getAddress(host, port);
1178 			enforce(addresses.length, "No addresses found");
1179 			debug (ASOCKETS)
1180 			{
1181 				writefln("Resolved to %s addresses:", addresses.length);
1182 				foreach (address; addresses)
1183 					writefln("- %s", address.toString());
1184 			}
1185 
1186 			if (addresses.length > 1)
1187 			{
1188 				import std.random : randomShuffle;
1189 				randomShuffle(addresses);
1190 			}
1191 
1192 			foreach (address; addresses)
1193 				addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
1194 		}
1195 		catch (SocketException e)
1196 			return onError("Lookup error: " ~ e.msg);
1197 
1198 		state = ConnectionState.disconnected;
1199 		connect(addressInfos);
1200 	}
1201 
1202 	/// ditto
1203 	final void connect(AddressInfo[] addresses)
1204 	{
1205 		assert(addresses.length, "No addresses specified");
1206 
1207 		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
1208 		assert(!conn);
1209 
1210 		addressQueue = addresses;
1211 		state = ConnectionState.connecting;
1212 		tryNextAddress();
1213 	}
1214 }
1215 
1216 // ***************************************************************************
1217 
1218 /// An asynchronous TCP connection server.
1219 final class TcpServer
1220 {
1221 private:
1222 	/// Class that actually performs listening on a certain address family
1223 	final class Listener : GenericSocket
1224 	{
1225 		this(Socket conn)
1226 		{
1227 			debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
1228 			this.conn = conn;
1229 			socketManager.register(this);
1230 		}
1231 
1232 		/// Called when a socket is readable.
1233 		override void onReadable()
1234 		{
1235 			debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this);
1236 			Socket acceptSocket = conn.accept();
1237 			acceptSocket.blocking = false;
1238 			if (handleAccept)
1239 			{
1240 				TcpConnection connection = new TcpConnection(acceptSocket);
1241 				debug (ASOCKETS) writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
1242 				connection.setKeepAlive();
1243 				//assert(connection.connected);
1244 				//connection.connected = true;
1245 				acceptHandler(connection);
1246 			}
1247 			else
1248 				acceptSocket.close();
1249 		}
1250 
1251 		/// Called when a socket is writable.
1252 		override void onWritable()
1253 		{
1254 		}
1255 
1256 		/// Called when an error occurs on the socket.
1257 		override void onError(string reason)
1258 		{
1259 			close(); // call parent
1260 		}
1261 
1262 		void closeListener()
1263 		{
1264 			assert(conn);
1265 			socketManager.unregister(this);
1266 			conn.close();
1267 			conn = null;
1268 		}
1269 	}
1270 
1271 	/// Whether the socket is listening.
1272 	bool listening;
1273 	/// Listener instances
1274 	Listener[] listeners;
1275 
1276 	final void updateFlags()
1277 	{
1278 		foreach (listener; listeners)
1279 			listener.notifyRead = handleAccept !is null;
1280 	}
1281 
1282 public:
1283 	/// Start listening on this socket.
1284 	ushort listen(ushort port, string addr = null)
1285 	{
1286 		debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port);
1287 		//assert(!listening, "Attempting to listen on a listening socket");
1288 
1289 		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
1290 
1291 		debug (ASOCKETS)
1292 		{
1293 			writefln("Resolved to %s addresses:", addressInfos.length);
1294 			foreach (ref addressInfo; addressInfos)
1295 				writefln("- %s", addressInfo);
1296 		}
1297 
1298 		// listen on random ports only on IPv4 for now
1299 		if (port == 0)
1300 		{
1301 			foreach_reverse (i, ref addressInfo; addressInfos)
1302 				if (addressInfo.family != AddressFamily.INET)
1303 					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
1304 		}
1305 
1306 		listen(addressInfos);
1307 
1308 		foreach (listener; listeners)
1309 		{
1310 			auto address = listener.conn.localAddress();
1311 			if (address.addressFamily == AddressFamily.INET)
1312 				port = to!ushort(address.toPortString());
1313 		}
1314 
1315 		return port;
1316 	}
1317 
1318 	/// ditto
1319 	void listen(AddressInfo[] addressInfos)
1320 	{
1321 		foreach (ref addressInfo; addressInfos)
1322 		{
1323 			try
1324 			{
1325 				Socket conn = new Socket(addressInfo);
1326 				conn.blocking = false;
1327 				if (addressInfo.family == AddressFamily.INET6)
1328 					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
1329 				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
1330 
1331 				conn.bind(addressInfo.address);
1332 				conn.listen(8);
1333 
1334 				listeners ~= new Listener(conn);
1335 			}
1336 			catch (SocketException e)
1337 			{
1338 				debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
1339 				debug(ASOCKETS) writeln(e.msg);
1340 			}
1341 		}
1342 
1343 		if (listeners.length==0)
1344 			throw new Exception("Unable to bind service");
1345 
1346 		listening = true;
1347 
1348 		updateFlags();
1349 	}
1350 
1351 	@property Address[] localAddresses()
1352 	{
1353 		Address[] result;
1354 		foreach (listener; listeners)
1355 			result ~= listener.localAddress;
1356 		return result;
1357 	}
1358 
1359 	@property bool isListening()
1360 	{
1361 		return listening;
1362 	}
1363 
1364 	/// Stop listening on this socket.
1365 	void close()
1366 	{
1367 		foreach (listener;listeners)
1368 			listener.closeListener();
1369 		listeners = null;
1370 		listening = false;
1371 		if (handleClose)
1372 			handleClose();
1373 	}
1374 
1375 public:
1376 	/// Callback for when the socket was closed.
1377 	void delegate() handleClose;
1378 
1379 	private void delegate(TcpConnection incoming) acceptHandler;
1380 	/// Callback for an incoming connection.
1381 	/// Connections will not be accepted unless this handler is set.
1382 	@property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
1383 	/// ditto
1384 	@property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
1385 }
1386 
1387 // ***************************************************************************
1388 
1389 /// An asynchronous UDP stream.
1390 /// UDP does not have connections, so this class encapsulates a socket
1391 /// with a fixed destination (sendto) address, and optionally bound to
1392 /// a local address.
1393 /// Currently received packets' address is not exposed.
1394 class UdpConnection : Connection
1395 {
1396 protected:
1397 	this(Socket conn)
1398 	{
1399 		super(conn);
1400 	}
1401 
1402 	/// Called when a socket is writable.
1403 	override void onWritable()
1404 	{
1405 		//scope(success) updateFlags();
1406 		onWritableImpl();
1407 		updateFlags();
1408 	}
1409 
1410 	// Work around scope(success) breaking debugger stack traces
1411 	final private void onWritableImpl()
1412 	{
1413 		foreach (priority, ref queue; outQueue)
1414 			while (queue.length)
1415 			{
1416 				auto pdata = queue.ptr; // pointer to first data
1417 
1418 				auto sent = conn.sendTo(pdata.contents, remoteAddress);
1419 
1420 				if (sent == Socket.ERROR)
1421 				{
1422 					if (wouldHaveBlocked())
1423 						return;
1424 					else
1425 						return onError("send() error: " ~ lastSocketError);
1426 				}
1427 				else
1428 				if (sent < pdata.length)
1429 				{
1430 					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
1431 				}
1432 				else
1433 				{
1434 					assert(sent == pdata.length);
1435 					//debug writefln("[%s] Sent data:", remoteAddress);
1436 					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
1437 					pdata.clear();
1438 					queue = queue[1..$];
1439 					if (queue.length == 0)
1440 						queue = null;
1441 				}
1442 			}
1443 
1444 		// outQueue is now empty
1445 		if (bufferFlushedHandler)
1446 			bufferFlushedHandler();
1447 		if (state == ConnectionState.disconnecting)
1448 		{
1449 			debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
1450 			close();
1451 		}
1452 	}
1453 
1454 	override sizediff_t doSend(in void[] buffer)
1455 	{
1456 		assert(false); // never called (called only from overridden methods)
1457 	}
1458 
1459 	override sizediff_t doReceive(void[] buffer)
1460 	{
1461 		return conn.receive(buffer);
1462 	}
1463 
1464 public:
1465 	/// Default constructor
1466 	this()
1467 	{
1468 		debug (ASOCKETS) writefln("New UdpConnection @ %s", cast(void*)this);
1469 	}
1470 
1471 	/// Initialize with the given AddressFamily, without binding to an address.
1472 	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
1473 	{
1474 		initializeImpl(family, type, protocol);
1475 		if (connectHandler)
1476 			connectHandler();
1477 	}
1478 
1479 	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
1480 	{
1481 		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
1482 		assert(!conn);
1483 
1484 		conn = new Socket(family, type, protocol);
1485 		conn.blocking = false;
1486 		socketManager.register(this);
1487 		state = ConnectionState.connected;
1488 		updateFlags();
1489 	}
1490 
1491 	/// Bind to a local address in order to receive packets sent there.
1492 	final ushort bind(string host, ushort port)
1493 	{
1494 		assert(host.length, "Empty host");
1495 
1496 		debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
1497 
1498 		state = ConnectionState.resolving;
1499 
1500 		AddressInfo addressInfo;
1501 		try
1502 		{
1503 			auto addresses = getAddress(host, port);
1504 			enforce(addresses.length, "No addresses found");
1505 			debug (ASOCKETS)
1506 			{
1507 				writefln("Resolved to %s addresses:", addresses.length);
1508 				foreach (address; addresses)
1509 					writefln("- %s", address.toString());
1510 			}
1511 
1512 			Address address;
1513 			if (addresses.length > 1)
1514 			{
1515 				import std.random : uniform;
1516 				address = addresses[uniform(0, $)];
1517 			}
1518 			else
1519 				address = addresses[0];
1520 			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
1521 		}
1522 		catch (SocketException e)
1523 		{
1524 			onError("Lookup error: " ~ e.msg);
1525 			return 0;
1526 		}
1527 
1528 		state = ConnectionState.disconnected;
1529 		return bind(addressInfo);
1530 	}
1531 
1532 	/// ditto
1533 	final ushort bind(AddressInfo addressInfo)
1534 	{
1535 		initialize(addressInfo.family, addressInfo.type, addressInfo.protocol);
1536 		conn.bind(addressInfo.address);
1537 
1538 		auto address = conn.localAddress();
1539 		auto port = to!ushort(address.toPortString());
1540 
1541 		if (connectHandler)
1542 			connectHandler();
1543 
1544 		return port;
1545 	}
1546 
1547 public:
1548 	/// Where to send packets to.
1549 	Address remoteAddress;
1550 }
1551 
1552 ///
1553 unittest
1554 {
1555 	auto server = new UdpConnection();
1556 	auto serverPort = server.bind("localhost", 0);
1557 
1558 	auto client = new UdpConnection();
1559 	client.initialize(server.localAddress.addressFamily);
1560 
1561 	string[] packets = ["Hello", "there"];
1562 	client.remoteAddress = server.localAddress;
1563 	client.send({
1564 		Data[] data;
1565 		foreach (packet; packets)
1566 			data ~= Data(packet);
1567 		return data;
1568 	}());
1569 
1570 	server.handleReadData = (Data data)
1571 	{
1572 		assert(data.contents == packets[0]);
1573 		packets = packets[1..$];
1574 		if (!packets.length)
1575 		{
1576 			server.close();
1577 			client.close();
1578 		}
1579 	};
1580 	socketManager.loop();
1581 	assert(!packets.length);
1582 }
1583 
1584 // ***************************************************************************
1585 
1586 /// Base class for a connection adapter.
1587 /// By itself, does nothing.
1588 class ConnectionAdapter : IConnection
1589 {
1590 	IConnection next;
1591 
1592 	this(IConnection next)
1593 	{
1594 		this.next = next;
1595 		next.handleConnect = &onConnect;
1596 		next.handleDisconnect = &onDisconnect;
1597 	}
1598 
1599 	@property ConnectionState state() { return next.state; }
1600 
1601 	/// Queue Data for sending.
1602 	void send(Data[] data, int priority)
1603 	{
1604 		next.send(data, priority);
1605 	}
1606 
1607 	alias send = IConnection.send; /// ditto
1608 
1609 	/// Terminate the connection.
1610 	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
1611 	{
1612 		next.disconnect(reason, type);
1613 	}
1614 
1615 	protected void onConnect()
1616 	{
1617 		if (connectHandler)
1618 			connectHandler();
1619 	}
1620 
1621 	protected void onReadData(Data data)
1622 	{
1623 		// onReadData should be fired only if readDataHandler is set
1624 		readDataHandler(data);
1625 	}
1626 
1627 	protected void onDisconnect(string reason, DisconnectType type)
1628 	{
1629 		if (disconnectHandler)
1630 			disconnectHandler(reason, type);
1631 	}
1632 
1633 	protected void onBufferFlushed()
1634 	{
1635 		if (bufferFlushedHandler)
1636 			bufferFlushedHandler();
1637 	}
1638 
1639 	/// Callback for when a connection has been established.
1640 	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
1641 	private ConnectHandler connectHandler;
1642 
1643 	/// Callback setter for when new data is read.
1644 	@property void handleReadData(ReadDataHandler value)
1645 	{
1646 		readDataHandler = value;
1647 		next.handleReadData = value ? &onReadData : null ;
1648 	}
1649 	private ReadDataHandler readDataHandler;
1650 
1651 	/// Callback setter for when a connection was closed.
1652 	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
1653 	private DisconnectHandler disconnectHandler;
1654 
1655 	/// Callback setter for when all queued data has been written.
1656 	@property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; }
1657 	private BufferFlushedHandler bufferFlushedHandler;
1658 }
1659 
1660 // ***************************************************************************
1661 
1662 /// Adapter for connections with a line-based protocol.
1663 /// Splits data stream into delimiter-separated lines.
1664 class LineBufferedAdapter : ConnectionAdapter
1665 {
1666 	/// The protocol's line delimiter.
1667 	string delimiter = "\r\n";
1668 
1669 	this(IConnection next)
1670 	{
1671 		super(next);
1672 	}
1673 
1674 	/// Append a line to the send buffer.
1675 	void send(string line)
1676 	{
1677 		//super.send(Data(line ~ delimiter));
1678 		// https://issues.dlang.org/show_bug.cgi?id=13985
1679 		ConnectionAdapter ca = this;
1680 		ca.send(Data(line ~ delimiter));
1681 	}
1682 
1683 protected:
1684 	/// The receive buffer.
1685 	Data inBuffer;
1686 
1687 	/// Called when data has been received.
1688 	final override void onReadData(Data data)
1689 	{
1690 		import std.string;
1691 		auto oldBufferLength = inBuffer.length;
1692 		if (oldBufferLength)
1693 			inBuffer ~= data;
1694 		else
1695 			inBuffer = data;
1696 
1697 		if (delimiter.length == 1)
1698 		{
1699 			import core.stdc.string; // memchr
1700 
1701 			char c = delimiter[0];
1702 			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
1703 			while (p)
1704 			{
1705 				sizediff_t index = p - inBuffer.ptr;
1706 				processLine(index);
1707 
1708 				p = memchr(inBuffer.ptr, c, inBuffer.length);
1709 			}
1710 		}
1711 		else
1712 		{
1713 			sizediff_t index;
1714 			// TODO: we can start the search at oldBufferLength-delimiter.length+1
1715 			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
1716 				processLine(index);
1717 		}
1718 	}
1719 
1720 	final void processLine(size_t index)
1721 	{
1722 		auto line = inBuffer[0..index];
1723 		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
1724 		super.onReadData(line);
1725 	}
1726 
1727 	override void onDisconnect(string reason, DisconnectType type)
1728 	{
1729 		super.onDisconnect(reason, type);
1730 		inBuffer.clear();
1731 	}
1732 }
1733 
1734 // ***************************************************************************
1735 
1736 /// Fires an event handler or disconnects connections
1737 /// after a period of inactivity.
1738 class TimeoutAdapter : ConnectionAdapter
1739 {
1740 	this(IConnection next)
1741 	{
1742 		debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this);
1743 		super(next);
1744 	}
1745 
1746 	void cancelIdleTimeout()
1747 	{
1748 		debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
1749 		assert(idleTask !is null);
1750 		assert(idleTask.isWaiting());
1751 		idleTask.cancel();
1752 	}
1753 
1754 	void resumeIdleTimeout()
1755 	{
1756 		debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
1757 		assert(state == ConnectionState.connected);
1758 		assert(idleTask !is null);
1759 		assert(!idleTask.isWaiting());
1760 		mainTimer.add(idleTask);
1761 	}
1762 
1763 	final void setIdleTimeout(Duration duration)
1764 	{
1765 		debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
1766 		assert(duration > Duration.zero);
1767 		if (idleTask is null)
1768 		{
1769 			idleTask = new TimerTask(duration);
1770 			idleTask.handleTask = &onTask_Idle;
1771 		}
1772 		else
1773 		{
1774 			if (idleTask.isWaiting())
1775 				idleTask.cancel();
1776 			idleTask.delay = duration;
1777 		}
1778 		if (state == ConnectionState.connected)
1779 			mainTimer.add(idleTask);
1780 	}
1781 
1782 	void markNonIdle()
1783 	{
1784 		debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
1785 		assert(idleTask !is null);
1786 		if (handleNonIdle)
1787 			handleNonIdle();
1788 		if (idleTask.isWaiting())
1789 			idleTask.restart();
1790 	}
1791 
1792 	/// Callback for when a connection has stopped responding.
1793 	/// If unset, the connection will be disconnected.
1794 	void delegate() handleIdleTimeout;
1795 
1796 	/// Callback for when a connection is marked as non-idle
1797 	/// (when data is received).
1798 	void delegate() handleNonIdle;
1799 
1800 protected:
1801 	override void onConnect()
1802 	{
1803 		debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
1804 		super.onConnect();
1805 		if (idleTask)
1806 			resumeIdleTimeout();
1807 	}
1808 
1809 	override void onReadData(Data data)
1810 	{
1811 		debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
1812 		markNonIdle();
1813 		super.onReadData(data);
1814 	}
1815 
1816 	override void onDisconnect(string reason, DisconnectType type)
1817 	{
1818 		debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
1819 		if (idleTask && idleTask.isWaiting())
1820 			idleTask.cancel();
1821 		super.onDisconnect(reason, type);
1822 	}
1823 
1824 private:
1825 	TimerTask idleTask;
1826 
1827 	final void onTask_Idle(Timer timer, TimerTask task)
1828 	{
1829 		if (state == ConnectionState.disconnecting)
1830 			return disconnect("Delayed disconnect - time-out", DisconnectType.error);
1831 
1832 		if (state != ConnectionState.connected)
1833 			return;
1834 
1835 		if (handleIdleTimeout)
1836 		{
1837 			handleIdleTimeout();
1838 			if (state == ConnectionState.connected)
1839 			{
1840 				assert(!idleTask.isWaiting());
1841 				mainTimer.add(idleTask);
1842 			}
1843 		}
1844 		else
1845 			disconnect("Time-out", DisconnectType.error);
1846 	}
1847 }
1848 
1849 // ***************************************************************************
1850 
1851 unittest
1852 {
1853 	void testTimer()
1854 	{
1855 		bool fired;
1856 		setTimeout({fired = true;}, 10.msecs);
1857 		socketManager.loop();
1858 		assert(fired);
1859 	}
1860 
1861 	testTimer();
1862 }