1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import std.exception; 24 import std.socket; 25 import std.string : format; 26 public import std.socket : Address, AddressInfo, Socket; 27 28 debug(ASOCKETS) import std.stdio; 29 debug(PRINTDATA) static import std.stdio; 30 debug(PRINTDATA) import ae.utils.text : hexDump; 31 private import std.conv : to; 32 33 34 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 35 static import ae.utils.array; 36 37 version(LIBEV) 38 { 39 import deimos.ev; 40 pragma(lib, "ev"); 41 } 42 43 version(Windows) 44 { 45 import core.sys.windows.windows : Sleep; 46 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 47 } 48 else 49 enum USE_SLEEP = false; 50 51 /// Flags that determine socket wake-up events. 52 int eventCounter; 53 54 version(LIBEV) 55 { 56 // Watchers are a GenericSocket field (as declared in SocketMixin). 57 // Use one watcher per read and write event. 58 // Start/stop those as higher-level code declares interest in those events. 59 // Use the "data" ev_io field to store the parent GenericSocket address. 60 // Also use the "data" field as a flag to indicate whether the watcher is active 61 // (data is null when the watcher is stopped). 62 63 struct SocketManager 64 { 65 private: 66 size_t count; 67 68 extern(C) 69 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 70 { 71 eventCounter++; 72 auto socket = cast(GenericSocket)w.data; 73 assert(socket, "libev event fired on stopped watcher"); 74 debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents); 75 76 if (revents & EV_READ) 77 socket.onReadable(); 78 else 79 if (revents & EV_WRITE) 80 socket.onWritable(); 81 else 82 assert(false, "Unknown event fired from libev"); 83 84 // TODO? Need to get proper SocketManager instance to call updateTimer on 85 socketManager.updateTimer(false); 86 } 87 88 ev_timer evTimer; 89 MonoTime lastNextEvent = MonoTime.max; 90 91 extern(C) 92 static void timerCallback(ev_loop_t* l, ev_timer* w, int revents) 93 { 94 eventCounter++; 95 debug (ASOCKETS) writefln("Timer callback called."); 96 mainTimer.prod(); 97 98 socketManager.updateTimer(true); 99 debug (ASOCKETS) writefln("Timer callback exiting."); 100 } 101 102 void updateTimer(bool force) 103 { 104 auto nextEvent = mainTimer.getNextEvent(); 105 if (force || lastNextEvent != nextEvent) 106 { 107 debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent); 108 if (nextEvent == MonoTime.max) // Stopping 109 { 110 if (lastNextEvent != MonoTime.max) 111 ev_timer_stop(ev_default_loop(0), &evTimer); 112 } 113 else 114 { 115 auto remaining = mainTimer.getRemainingTime(); 116 while (remaining <= Duration.zero) 117 { 118 debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining); 119 mainTimer.prod(); 120 remaining = mainTimer.getRemainingTime(); 121 } 122 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 123 debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 124 if (lastNextEvent == MonoTime.max) // Starting 125 { 126 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 127 ev_timer_start(ev_default_loop(0), &evTimer); 128 } 129 else // Adjusting 130 { 131 evTimer.repeat = tstamp; 132 ev_timer_again(ev_default_loop(0), &evTimer); 133 } 134 } 135 lastNextEvent = nextEvent; 136 } 137 } 138 139 /// Register a socket with the manager. 140 void register(GenericSocket socket) 141 { 142 debug (ASOCKETS) writefln("Registering %s", socket); 143 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 144 auto fd = socket.conn.handle; 145 assert(fd, "Must have fd before socket registration"); 146 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 147 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 148 count++; 149 } 150 151 /// Unregister a socket with the manager. 152 void unregister(GenericSocket socket) 153 { 154 debug (ASOCKETS) writefln("Unregistering %s", socket); 155 socket.notifyRead = false; 156 socket.notifyWrite = false; 157 count--; 158 } 159 160 public: 161 size_t size() 162 { 163 return count; 164 } 165 166 /// Loop continuously until no sockets are left. 167 void loop() 168 { 169 auto evLoop = ev_default_loop(0); 170 enforce(evLoop, "libev initialization failure"); 171 172 updateTimer(true); 173 debug (ASOCKETS) writeln("ev_run"); 174 ev_run(ev_default_loop(0), 0); 175 } 176 } 177 178 private mixin template SocketMixin() 179 { 180 private ev_io evRead, evWrite; 181 182 private void setWatcherState(ref ev_io ev, bool newValue, int event) 183 { 184 if (!conn) 185 { 186 // Can happen when setting delegates before connecting. 187 return; 188 } 189 190 if (newValue && !ev.data) 191 { 192 // Start 193 ev.data = cast(void*)this; 194 ev_io_start(ev_default_loop(0), &ev); 195 } 196 else 197 if (!newValue && ev.data) 198 { 199 // Stop 200 assert(ev.data is cast(void*)this); 201 ev.data = null; 202 ev_io_stop(ev_default_loop(0), &ev); 203 } 204 } 205 206 /// Interested in read notifications (onReadable)? 207 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 208 /// Interested in write notifications (onWritable)? 209 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 210 211 debug ~this() 212 { 213 // The LIBEV SocketManager holds no references to registered sockets. 214 // TODO: Add a doubly-linked list? 215 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 216 } 217 } 218 } 219 else // Use select 220 { 221 struct SocketManager 222 { 223 private: 224 enum FD_SETSIZE = 1024; 225 226 /// List of all sockets to poll. 227 GenericSocket[] sockets; 228 229 /// Debug AA to check for dangling socket references. 230 debug GenericSocket[socket_t] socketHandles; 231 232 /// Register a socket with the manager. 233 void register(GenericSocket conn) 234 { 235 debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1); 236 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 237 sockets ~= conn; 238 239 debug 240 { 241 auto handle = conn.socket.handle; 242 assert(handle != socket_t.init, "Can't register a closed socket"); 243 assert(handle !in socketHandles, "This socket handle is already registered"); 244 socketHandles[handle] = conn; 245 } 246 } 247 248 /// Unregister a socket with the manager. 249 void unregister(GenericSocket conn) 250 { 251 debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 252 253 debug 254 { 255 auto handle = conn.socket.handle; 256 assert(handle != socket_t.init, "Can't unregister a closed socket"); 257 auto pconn = handle in socketHandles; 258 assert(pconn, "This socket handle is not registered"); 259 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 260 socketHandles.remove(handle); 261 } 262 263 foreach (size_t i, GenericSocket j; sockets) 264 if (j is conn) 265 { 266 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 267 return; 268 } 269 assert(false, "Socket not registered"); 270 } 271 272 void delegate()[] idleHandlers; 273 274 public: 275 size_t size() 276 { 277 return sockets.length; 278 } 279 280 /// Loop continuously until no sockets are left. 281 void loop() 282 { 283 debug (ASOCKETS) writeln("Starting event loop."); 284 285 SocketSet readset, writeset, errorset; 286 size_t sockcount; 287 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 288 readset = new SocketSet(setSize); 289 writeset = new SocketSet(setSize); 290 errorset = new SocketSet(setSize); 291 while (true) 292 { 293 uint minSize = 0; 294 version(Windows) 295 minSize = cast(uint)sockets.length; 296 else 297 { 298 foreach (s; sockets) 299 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 300 minSize = s.socket.handle; 301 } 302 minSize++; 303 304 if (setSize < minSize) 305 { 306 debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 307 setSize = minSize * 2; 308 readset = new SocketSet(setSize); 309 writeset = new SocketSet(setSize); 310 errorset = new SocketSet(setSize); 311 } 312 else 313 { 314 readset.reset(); 315 writeset.reset(); 316 errorset.reset(); 317 } 318 319 sockcount = 0; 320 bool haveActive; 321 debug (ASOCKETS) writeln("Populating sets"); 322 foreach (GenericSocket conn; sockets) 323 { 324 if (!conn.socket) 325 continue; 326 sockcount++; 327 if (!conn.daemon) 328 haveActive = true; 329 330 debug (ASOCKETS) writef("\t%s:", conn); 331 if (conn.notifyRead) 332 { 333 readset.add(conn.socket); 334 debug (ASOCKETS) write(" READ"); 335 } 336 if (conn.notifyWrite) 337 { 338 writeset.add(conn.socket); 339 debug (ASOCKETS) write(" WRITE"); 340 } 341 errorset.add(conn.socket); 342 debug (ASOCKETS) writeln(); 343 } 344 debug (ASOCKETS) 345 { 346 writefln("Sets populated as follows:"); 347 printSets(readset, writeset, errorset); 348 } 349 350 debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...", 351 sockcount, 352 mainTimer.isWaiting() ? "with" : "no", 353 idleHandlers.length, 354 ); 355 if (!haveActive && !mainTimer.isWaiting()) 356 { 357 debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop."); 358 break; 359 } 360 361 debug (ASOCKETS) { stdout.flush(); stderr.flush(); } 362 363 int events; 364 if (idleHandlers.length) 365 { 366 if (sockcount==0) 367 events = 0; 368 else 369 events = Socket.select(readset, writeset, errorset, 0.seconds); 370 } 371 else 372 if (USE_SLEEP && sockcount==0) 373 { 374 version(Windows) 375 { 376 auto duration = mainTimer.getRemainingTime().total!"msecs"(); 377 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 378 if (duration <= 0) 379 duration = 1; // Avoid busywait 380 else 381 if (duration > int.max) 382 duration = int.max; 383 Sleep(cast(int)duration); 384 events = 0; 385 } 386 else 387 assert(0); 388 } 389 else 390 if (mainTimer.isWaiting()) 391 events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime()); 392 else 393 events = Socket.select(readset, writeset, errorset); 394 395 debug (ASOCKETS) writefln("%d events fired.", events); 396 397 if (events > 0) 398 { 399 // Handle just one event at a time, as the first 400 // handler might invalidate select()'s results. 401 handleEvent(readset, writeset, errorset); 402 } 403 else 404 if (idleHandlers.length) 405 { 406 import ae.utils.array; 407 auto handler = idleHandlers.shift(); 408 409 // Rotate the idle handler queue before running it, 410 // in case the handler unregisters itself. 411 idleHandlers ~= handler; 412 413 handler(); 414 } 415 416 // Timers may invalidate our select results, so fire them after processing the latter 417 mainTimer.prod(); 418 419 eventCounter++; 420 } 421 } 422 423 debug (ASOCKETS) 424 void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset) 425 { 426 foreach (GenericSocket conn; sockets) 427 { 428 if (!conn.socket) 429 writefln("\t\t%s is unset", conn); 430 else 431 if (readset.isSet(conn.socket)) 432 writefln("\t\t%s is readable", conn); 433 else 434 if (writeset.isSet(conn.socket)) 435 writefln("\t\t%s is writable", conn); 436 else 437 if (errorset.isSet(conn.socket)) 438 writefln("\t\t%s is errored", conn); 439 } 440 } 441 442 void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset) 443 { 444 debug (ASOCKETS) 445 { 446 writefln("\tSelect results:"); 447 printSets(readset, writeset, errorset); 448 } 449 450 foreach (GenericSocket conn; sockets) 451 { 452 if (!conn.socket) 453 continue; 454 455 if (readset.isSet(conn.socket)) 456 { 457 debug (ASOCKETS) writefln("\t%s - calling onReadable", conn); 458 return conn.onReadable(); 459 } 460 else 461 if (writeset.isSet(conn.socket)) 462 { 463 debug (ASOCKETS) writefln("\t%s - calling onWritable", conn); 464 return conn.onWritable(); 465 } 466 else 467 if (errorset.isSet(conn.socket)) 468 { 469 debug (ASOCKETS) writefln("\t%s - calling onError", conn); 470 return conn.onError("select() error: " ~ conn.socket.getErrorText()); 471 } 472 } 473 474 assert(false, "select() reported events available, but no registered sockets are set"); 475 } 476 } 477 478 // Use UFCS to allow removeIdleHandler to have a predicate with context 479 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 480 { 481 foreach (i, idleHandler; socketManager.idleHandlers) 482 assert(handler !is idleHandler); 483 484 socketManager.idleHandlers ~= handler; 485 } 486 487 static bool isFun(T)(T a, T b) { return a is b; } 488 void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args) 489 { 490 foreach (i, idleHandler; socketManager.idleHandlers) 491 if (pred(idleHandler, args)) 492 { 493 import std.algorithm; 494 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 495 return; 496 } 497 assert(false, "No such idle handler"); 498 } 499 500 private mixin template SocketMixin() 501 { 502 /// Interested in read notifications (onReadable)? 503 bool notifyRead; 504 /// Interested in write notifications (onWritable)? 505 bool notifyWrite; 506 } 507 } 508 509 /// The default socket manager. 510 SocketManager socketManager; 511 512 // *************************************************************************** 513 514 /// General methods for an asynchronous socket. 515 private abstract class GenericSocket 516 { 517 /// Declares notifyRead and notifyWrite. 518 mixin SocketMixin; 519 520 protected: 521 /// The socket this class wraps. 522 Socket conn; 523 524 protected: 525 /// Retrieve the socket class this class wraps. 526 @property final Socket socket() 527 { 528 return conn; 529 } 530 531 void onReadable() 532 { 533 } 534 535 void onWritable() 536 { 537 } 538 539 void onError(string reason) 540 { 541 } 542 543 public: 544 /// allow getting the address of connections that are already disconnected 545 private Address cachedLocalAddress, cachedRemoteAddress; 546 547 /// Don't block the process from exiting. 548 /// TODO: Not implemented with libev 549 bool daemon; 550 551 final @property Address localAddress() 552 { 553 if (cachedLocalAddress !is null) 554 return cachedLocalAddress; 555 else 556 if (conn is null) 557 return null; 558 else 559 return cachedLocalAddress = conn.localAddress(); 560 } 561 562 final @property string localAddressStr() nothrow 563 { 564 try 565 { 566 auto a = localAddress; 567 return a is null ? "[null address]" : a.toString(); 568 } 569 catch (Exception e) 570 return "[error: " ~ e.msg ~ "]"; 571 } 572 573 final @property Address remoteAddress() 574 { 575 if (cachedRemoteAddress !is null) 576 return cachedRemoteAddress; 577 else 578 if (conn is null) 579 return null; 580 else 581 return cachedRemoteAddress = conn.remoteAddress(); 582 } 583 584 final @property string remoteAddressStr() nothrow 585 { 586 try 587 { 588 auto a = remoteAddress; 589 return a is null ? "[null address]" : a.toString(); 590 } 591 catch (Exception e) 592 return "[error: " ~ e.msg ~ "]"; 593 } 594 595 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 596 { 597 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 598 if (enabled) 599 { 600 try 601 conn.setKeepAlive(time, interval); 602 catch (SocketException) 603 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 604 } 605 else 606 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 607 } 608 609 override string toString() const 610 { 611 import std.string; 612 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1); 613 } 614 } 615 616 // *************************************************************************** 617 618 enum DisconnectType 619 { 620 requested, // initiated by the application 621 graceful, // peer gracefully closed the connection 622 error // abnormal network condition 623 } 624 625 enum ConnectionState 626 { 627 /// The initial state, or the state after a disconnect was fully processed. 628 disconnected, 629 630 /// Name resolution. Currently done synchronously. 631 resolving, 632 633 /// A connection attempt is in progress. 634 connecting, 635 636 /// A connection is established. 637 connected, 638 639 /// Disconnecting in progress. No data can be sent or received at this point. 640 /// We are waiting for queued data to be actually sent before disconnecting. 641 disconnecting, 642 } 643 644 /// Common interface for connections and adapters. 645 interface IConnection 646 { 647 enum MAX_PRIORITY = 4; 648 enum DEFAULT_PRIORITY = 2; 649 650 static const defaultDisconnectReason = "Software closed the connection"; 651 652 /// Get connection state. 653 @property ConnectionState state(); 654 655 /// Has a connection been established? 656 deprecated final @property bool connected() { return state == ConnectionState.connected; } 657 658 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 659 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 660 661 /// Queue Data for sending. 662 void send(Data[] data, int priority = DEFAULT_PRIORITY); 663 664 /// ditto 665 final void send(Data datum, int priority = DEFAULT_PRIORITY) 666 { 667 Data[1] data; 668 data[0] = datum; 669 this.send(data); 670 data[] = Data.init; 671 } 672 673 /// Terminate the connection. 674 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 675 676 /// Callback setter for when a connection has been established 677 /// (if applicable). 678 alias void delegate() ConnectHandler; 679 @property void handleConnect(ConnectHandler value); /// ditto 680 681 /// Callback setter for when new data is read. 682 alias void delegate(Data data) ReadDataHandler; 683 @property void handleReadData(ReadDataHandler value); /// ditto 684 685 /// Callback setter for when a connection was closed. 686 alias void delegate(string reason, DisconnectType type) DisconnectHandler; 687 @property void handleDisconnect(DisconnectHandler value); /// ditto 688 689 /// Callback setter for when all queued data has been sent. 690 alias void delegate() BufferFlushedHandler; 691 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 692 } 693 694 // *************************************************************************** 695 696 class Connection : GenericSocket, IConnection 697 { 698 private: 699 /// Blocks of data larger than this value are passed as unmanaged memory 700 /// (in Data objects). Blocks smaller than this value will be reallocated 701 /// on the managed heap. The disadvantage of placing large objects on the 702 /// managed heap is false pointers; the disadvantage of using Data for 703 /// small objects is wasted slack space due to the page size alignment 704 /// requirement. 705 enum UNMANAGED_THRESHOLD = 256; 706 707 ConnectionState _state; 708 final @property ConnectionState state(ConnectionState value) { return _state = value; } 709 710 public: 711 /// Get connection state. 712 override @property ConnectionState state() { return _state; } 713 714 protected: 715 abstract sizediff_t doSend(in void[] buffer); 716 abstract sizediff_t doReceive(void[] buffer); 717 718 /// The send buffers. 719 Data[][MAX_PRIORITY+1] outQueue; 720 /// Whether the first item from each queue has been partially sent (and thus can't be canceled). 721 bool[MAX_PRIORITY+1] partiallySent; 722 723 /// Constructor used by a ServerSocket for new connections 724 this(Socket conn) 725 { 726 this(); 727 this.conn = conn; 728 state = conn is null ? ConnectionState.disconnected : ConnectionState.connected; 729 if (conn) 730 socketManager.register(this); 731 updateFlags(); 732 } 733 734 final void updateFlags() 735 { 736 if (state == ConnectionState.connecting) 737 notifyWrite = true; 738 else 739 notifyWrite = writePending; 740 741 notifyRead = state == ConnectionState.connected && readDataHandler; 742 } 743 744 /// Called when a socket is readable. 745 override void onReadable() 746 { 747 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 748 static ubyte[0x10000] inBuffer = void; 749 auto received = doReceive(inBuffer); 750 751 if (received == 0) 752 return disconnect("Connection closed", DisconnectType.graceful); 753 754 if (received == Socket.ERROR) 755 { 756 // if (wouldHaveBlocked) 757 // { 758 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 759 // return; 760 // } 761 // else 762 onError("recv() error: " ~ lastSocketError); 763 } 764 else 765 { 766 debug (PRINTDATA) 767 { 768 std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress); 769 std.stdio.write(hexDump(inBuffer[0 .. received])); 770 std.stdio.stdout.flush(); 771 } 772 773 if (state == ConnectionState.disconnecting) 774 { 775 debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this); 776 } 777 else 778 if (!readDataHandler) 779 { 780 debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this); 781 } 782 else 783 { 784 // Currently, unlike the D1 version of this module, 785 // we will always reallocate read network data. 786 // This disfavours code which doesn't need to store 787 // read data after processing it, but otherwise 788 // makes things simpler and safer all around. 789 790 if (received < UNMANAGED_THRESHOLD) 791 { 792 // Copy to the managed heap 793 readDataHandler(Data(inBuffer[0 .. received].dup)); 794 } 795 else 796 { 797 // Copy to unmanaged memory 798 readDataHandler(Data(inBuffer[0 .. received], true)); 799 } 800 } 801 } 802 } 803 804 /// Called when an error occurs on the socket. 805 override void onError(string reason) 806 { 807 if (state == ConnectionState.disconnecting) 808 { 809 debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 810 return close(); 811 } 812 813 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 814 disconnect("Socket error: " ~ reason, DisconnectType.error); 815 } 816 817 this() 818 { 819 } 820 821 public: 822 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 823 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 824 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 825 { 826 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 827 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state)); 828 829 if (writePending) 830 { 831 if (type==DisconnectType.requested) 832 { 833 assert(conn, "Attempting to disconnect on an uninitialized socket"); 834 // queue disconnect after all data is sent 835 debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 836 state = ConnectionState.disconnecting; 837 //setIdleTimeout(30.seconds); 838 if (disconnectHandler) 839 disconnectHandler(reason, type); 840 updateFlags(); 841 return; 842 } 843 else 844 discardQueues(); 845 } 846 847 debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 848 849 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 850 close(); 851 else 852 { 853 assert(conn is null, "Registered but %s socket".format(state)); 854 if (state == ConnectionState.resolving) 855 state = ConnectionState.disconnected; 856 } 857 858 if (disconnectHandler) 859 disconnectHandler(reason, type); 860 updateFlags(); 861 } 862 863 private final void close() 864 { 865 assert(conn, "Attempting to close an unregistered socket"); 866 socketManager.unregister(this); 867 conn.close(); 868 conn = null; 869 outQueue[] = null; 870 state = ConnectionState.disconnected; 871 } 872 873 /// Append data to the send buffer. 874 void send(Data[] data, int priority = DEFAULT_PRIORITY) 875 { 876 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 877 outQueue[priority] ~= data; 878 notifyWrite = true; // Fast updateFlags() 879 880 debug (PRINTDATA) 881 { 882 std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress); 883 foreach (datum; data) 884 if (datum.length) 885 std.stdio.write(hexDump(datum.contents)); 886 else 887 std.stdio.writeln("(empty Data)"); 888 std.stdio.stdout.flush(); 889 } 890 } 891 892 /// ditto 893 alias send = IConnection.send; 894 895 final void clearQueue(int priority) 896 { 897 if (partiallySent[priority]) 898 { 899 assert(outQueue[priority].length > 0); 900 outQueue[priority] = outQueue[priority][0..1]; 901 } 902 else 903 outQueue[priority] = null; 904 updateFlags(); 905 } 906 907 /// Clears all queues, even partially sent content. 908 private final void discardQueues() 909 { 910 foreach (priority; 0..MAX_PRIORITY+1) 911 { 912 outQueue[priority] = null; 913 partiallySent[priority] = false; 914 } 915 updateFlags(); 916 } 917 918 @property 919 final bool writePending() 920 { 921 foreach (queue; outQueue) 922 if (queue.length) 923 return true; 924 return false; 925 } 926 927 final bool queuePresent(int priority = DEFAULT_PRIORITY) 928 { 929 if (partiallySent[priority]) 930 { 931 assert(outQueue[priority].length > 0); 932 return outQueue[priority].length > 1; 933 } 934 else 935 return outQueue[priority].length > 0; 936 } 937 938 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 939 { 940 return outQueue[priority].length; 941 } 942 943 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 944 { 945 size_t bytes; 946 foreach (datum; outQueue[priority]) 947 bytes += datum.length; 948 return bytes; 949 } 950 951 public: 952 private ConnectHandler connectHandler; 953 /// Callback for when a connection has been established. 954 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 955 956 private ReadDataHandler readDataHandler; 957 /// Callback for incoming data. 958 /// Data will not be received unless this handler is set. 959 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 960 961 private DisconnectHandler disconnectHandler; 962 /// Callback for when a connection was closed. 963 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 964 965 /// Callback setter for when all queued data has been sent. 966 private BufferFlushedHandler bufferFlushedHandler; 967 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 968 } 969 970 class StreamConnection : Connection 971 { 972 protected: 973 this() 974 { 975 super(); 976 } 977 978 /// Called when a socket is writable. 979 override void onWritable() 980 { 981 //scope(success) updateFlags(); 982 onWritableImpl(); 983 updateFlags(); 984 } 985 986 // Work around scope(success) breaking debugger stack traces 987 final private void onWritableImpl() 988 { 989 if (state == ConnectionState.connecting) 990 { 991 state = ConnectionState.connected; 992 993 //debug writefln("[%s] Connected", remoteAddress); 994 try 995 setKeepAlive(); 996 catch (Exception e) 997 return disconnect(e.msg, DisconnectType.error); 998 if (connectHandler) 999 connectHandler(); 1000 return; 1001 } 1002 //debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length); 1003 1004 foreach (priority, ref queue; outQueue) 1005 while (queue.length) 1006 { 1007 auto pdata = queue.ptr; // pointer to first data 1008 1009 ptrdiff_t sent = 0; 1010 if (pdata.length) 1011 { 1012 sent = doSend(pdata.contents); 1013 debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length); 1014 } 1015 else 1016 { 1017 debug (ASOCKETS) writefln("\t\t%s: empty Data object", this); 1018 } 1019 1020 if (sent == Socket.ERROR) 1021 { 1022 if (wouldHaveBlocked()) 1023 return; 1024 else 1025 return onError("send() error: " ~ lastSocketError); 1026 } 1027 else 1028 if (sent < pdata.length) 1029 { 1030 if (sent > 0) 1031 { 1032 *pdata = (*pdata)[sent..pdata.length]; 1033 partiallySent[priority] = true; 1034 } 1035 return; 1036 } 1037 else 1038 { 1039 assert(sent == pdata.length); 1040 //debug writefln("[%s] Sent data:", remoteAddress); 1041 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1042 pdata.clear(); 1043 queue = queue[1..$]; 1044 partiallySent[priority] = false; 1045 if (queue.length == 0) 1046 queue = null; 1047 } 1048 } 1049 1050 // outQueue is now empty 1051 if (bufferFlushedHandler) 1052 bufferFlushedHandler(); 1053 if (state == ConnectionState.disconnecting) 1054 { 1055 debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1056 close(); 1057 } 1058 } 1059 1060 public: 1061 this(Socket conn) 1062 { 1063 super(conn); 1064 } 1065 } 1066 1067 // *************************************************************************** 1068 1069 /// A POSIX file stream. 1070 /// Allows adding a file (e.g. stdin/stdout) to the socket manager. 1071 /// Does not dup the given file descriptor, so "disconnecting" this connection 1072 /// will close it. 1073 version (Posix) 1074 class FileConnection : StreamConnection 1075 { 1076 this(int fileno) 1077 { 1078 auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC); 1079 conn.blocking = false; 1080 super(conn); 1081 } 1082 1083 protected: 1084 import core.sys.posix.unistd : read, write; 1085 1086 override sizediff_t doSend(in void[] buffer) 1087 { 1088 return write(socket.handle, buffer.ptr, buffer.length); 1089 } 1090 1091 override sizediff_t doReceive(void[] buffer) 1092 { 1093 return read(socket.handle, buffer.ptr, buffer.length); 1094 } 1095 } 1096 1097 // *************************************************************************** 1098 1099 /// An asynchronous TCP connection. 1100 class TcpConnection : StreamConnection 1101 { 1102 protected: 1103 /// Queue of addresses to try connecting to. 1104 AddressInfo[] addressQueue; 1105 1106 this(Socket conn) 1107 { 1108 super(conn); 1109 } 1110 1111 override sizediff_t doSend(in void[] buffer) 1112 { 1113 return conn.send(buffer); 1114 } 1115 1116 override sizediff_t doReceive(void[] buffer) 1117 { 1118 return conn.receive(buffer); 1119 } 1120 1121 final void tryNextAddress() 1122 { 1123 assert(state == ConnectionState.connecting); 1124 auto addressInfo = addressQueue[0]; 1125 addressQueue = addressQueue[1..$]; 1126 1127 try 1128 { 1129 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1130 conn.blocking = false; 1131 1132 socketManager.register(this); 1133 updateFlags(); 1134 debug (ASOCKETS) writefln("Attempting connection to %s", addressInfo.address.toString()); 1135 conn.connect(addressInfo.address); 1136 } 1137 catch (SocketException e) 1138 return onError("Connect error: " ~ e.msg); 1139 } 1140 1141 /// Called when an error occurs on the socket. 1142 override void onError(string reason) 1143 { 1144 if (state == ConnectionState.connecting && addressQueue.length) 1145 { 1146 socketManager.unregister(this); 1147 conn.close(); 1148 conn = null; 1149 1150 return tryNextAddress(); 1151 } 1152 1153 super.onError(reason); 1154 } 1155 1156 public: 1157 /// Default constructor 1158 this() 1159 { 1160 debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this); 1161 } 1162 1163 /// Start establishing a connection. 1164 final void connect(string host, ushort port) 1165 { 1166 assert(host.length, "Empty host"); 1167 assert(port, "No port specified"); 1168 1169 debug (ASOCKETS) writefln("Connecting to %s:%s", host, port); 1170 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1171 1172 state = ConnectionState.resolving; 1173 1174 AddressInfo[] addressInfos; 1175 try 1176 { 1177 auto addresses = getAddress(host, port); 1178 enforce(addresses.length, "No addresses found"); 1179 debug (ASOCKETS) 1180 { 1181 writefln("Resolved to %s addresses:", addresses.length); 1182 foreach (address; addresses) 1183 writefln("- %s", address.toString()); 1184 } 1185 1186 if (addresses.length > 1) 1187 { 1188 import std.random : randomShuffle; 1189 randomShuffle(addresses); 1190 } 1191 1192 foreach (address; addresses) 1193 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1194 } 1195 catch (SocketException e) 1196 return onError("Lookup error: " ~ e.msg); 1197 1198 state = ConnectionState.disconnected; 1199 connect(addressInfos); 1200 } 1201 1202 /// ditto 1203 final void connect(AddressInfo[] addresses) 1204 { 1205 assert(addresses.length, "No addresses specified"); 1206 1207 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1208 assert(!conn); 1209 1210 addressQueue = addresses; 1211 state = ConnectionState.connecting; 1212 tryNextAddress(); 1213 } 1214 } 1215 1216 // *************************************************************************** 1217 1218 /// An asynchronous TCP connection server. 1219 final class TcpServer 1220 { 1221 private: 1222 /// Class that actually performs listening on a certain address family 1223 final class Listener : GenericSocket 1224 { 1225 this(Socket conn) 1226 { 1227 debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this); 1228 this.conn = conn; 1229 socketManager.register(this); 1230 } 1231 1232 /// Called when a socket is readable. 1233 override void onReadable() 1234 { 1235 debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this); 1236 Socket acceptSocket = conn.accept(); 1237 acceptSocket.blocking = false; 1238 if (handleAccept) 1239 { 1240 TcpConnection connection = new TcpConnection(acceptSocket); 1241 debug (ASOCKETS) writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress); 1242 connection.setKeepAlive(); 1243 //assert(connection.connected); 1244 //connection.connected = true; 1245 acceptHandler(connection); 1246 } 1247 else 1248 acceptSocket.close(); 1249 } 1250 1251 /// Called when a socket is writable. 1252 override void onWritable() 1253 { 1254 } 1255 1256 /// Called when an error occurs on the socket. 1257 override void onError(string reason) 1258 { 1259 close(); // call parent 1260 } 1261 1262 void closeListener() 1263 { 1264 assert(conn); 1265 socketManager.unregister(this); 1266 conn.close(); 1267 conn = null; 1268 } 1269 } 1270 1271 /// Whether the socket is listening. 1272 bool listening; 1273 /// Listener instances 1274 Listener[] listeners; 1275 1276 final void updateFlags() 1277 { 1278 foreach (listener; listeners) 1279 listener.notifyRead = handleAccept !is null; 1280 } 1281 1282 public: 1283 /// Start listening on this socket. 1284 ushort listen(ushort port, string addr = null) 1285 { 1286 debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port); 1287 //assert(!listening, "Attempting to listen on a listening socket"); 1288 1289 auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); 1290 1291 debug (ASOCKETS) 1292 { 1293 writefln("Resolved to %s addresses:", addressInfos.length); 1294 foreach (ref addressInfo; addressInfos) 1295 writefln("- %s", addressInfo); 1296 } 1297 1298 // listen on random ports only on IPv4 for now 1299 if (port == 0) 1300 { 1301 foreach_reverse (i, ref addressInfo; addressInfos) 1302 if (addressInfo.family != AddressFamily.INET) 1303 addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$]; 1304 } 1305 1306 listen(addressInfos); 1307 1308 foreach (listener; listeners) 1309 { 1310 auto address = listener.conn.localAddress(); 1311 if (address.addressFamily == AddressFamily.INET) 1312 port = to!ushort(address.toPortString()); 1313 } 1314 1315 return port; 1316 } 1317 1318 /// ditto 1319 void listen(AddressInfo[] addressInfos) 1320 { 1321 foreach (ref addressInfo; addressInfos) 1322 { 1323 try 1324 { 1325 Socket conn = new Socket(addressInfo); 1326 conn.blocking = false; 1327 if (addressInfo.family == AddressFamily.INET6) 1328 conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); 1329 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 1330 1331 conn.bind(addressInfo.address); 1332 conn.listen(8); 1333 1334 listeners ~= new Listener(conn); 1335 } 1336 catch (SocketException e) 1337 { 1338 debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString()); 1339 debug(ASOCKETS) writeln(e.msg); 1340 } 1341 } 1342 1343 if (listeners.length==0) 1344 throw new Exception("Unable to bind service"); 1345 1346 listening = true; 1347 1348 updateFlags(); 1349 } 1350 1351 @property Address[] localAddresses() 1352 { 1353 Address[] result; 1354 foreach (listener; listeners) 1355 result ~= listener.localAddress; 1356 return result; 1357 } 1358 1359 @property bool isListening() 1360 { 1361 return listening; 1362 } 1363 1364 /// Stop listening on this socket. 1365 void close() 1366 { 1367 foreach (listener;listeners) 1368 listener.closeListener(); 1369 listeners = null; 1370 listening = false; 1371 if (handleClose) 1372 handleClose(); 1373 } 1374 1375 public: 1376 /// Callback for when the socket was closed. 1377 void delegate() handleClose; 1378 1379 private void delegate(TcpConnection incoming) acceptHandler; 1380 /// Callback for an incoming connection. 1381 /// Connections will not be accepted unless this handler is set. 1382 @property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; } 1383 /// ditto 1384 @property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); } 1385 } 1386 1387 // *************************************************************************** 1388 1389 /// An asynchronous UDP stream. 1390 /// UDP does not have connections, so this class encapsulates a socket 1391 /// with a fixed destination (sendto) address, and optionally bound to 1392 /// a local address. 1393 /// Currently received packets' address is not exposed. 1394 class UdpConnection : Connection 1395 { 1396 protected: 1397 this(Socket conn) 1398 { 1399 super(conn); 1400 } 1401 1402 /// Called when a socket is writable. 1403 override void onWritable() 1404 { 1405 //scope(success) updateFlags(); 1406 onWritableImpl(); 1407 updateFlags(); 1408 } 1409 1410 // Work around scope(success) breaking debugger stack traces 1411 final private void onWritableImpl() 1412 { 1413 foreach (priority, ref queue; outQueue) 1414 while (queue.length) 1415 { 1416 auto pdata = queue.ptr; // pointer to first data 1417 1418 auto sent = conn.sendTo(pdata.contents, remoteAddress); 1419 1420 if (sent == Socket.ERROR) 1421 { 1422 if (wouldHaveBlocked()) 1423 return; 1424 else 1425 return onError("send() error: " ~ lastSocketError); 1426 } 1427 else 1428 if (sent < pdata.length) 1429 { 1430 return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length)); 1431 } 1432 else 1433 { 1434 assert(sent == pdata.length); 1435 //debug writefln("[%s] Sent data:", remoteAddress); 1436 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1437 pdata.clear(); 1438 queue = queue[1..$]; 1439 if (queue.length == 0) 1440 queue = null; 1441 } 1442 } 1443 1444 // outQueue is now empty 1445 if (bufferFlushedHandler) 1446 bufferFlushedHandler(); 1447 if (state == ConnectionState.disconnecting) 1448 { 1449 debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1450 close(); 1451 } 1452 } 1453 1454 override sizediff_t doSend(in void[] buffer) 1455 { 1456 assert(false); // never called (called only from overridden methods) 1457 } 1458 1459 override sizediff_t doReceive(void[] buffer) 1460 { 1461 return conn.receive(buffer); 1462 } 1463 1464 public: 1465 /// Default constructor 1466 this() 1467 { 1468 debug (ASOCKETS) writefln("New UdpConnection @ %s", cast(void*)this); 1469 } 1470 1471 /// Initialize with the given AddressFamily, without binding to an address. 1472 final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP) 1473 { 1474 initializeImpl(family, type, protocol); 1475 if (connectHandler) 1476 connectHandler(); 1477 } 1478 1479 final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol) 1480 { 1481 assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state)); 1482 assert(!conn); 1483 1484 conn = new Socket(family, type, protocol); 1485 conn.blocking = false; 1486 socketManager.register(this); 1487 state = ConnectionState.connected; 1488 updateFlags(); 1489 } 1490 1491 /// Bind to a local address in order to receive packets sent there. 1492 final ushort bind(string host, ushort port) 1493 { 1494 assert(host.length, "Empty host"); 1495 1496 debug (ASOCKETS) writefln("Connecting to %s:%s", host, port); 1497 1498 state = ConnectionState.resolving; 1499 1500 AddressInfo addressInfo; 1501 try 1502 { 1503 auto addresses = getAddress(host, port); 1504 enforce(addresses.length, "No addresses found"); 1505 debug (ASOCKETS) 1506 { 1507 writefln("Resolved to %s addresses:", addresses.length); 1508 foreach (address; addresses) 1509 writefln("- %s", address.toString()); 1510 } 1511 1512 Address address; 1513 if (addresses.length > 1) 1514 { 1515 import std.random : uniform; 1516 address = addresses[uniform(0, $)]; 1517 } 1518 else 1519 address = addresses[0]; 1520 addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host); 1521 } 1522 catch (SocketException e) 1523 { 1524 onError("Lookup error: " ~ e.msg); 1525 return 0; 1526 } 1527 1528 state = ConnectionState.disconnected; 1529 return bind(addressInfo); 1530 } 1531 1532 /// ditto 1533 final ushort bind(AddressInfo addressInfo) 1534 { 1535 initialize(addressInfo.family, addressInfo.type, addressInfo.protocol); 1536 conn.bind(addressInfo.address); 1537 1538 auto address = conn.localAddress(); 1539 auto port = to!ushort(address.toPortString()); 1540 1541 if (connectHandler) 1542 connectHandler(); 1543 1544 return port; 1545 } 1546 1547 public: 1548 /// Where to send packets to. 1549 Address remoteAddress; 1550 } 1551 1552 /// 1553 unittest 1554 { 1555 auto server = new UdpConnection(); 1556 auto serverPort = server.bind("localhost", 0); 1557 1558 auto client = new UdpConnection(); 1559 client.initialize(server.localAddress.addressFamily); 1560 1561 string[] packets = ["Hello", "there"]; 1562 client.remoteAddress = server.localAddress; 1563 client.send({ 1564 Data[] data; 1565 foreach (packet; packets) 1566 data ~= Data(packet); 1567 return data; 1568 }()); 1569 1570 server.handleReadData = (Data data) 1571 { 1572 assert(data.contents == packets[0]); 1573 packets = packets[1..$]; 1574 if (!packets.length) 1575 { 1576 server.close(); 1577 client.close(); 1578 } 1579 }; 1580 socketManager.loop(); 1581 assert(!packets.length); 1582 } 1583 1584 // *************************************************************************** 1585 1586 /// Base class for a connection adapter. 1587 /// By itself, does nothing. 1588 class ConnectionAdapter : IConnection 1589 { 1590 IConnection next; 1591 1592 this(IConnection next) 1593 { 1594 this.next = next; 1595 next.handleConnect = &onConnect; 1596 next.handleDisconnect = &onDisconnect; 1597 } 1598 1599 @property ConnectionState state() { return next.state; } 1600 1601 /// Queue Data for sending. 1602 void send(Data[] data, int priority) 1603 { 1604 next.send(data, priority); 1605 } 1606 1607 alias send = IConnection.send; /// ditto 1608 1609 /// Terminate the connection. 1610 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1611 { 1612 next.disconnect(reason, type); 1613 } 1614 1615 protected void onConnect() 1616 { 1617 if (connectHandler) 1618 connectHandler(); 1619 } 1620 1621 protected void onReadData(Data data) 1622 { 1623 // onReadData should be fired only if readDataHandler is set 1624 readDataHandler(data); 1625 } 1626 1627 protected void onDisconnect(string reason, DisconnectType type) 1628 { 1629 if (disconnectHandler) 1630 disconnectHandler(reason, type); 1631 } 1632 1633 protected void onBufferFlushed() 1634 { 1635 if (bufferFlushedHandler) 1636 bufferFlushedHandler(); 1637 } 1638 1639 /// Callback for when a connection has been established. 1640 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1641 private ConnectHandler connectHandler; 1642 1643 /// Callback setter for when new data is read. 1644 @property void handleReadData(ReadDataHandler value) 1645 { 1646 readDataHandler = value; 1647 next.handleReadData = value ? &onReadData : null ; 1648 } 1649 private ReadDataHandler readDataHandler; 1650 1651 /// Callback setter for when a connection was closed. 1652 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1653 private DisconnectHandler disconnectHandler; 1654 1655 /// Callback setter for when all queued data has been written. 1656 @property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; } 1657 private BufferFlushedHandler bufferFlushedHandler; 1658 } 1659 1660 // *************************************************************************** 1661 1662 /// Adapter for connections with a line-based protocol. 1663 /// Splits data stream into delimiter-separated lines. 1664 class LineBufferedAdapter : ConnectionAdapter 1665 { 1666 /// The protocol's line delimiter. 1667 string delimiter = "\r\n"; 1668 1669 this(IConnection next) 1670 { 1671 super(next); 1672 } 1673 1674 /// Append a line to the send buffer. 1675 void send(string line) 1676 { 1677 //super.send(Data(line ~ delimiter)); 1678 // https://issues.dlang.org/show_bug.cgi?id=13985 1679 ConnectionAdapter ca = this; 1680 ca.send(Data(line ~ delimiter)); 1681 } 1682 1683 protected: 1684 /// The receive buffer. 1685 Data inBuffer; 1686 1687 /// Called when data has been received. 1688 final override void onReadData(Data data) 1689 { 1690 import std.string; 1691 auto oldBufferLength = inBuffer.length; 1692 if (oldBufferLength) 1693 inBuffer ~= data; 1694 else 1695 inBuffer = data; 1696 1697 if (delimiter.length == 1) 1698 { 1699 import core.stdc.string; // memchr 1700 1701 char c = delimiter[0]; 1702 auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length); 1703 while (p) 1704 { 1705 sizediff_t index = p - inBuffer.ptr; 1706 processLine(index); 1707 1708 p = memchr(inBuffer.ptr, c, inBuffer.length); 1709 } 1710 } 1711 else 1712 { 1713 sizediff_t index; 1714 // TODO: we can start the search at oldBufferLength-delimiter.length+1 1715 while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0) 1716 processLine(index); 1717 } 1718 } 1719 1720 final void processLine(size_t index) 1721 { 1722 auto line = inBuffer[0..index]; 1723 inBuffer = inBuffer[index+delimiter.length..inBuffer.length]; 1724 super.onReadData(line); 1725 } 1726 1727 override void onDisconnect(string reason, DisconnectType type) 1728 { 1729 super.onDisconnect(reason, type); 1730 inBuffer.clear(); 1731 } 1732 } 1733 1734 // *************************************************************************** 1735 1736 /// Fires an event handler or disconnects connections 1737 /// after a period of inactivity. 1738 class TimeoutAdapter : ConnectionAdapter 1739 { 1740 this(IConnection next) 1741 { 1742 debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this); 1743 super(next); 1744 } 1745 1746 void cancelIdleTimeout() 1747 { 1748 debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this); 1749 assert(idleTask !is null); 1750 assert(idleTask.isWaiting()); 1751 idleTask.cancel(); 1752 } 1753 1754 void resumeIdleTimeout() 1755 { 1756 debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this); 1757 assert(state == ConnectionState.connected); 1758 assert(idleTask !is null); 1759 assert(!idleTask.isWaiting()); 1760 mainTimer.add(idleTask); 1761 } 1762 1763 final void setIdleTimeout(Duration duration) 1764 { 1765 debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this); 1766 assert(duration > Duration.zero); 1767 if (idleTask is null) 1768 { 1769 idleTask = new TimerTask(duration); 1770 idleTask.handleTask = &onTask_Idle; 1771 } 1772 else 1773 { 1774 if (idleTask.isWaiting()) 1775 idleTask.cancel(); 1776 idleTask.delay = duration; 1777 } 1778 if (state == ConnectionState.connected) 1779 mainTimer.add(idleTask); 1780 } 1781 1782 void markNonIdle() 1783 { 1784 debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this); 1785 assert(idleTask !is null); 1786 if (handleNonIdle) 1787 handleNonIdle(); 1788 if (idleTask.isWaiting()) 1789 idleTask.restart(); 1790 } 1791 1792 /// Callback for when a connection has stopped responding. 1793 /// If unset, the connection will be disconnected. 1794 void delegate() handleIdleTimeout; 1795 1796 /// Callback for when a connection is marked as non-idle 1797 /// (when data is received). 1798 void delegate() handleNonIdle; 1799 1800 protected: 1801 override void onConnect() 1802 { 1803 debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this); 1804 super.onConnect(); 1805 if (idleTask) 1806 resumeIdleTimeout(); 1807 } 1808 1809 override void onReadData(Data data) 1810 { 1811 debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this); 1812 markNonIdle(); 1813 super.onReadData(data); 1814 } 1815 1816 override void onDisconnect(string reason, DisconnectType type) 1817 { 1818 debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this); 1819 if (idleTask && idleTask.isWaiting()) 1820 idleTask.cancel(); 1821 super.onDisconnect(reason, type); 1822 } 1823 1824 private: 1825 TimerTask idleTask; 1826 1827 final void onTask_Idle(Timer timer, TimerTask task) 1828 { 1829 if (state == ConnectionState.disconnecting) 1830 return disconnect("Delayed disconnect - time-out", DisconnectType.error); 1831 1832 if (state != ConnectionState.connected) 1833 return; 1834 1835 if (handleIdleTimeout) 1836 { 1837 handleIdleTimeout(); 1838 if (state == ConnectionState.connected) 1839 { 1840 assert(!idleTask.isWaiting()); 1841 mainTimer.add(idleTask); 1842 } 1843 } 1844 else 1845 disconnect("Time-out", DisconnectType.error); 1846 } 1847 } 1848 1849 // *************************************************************************** 1850 1851 unittest 1852 { 1853 void testTimer() 1854 { 1855 bool fired; 1856 setTimeout({fired = true;}, 10.msecs); 1857 socketManager.loop(); 1858 assert(fired); 1859 } 1860 1861 testTimer(); 1862 }