1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import std.exception; 24 import std.socket; 25 import std.string : format; 26 public import std.socket : Address, Socket; 27 28 debug(ASOCKETS) import std.stdio; 29 debug(PRINTDATA) import ae.utils.text : hexDump; 30 private import std.conv : to; 31 32 33 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 34 static import ae.utils.array; 35 36 version(LIBEV) 37 { 38 import deimos.ev; 39 pragma(lib, "ev"); 40 } 41 42 version(Windows) 43 { 44 import core.sys.windows.windows : Sleep; 45 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 46 } 47 else 48 enum USE_SLEEP = false; 49 50 /// Flags that determine socket wake-up events. 51 int eventCounter; 52 53 version(LIBEV) 54 { 55 // Watchers are a GenericSocket field (as declared in SocketMixin). 56 // Use one watcher per read and write event. 57 // Start/stop those as higher-level code declares interest in those events. 58 // Use the "data" ev_io field to store the parent GenericSocket address. 59 // Also use the "data" field as a flag to indicate whether the watcher is active 60 // (data is null when the watcher is stopped). 
struct SocketManager
{
private:
	/// Number of sockets currently registered with this manager.
	size_t count;

	/// libev I/O watcher callback. The owning GenericSocket is stored in the
	/// watcher's "data" field; the event is forwarded to its handler.
	extern(C)
	static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
	{
		eventCounter++;
		auto target = cast(GenericSocket)w.data;
		assert(target, "libev event fired on stopped watcher");
		debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", target, revents);

		if (revents & EV_READ)
		{
			target.onReadable();
		}
		else if (revents & EV_WRITE)
		{
			target.onWritable();
		}
		else
		{
			assert(false, "Unknown event fired from libev");
		}

		// TODO? Need to get proper SocketManager instance to call updateTimer on
		socketManager.updateTimer(false);
	}

	/// The single libev timer watcher used to drive mainTimer.
	ev_timer evTimer;
	/// The mainTimer deadline evTimer was last scheduled for
	/// (MonoTime.max when the watcher is not running).
	MonoTime lastNextEvent = MonoTime.max;

	/// libev timer watcher callback: runs due timer tasks, then reschedules.
	extern(C)
	static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
	{
		eventCounter++;
		debug (ASOCKETS) writefln("Timer callback called.");
		mainTimer.prod();

		socketManager.updateTimer(true);
		debug (ASOCKETS) writefln("Timer callback exiting.");
	}

	/// Synchronize the libev timer watcher with mainTimer's next deadline.
	/// Does nothing when the deadline is unchanged, unless `force` is set.
	void updateTimer(bool force)
	{
		auto nextEvent = mainTimer.getNextEvent();
		if (!force && lastNextEvent == nextEvent)
			return;

		debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
		if (nextEvent == MonoTime.max)
		{
			// Stopping: no timer tasks left. Only stop a watcher that was started.
			if (lastNextEvent != MonoTime.max)
				ev_timer_stop(ev_default_loop(0), &evTimer);
		}
		else
		{
			// Run anything already overdue before computing a positive delay.
			auto timeLeft = mainTimer.getRemainingTime();
			while (timeLeft <= Duration.zero)
			{
				debug (ASOCKETS) writefln("remaining=%s, prodding timer.", timeLeft);
				mainTimer.prod();
				timeLeft = mainTimer.getRemainingTime();
			}
			// Convert the remaining time to libev's floating-point seconds.
			ev_tstamp tstamp = timeLeft.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
			debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", timeLeft, tstamp);

			if (lastNextEvent == MonoTime.max)
			{
				// Starting
				ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
				ev_timer_start(ev_default_loop(0), &evTimer);
			}
			else
			{
				// Adjusting
				evTimer.repeat = tstamp;
				ev_timer_again(ev_default_loop(0), &evTimer);
			}
		}
		lastNextEvent = nextEvent;
	}

	/// Register a socket with the manager.
	/// Initializes (but does not start) the socket's read and write watchers.
	void register(GenericSocket socket)
	{
		debug (ASOCKETS) writefln("Registering %s", socket);
		debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
		auto fd = socket.conn.handle;
		assert(fd, "Must have fd before socket registration");
		ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
		ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
		count++;
	}

	/// Unregister a socket with the manager.
	/// Stops both watchers by clearing the socket's notification flags.
	void unregister(GenericSocket socket)
	{
		debug (ASOCKETS) writefln("Unregistering %s", socket);
		socket.notifyRead  = false;
		socket.notifyWrite = false;
		count--;
	}

public:
	/// Number of registered sockets.
	size_t size()
	{
		return count;
	}

	/// Loop continuously until no sockets are left.
	void loop()
	{
		auto evLoop = ev_default_loop(0);
		enforce(evLoop, "libev initialization failure");

		updateTimer(true);
		debug (ASOCKETS) writeln("ev_run");
		ev_run(ev_default_loop(0), 0);
	}
}

/// Per-socket state for the libev back-end: one I/O watcher per direction.
private mixin template SocketMixin()
{
	private ev_io evRead, evWrite;

	/// Start or stop a watcher. A non-null "data" field marks a running watcher.
	private void setWatcherState(ref ev_io ev, bool newValue, int event)
	{
		if (!conn)
		{
			// Can happen when setting delegates before connecting.
			return;
		}

		immutable running = ev.data !is null;
		if (newValue && !running)
		{
			// Start
			ev.data = cast(void*)this;
			ev_io_start(ev_default_loop(0), &ev);
		}
		else if (!newValue && running)
		{
			// Stop
			assert(ev.data is cast(void*)this);
			ev.data = null;
			ev_io_stop(ev_default_loop(0), &ev);
		}
	}

	/// Interested in read notifications (onReadable)?
	@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
	/// Interested in write notifications (onWritable)?
	@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }

	debug ~this()
	{
		// The LIBEV SocketManager holds no references to registered sockets.
		// TODO: Add a doubly-linked list?
		assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
	}
}
}
else // Use select
{
struct SocketManager
{
private:
	enum FD_SETSIZE = 1024;

	/// List of all sockets to poll.
	GenericSocket[] sockets;

	/// Debug AA to check for dangling socket references.
	debug GenericSocket[socket_t] socketHandles;

	/// Register a socket with the manager.
void register(GenericSocket conn)
{
	debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1);
	assert(!conn.socket.blocking, "Trying to register a blocking socket");
	sockets ~= conn;

	debug
	{
		// Track the OS handle so that double registration is caught early.
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't register a closed socket");
		assert(fd !in socketHandles, "This socket handle is already registered");
		socketHandles[fd] = conn;
	}
}

/// Unregister a socket with the manager.
void unregister(GenericSocket conn)
{
	debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1);

	debug
	{
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't unregister a closed socket");
		auto owner = fd in socketHandles;
		assert(owner, "This socket handle is not registered");
		assert(*owner is conn, "This socket handle is registered but belongs to another GenericSocket");
		socketHandles.remove(fd);
	}

	foreach (size_t index, GenericSocket candidate; sockets)
	{
		if (candidate is conn)
		{
			// Build a new array instead of shifting elements in place.
			sockets = sockets[0 .. index] ~ sockets[index + 1 .. $];
			return;
		}
	}
	assert(false, "Socket not registered");
}

/// Handlers run (one per iteration, round-robin) when select() reports no events.
void delegate()[] idleHandlers;

public:
/// Number of registered sockets.
size_t size()
{
	return sockets.length;
}

/// Loop continuously until no sockets are left.
void loop()
{
	debug (ASOCKETS) writeln("Starting event loop.");

	size_t sockcount;
	uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
	SocketSet readset  = new SocketSet(setSize);
	SocketSet writeset = new SocketSet(setSize);
	SocketSet errorset = new SocketSet(setSize);

	while (true)
	{
		// Work out how large the sets must be for the current sockets:
		// the socket count on Windows, the highest handle value elsewhere.
		uint minSize = 0;
		version(Windows)
			minSize = cast(uint)sockets.length;
		else
		{
			foreach (s; sockets)
				if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
					minSize = s.socket.handle;
		}
		minSize++;

		if (setSize < minSize)
		{
			debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
			setSize = minSize * 2;
			readset  = new SocketSet(setSize);
			writeset = new SocketSet(setSize);
			errorset = new SocketSet(setSize);
		}
		else
		{
			readset.reset();
			writeset.reset();
			errorset.reset();
		}

		sockcount = 0;
		bool haveActive;
		debug (ASOCKETS) writeln("Populating sets");
		foreach (GenericSocket conn; sockets)
		{
			if (!conn.socket)
				continue;
			sockcount++;
			// Daemon sockets are polled but do not keep the loop alive.
			if (!conn.daemon)
				haveActive = true;

			debug (ASOCKETS) writef("\t%s:", conn);
			if (conn.notifyRead)
			{
				readset.add(conn.socket);
				debug (ASOCKETS) write(" READ");
			}
			if (conn.notifyWrite)
			{
				writeset.add(conn.socket);
				debug (ASOCKETS) write(" WRITE");
			}
			errorset.add(conn.socket);
			debug (ASOCKETS) writeln();
		}
		debug (ASOCKETS)
		{
			writefln("Sets populated as follows:");
			printSets(readset, writeset, errorset);
		}

		debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
			sockcount,
			mainTimer.isWaiting() ? "with" : "no",
			idleHandlers.length
		);
		if (!haveActive && !mainTimer.isWaiting())
		{
			debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
			break;
		}

		debug (ASOCKETS) { stdout.flush(); stderr.flush(); }

		int events;
		if (idleHandlers.length)
		{
			// With idle work pending, only poll (zero timeout) so that the
			// idle handlers keep running.
			if (sockcount==0)
				events = 0;
			else
				events = Socket.select(readset, writeset, errorset, 0.seconds);
		}
		else
		if (USE_SLEEP && sockcount==0)
		{
			// No sockets to select on: sleep until the next timer event.
			// USE_SLEEP is only true on Windows.
			version(Windows)
			{
				auto duration = mainTimer.getRemainingTime().total!"msecs"();
				debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
				if (duration <= 0)
					duration = 1; // Avoid busywait
				else
				if (duration > int.max)
					duration = int.max;
				Sleep(cast(int)duration);
				events = 0;
			}
			else
				assert(0);
		}
		else
		if (mainTimer.isWaiting())
			events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
		else
			events = Socket.select(readset, writeset, errorset);

		debug (ASOCKETS) writefln("%d events fired.", events);

		if (events > 0)
		{
			// Handle just one event at a time, as the first
			// handler might invalidate select()'s results.
			handleEvent(readset, writeset, errorset);
		}
		else
		if (idleHandlers.length)
		{
			import ae.utils.array;
			auto handler = idleHandlers.shift();

			// Rotate the idle handler queue before running it,
			// in case the handler unregisters itself.
			idleHandlers ~= handler;

			handler();
		}

		// Timers may invalidate our select results, so fire them after processing the latter
		mainTimer.prod();

		eventCounter++;
	}
}

/// Debug helper: print, for each registered socket, the first set it is found in.
debug (ASOCKETS)
void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			writefln("\t\t%s is unset", conn);
		else
		if (readset.isSet(conn.socket))
			writefln("\t\t%s is readable", conn);
		else
		if (writeset.isSet(conn.socket))
			writefln("\t\t%s is writable", conn);
		else
		if (errorset.isSet(conn.socket))
			writefln("\t\t%s is errored", conn);
	}
}

/// Dispatch exactly one event: the first registered socket found in any of
/// the sets gets its matching handler called (read, then write, then error).
void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	debug (ASOCKETS)
	{
		writefln("\tSelect results:");
		printSets(readset, writeset, errorset);
	}

	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			continue;

		if (readset.isSet(conn.socket))
		{
			debug (ASOCKETS) writefln("\t%s - calling onReadable", conn);
			return conn.onReadable();
		}
		else
		if (writeset.isSet(conn.socket))
		{
			debug (ASOCKETS) writefln("\t%s - calling onWritable", conn);
			return conn.onWritable();
		}
		else
		if (errorset.isSet(conn.socket))
		{
			debug (ASOCKETS) writefln("\t%s - calling onError", conn);
			return conn.onError("select() error: " ~ conn.socket.getErrorText());
		}
	}

	assert(false, "select() reported events available, but no registered sockets are set");
}
}

// Use UFCS to allow removeIdleHandler to have a predicate with context
/// Add an idle handler. The same delegate must not be added twice.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
	foreach (existing; socketManager.idleHandlers)
		assert(handler !is existing);

	socketManager.idleHandlers ~= handler;
}

/// Default predicate for removeIdleHandler: identity comparison.
static bool isFun(T)(T a, T b) { return a is b; }

/// Remove the first idle handler for which `pred(handler, args)` is true.
/// It is an error (assertion failure) if no handler matches.
void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
	foreach (i, idleHandler; socketManager.idleHandlers)
		if (pred(idleHandler, args))
		{
			import std.algorithm;
			socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
			return;
		}
	assert(false, "No such idle handler");
}

/// Per-socket state for the select back-end: plain flags read by the event loop.
private mixin template SocketMixin()
{
	/// Interested in read notifications (onReadable)?
	bool notifyRead;
	/// Interested in write notifications (onWritable)?
	bool notifyWrite;
}
}

/// The default socket manager.
SocketManager socketManager;

// ***************************************************************************

/// General methods for an asynchronous socket.
private abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Called by the socket manager when the socket is readable. No-op by default.
	void onReadable()
	{
	}

	/// Called by the socket manager when the socket is writable. No-op by default.
	void onWritable()
	{
	}

	/// Called by the socket manager when the socket is in an error state. No-op by default.
	void onError(string reason)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	private Address cachedLocalAddress, cachedRemoteAddress;

	/// Don't block the process from exiting.
	/// TODO: Not implemented with libev
	bool daemon;

	/// Local address of the socket. The first successful lookup is cached;
	/// returns null if there is no cached value and no socket.
	final @property Address localAddress()
	{
		if (cachedLocalAddress !is null)
			return cachedLocalAddress;
		if (conn is null)
			return null;
		return cachedLocalAddress = conn.localAddress();
	}

	/// Non-throwing string form of localAddress, for logging.
	final @property string localAddressStr() nothrow
	{
		try
		{
			auto a = localAddress;
			return a is null ? "[null address]" : a.toString();
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Remote address of the socket. The first successful lookup is cached;
	/// returns null if there is no cached value and no socket.
	final @property Address remoteAddress()
	{
		if (cachedRemoteAddress !is null)
			return cachedRemoteAddress;
		if (conn is null)
			return null;
		return cachedRemoteAddress = conn.remoteAddress();
	}

	/// Non-throwing string form of remoteAddress, for logging.
	final @property string remoteAddressStr() nothrow
	{
		try
		{
			auto a = remoteAddress;
			return a is null ? "[null address]" : a.toString();
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Enable or disable TCP keep-alive on the wrapped socket.
	/// If the timed variant is rejected with SocketFeatureException,
	/// plain SO_KEEPALIVE is enabled instead.
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
		if (enabled)
		{
			try
				conn.setKeepAlive(time, interval);
			catch (SocketFeatureException)
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
		}
		else
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
	}

	/// Short description for debug output: unqualified class name, object address, handle.
	override string toString() const
	{
		import std.string;
		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
	}
}

// ***************************************************************************

enum DisconnectType
{
	requested, // initiated by the application
	graceful,  // peer gracefully closed the connection
	error      // abnormal network condition
}

enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}

/// Common interface for connections and adapters.
interface IConnection
{
	enum MAX_PRIORITY = 4;
	enum DEFAULT_PRIORITY = 2;

	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		Data[1] data;
		data[0] = datum;
		// Forward the caller's priority; omitting it would silently queue
		// every single-datum send at DEFAULT_PRIORITY.
		this.send(data[], priority);
		// Drop the temporary's reference to the datum.
		data[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto
}

// ***************************************************************************

/// An asynchronous TCP connection.
class TcpConnection : GenericSocket, IConnection
{
private:
	/// Blocks of data larger than this value are passed as unmanaged memory
	/// (in Data objects). Blocks smaller than this value will be reallocated
	/// on the managed heap. The disadvantage of placing large objects on the
	/// managed heap is false pointers; the disadvantage of using Data for
	/// small objects is wasted slack space due to the page size alignment
	/// requirement.
	enum UNMANAGED_THRESHOLD = 256;

	/// Queue of addresses to try connecting to.
	Address[] addressQueue;

	ConnectionState _state;
	/// Private state setter.
	final @property ConnectionState state(ConnectionState value) { return _state = value; }

public:
	/// Get connection state.
	override @property ConnectionState state() { return _state; }

protected:
	/// The send buffers.
	Data[][MAX_PRIORITY+1] outQueue;
	/// Whether the first item from each queue has been partially sent (and thus can't be cancelled).
	bool[MAX_PRIORITY+1] partiallySent;

	/// Constructor used by a ServerSocket for new connections
	this(Socket conn)
	{
		this();
		this.conn = conn;
		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
		if (conn)
			socketManager.register(this);
		updateFlags();
	}

	/// Recompute the read/write notification flags from the current state,
	/// the send queues and the read handler.
	final void updateFlags()
	{
		// While connecting, writability signals that the attempt has completed.
		if (state == ConnectionState.connecting)
			notifyWrite = true;
		else
			notifyWrite = writePending;

		notifyRead = state == ConnectionState.connected && readDataHandler;
	}

	/// Called when a socket is readable.
override void onReadable()
{
	// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
	static ubyte[0x10000] inBuffer;
	auto byteCount = conn.receive(inBuffer);

	// A zero-length read means the peer closed the connection.
	if (byteCount == 0)
		return disconnect("Connection closed", DisconnectType.graceful);

	if (byteCount == Socket.ERROR)
	{
		// Note: a would-have-blocked result is deliberately not special-cased
		// here; every receive error is reported.
		onError("recv() error: " ~ lastSocketError);
		return;
	}

	auto chunk = inBuffer[0 .. byteCount];

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
		std.stdio.write(hexDump(chunk));
		std.stdio.stdout.flush();
	}

	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this);
		return;
	}

	if (!readDataHandler)
	{
		debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this);
		return;
	}

	// Currently, unlike the D1 version of this module,
	// we will always reallocate read network data.
	// This disfavours code which doesn't need to store
	// read data after processing it, but otherwise
	// makes things simpler and safer all around.
	if (byteCount < UNMANAGED_THRESHOLD)
	{
		// Copy to the managed heap
		readDataHandler(Data(chunk.dup));
	}
	else
	{
		// Copy to unmanaged memory
		readDataHandler(Data(chunk, true));
	}
}

/// Called when a socket is writable.
override void onWritable()
{
	//scope(success) updateFlags();
	onWritableImpl();
	updateFlags();
}

// Work around scope(success) breaking debugger stack traces
/// Completes a pending connection attempt, or otherwise sends as much queued
/// data as the socket accepts, in queue index order.
final private void onWritableImpl()
{
	if (state == ConnectionState.connecting)
	{
		// Writability while connecting means the connection attempt completed.
		state = ConnectionState.connected;

		//debug writefln("[%s] Connected", remoteAddress);
		try
		{
			setKeepAlive();
		}
		catch (Exception e)
		{
			return disconnect(e.msg, DisconnectType.error);
		}
		if (connectHandler)
			connectHandler();
		return;
	}
	//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

	foreach (priority, ref queue; outQueue)
	{
		while (queue.length)
		{
			auto head = queue.ptr; // pointer to first data

			ptrdiff_t sent = 0;
			if (head.length)
			{
				sent = conn.send(head.contents);
				debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, head.length);
			}
			else
			{
				debug (ASOCKETS) writefln("\t\t%s: empty Data object", this);
			}

			if (sent == Socket.ERROR)
			{
				// Not an error if the socket simply cannot take more right now.
				if (wouldHaveBlocked())
					return;
				return onError("send() error: " ~ lastSocketError);
			}

			if (sent < head.length)
			{
				// Partial send: keep the unsent remainder at the head of the queue.
				if (sent > 0)
				{
					*head = (*head)[sent..head.length];
					partiallySent[priority] = true;
				}
				return;
			}

			assert(sent == head.length);
			//debug writefln("[%s] Sent data:", remoteAddress);
			//debug writefln("%s", hexDump(pdata.contents[0..sent]));
			head.clear();
			queue = queue[1..$];
			partiallySent[priority] = false;
			if (queue.length == 0)
				queue = null;
		}
	}

	// outQueue is now empty
	if (handleBufferFlushed)
		handleBufferFlushed();
	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}
}
/// Called when an error occurs on the socket.
///
/// While connecting with further addresses left to try, the failed attempt is
/// abandoned and the next address is tried. While disconnecting, the socket
/// is closed right away. Otherwise the connection is disconnected with
/// DisconnectType.error.
override void onError(string reason)
{
	if (state == ConnectionState.connecting && addressQueue.length)
	{
		// tryNextAddress only assigns conn once the socket is ready to be
		// registered, so conn is null when the failed attempt never got that
		// far. In that case there is nothing to unregister or close.
		if (conn)
		{
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}

		return tryNextAddress();
	}

	if (state == ConnectionState.disconnecting)
	{
		// Pass the values as arguments rather than pre-formatting them: a
		// pre-formatted string used as the format string would misparse any
		// '%' contained in reason.
		debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s", cast(void*)this, reason);
		return close();
	}

	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
	disconnect("Socket error: " ~ reason, DisconnectType.error);
}

/// Pop the next address off addressQueue and start a non-blocking connection
/// attempt to it. A SocketException is routed to onError, which moves on to
/// the following address if there is one.
final void tryNextAddress()
{
	assert(state == ConnectionState.connecting);
	auto address = addressQueue[0];
	addressQueue = addressQueue[1..$];

	try
	{
		auto sock = new Socket(address.addressFamily(), SocketType.STREAM, ProtocolType.TCP);
		{
			// If the socket cannot be made non-blocking, release it here;
			// it has not been published in conn or registered yet.
			scope(failure) sock.close();
			sock.blocking = false;
		}

		// From here on conn is set and registered, which is the state
		// onError expects when it tears a failed attempt down.
		conn = sock;
		socketManager.register(this);
		updateFlags();
		debug (ASOCKETS) writefln("Attempting connection to %s", address.toString());
		conn.connect(address);
	}
	catch (SocketException e)
		return onError("Connect error: " ~ e.msg);
}

public:
/// Default constructor
this()
{
	debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this);
}

/// Start establishing a connection.
final void connect(string host, ushort port)
{
	assert(host.length, "Empty host");
	assert(port, "No port specified");

	debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
	assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
	assert(!conn);

	// Name resolution is synchronous.
	state = ConnectionState.resolving;

	try
	{
		addressQueue = getAddress(host, port);
		enforce(addressQueue.length, "No addresses found");
		debug (ASOCKETS)
		{
			writefln("Resolved to %s addresses:", addressQueue.length);
			foreach (candidate; addressQueue)
				writefln("- %s", candidate.toString());
		}

		state = ConnectionState.connecting;
		// Try the resolved addresses in random order.
		if (addressQueue.length > 1)
		{
			import std.random : randomShuffle;
			randomShuffle(addressQueue);
		}
	}
	catch (SocketException e)
	{
		return onError("Lookup error: " ~ e.msg);
	}

	tryNextAddress();
}

/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
/// The disconnect handler will be called when all data has been flushed.
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
{
	//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));

	if (writePending)
	{
		if (type==DisconnectType.requested)
		{
			assert(conn, "Attempting to disconnect on an uninitialized socket");
			// queue disconnect after all data is sent
			debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddress, reason);
			state = ConnectionState.disconnecting;
			//setIdleTimeout(30.seconds);
			if (disconnectHandler)
				disconnectHandler(reason, type);
			updateFlags();
			return;
		}
		else
			discardQueues();
	}

	debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

	// A connecting TcpConnection has no socket when the attempt failed before
	// one was created; only call close() when there is a socket to close.
	if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
		close();
	else
	{
		assert(conn is null, "Registered but %s socket".format(state));
		// Nothing to tear down (failed lookup, or no socket was created).
		// Return to the initial state so that the object can be reused;
		// otherwise it would stay in resolving/connecting and a later
		// connect() would hit its state assertion.
		state = ConnectionState.disconnected;
	}

	if (disconnectHandler)
		disconnectHandler(reason, type);
	updateFlags();
}

/// Unregister and close the socket, drop all queued data, and return to the
/// disconnected state.
private final void close()
{
	assert(conn, "Attempting to close an unregistered socket");
	socketManager.unregister(this);
	conn.close();
	conn = null;
	outQueue[] = null;
	// The queues are empty now, so no queue can have a partially sent head;
	// stale flags would trip the assertions in clearQueue/queuePresent.
	partiallySent[] = false;
	state = ConnectionState.disconnected;
}

/// Append data to the send buffer.
void send(Data[] data, int priority = DEFAULT_PRIORITY)
{
	assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
	outQueue[priority] ~= data;
	notifyWrite = true; // Fast updateFlags()

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
		foreach (datum; data)
		{
			if (datum.length)
				std.stdio.write(hexDump(datum.contents));
			else
				std.stdio.writeln("(empty Data)");
		}
		std.stdio.stdout.flush();
	}
}

/// ditto
alias send = IConnection.send;

/// Drop the data queued at the given priority, except for a first item that
/// has already been partially sent.
final void clearQueue(int priority)
{
	if (!partiallySent[priority])
	{
		outQueue[priority] = null;
	}
	else
	{
		assert(outQueue[priority].length > 0);
		outQueue[priority] = outQueue[priority][0..1];
	}
	updateFlags();
}

/// Clears all queues, even partially sent content.
private final void discardQueues()
{
	outQueue[] = null;
	partiallySent[] = false;
	updateFlags();
}

/// Whether any queue still holds data waiting to be sent.
@property
final bool writePending()
{
	foreach (pending; outQueue)
	{
		if (pending.length)
			return true;
	}
	return false;
}

/// Whether the given priority's queue holds data beyond a partially sent first item.
final bool queuePresent(int priority)
{
	if (!partiallySent[priority])
		return outQueue[priority].length > 0;

	assert(outQueue[priority].length > 0);
	return outQueue[priority].length > 1;
}

public:
private ConnectHandler connectHandler;
/// Callback for when a connection has been established.
@property final void handleConnect(ConnectHandler value)
{
	connectHandler = value;
	updateFlags();
}

/// Callback for when the send buffer has been flushed.
void delegate() handleBufferFlushed;

private ReadDataHandler readDataHandler;
/// Callback for incoming data.
/// Data will not be received unless this handler is set.
@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }

private DisconnectHandler disconnectHandler;
/// Callback for when a connection was closed.
@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }
}

// ***************************************************************************

/// An asynchronous TCP connection server.
final class TcpServer
{
private:
    /// Class that actually performs listening on a certain address family
    final class Listener : GenericSocket
    {
        /// Wraps an already bound and listening socket and registers
        /// this listener with the socket manager.
        this(Socket conn)
        {
            debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
            this.conn = conn;
            socketManager.register(this);
        }

        /// Called when a socket is readable.
        /// Accepts one pending connection and hands it to the accept
        /// handler, or closes it right away when no handler is set.
        override void onReadable()
        {
            debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this);
            Socket acceptSocket = conn.accept();
            acceptSocket.blocking = false;
            if (handleAccept)
            {
                TcpConnection connection = new TcpConnection(acceptSocket);
                debug (ASOCKETS) writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
                connection.setKeepAlive();
                //assert(connection.connected);
                //connection.connected = true;
                acceptHandler(connection);
            }
            else
                acceptSocket.close();
        }

        /// Called when a socket is writable.
        /// Listening sockets have nothing to write, so this is a no-op.
        override void onWritable()
        {
        }

        /// Called when an error occurs on the socket.
        override void onError(string reason)
        {
            close(); // call parent
        }

        /// Unregisters this listener from the socket manager and
        /// closes its socket. Must not be called twice.
        void closeListener()
        {
            assert(conn);
            socketManager.unregister(this);
            conn.close();
            conn = null;
        }
    }

    /// Whether the socket is listening.
    bool listening;
    /// Listener instances
    Listener[] listeners;

    /// Makes every listener wait for incoming connections only
    /// while an accept handler is installed.
    final void updateFlags()
    {
        foreach (listener; listeners)
            listener.notifyRead = handleAccept !is null;
    }

public:
    /// Start listening on this socket.
    /// Params:
    ///   port = TCP port to bind; 0 requests a random port (IPv4 only).
    ///   addr = Node name / address to resolve; null is passed through
    ///          to getAddressInfo together with the PASSIVE flag.
    /// Returns:
    ///   The port number. When an IPv4 listener was bound, this is the
    ///   port reported by that socket's local address, which is how the
    ///   caller learns a randomly assigned port.
    /// Throws:
    ///   Exception if there are no listeners after trying all addresses.
    ushort listen(ushort port, string addr = null)
    {
        debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port);
        //assert(!listening, "Attempting to listen on a listening socket");

        auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

        // Remember the caller's request up front: `port` is overwritten
        // below with the port actually bound on IPv4, so testing it inside
        // the loop would stop skipping non-IPv4 families after the first
        // IPv4 bind and let them bind an unrelated random port.
        immutable randomPort = port == 0;

        foreach (ref addressInfo; addressInfos)
        {
            if (randomPort && addressInfo.family != AddressFamily.INET)
                continue; // listen on random ports only on IPv4 for now

            try
            {
                Socket conn = new Socket(addressInfo);
                // Release the descriptor immediately if any setup step
                // below throws, rather than leaving it to the GC.
                scope(failure) conn.close();

                conn.blocking = false;
                if (addressInfo.family == AddressFamily.INET6)
                    conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
                conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

                conn.bind(addressInfo.address);
                conn.listen(8);

                if (addressInfo.family == AddressFamily.INET)
                    port = to!ushort(conn.localAddress().toPortString());

                listeners ~= new Listener(conn);
            }
            catch (SocketException e)
            {
                // Best effort: a failure on one address must not prevent
                // listening on the remaining ones.
                debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
                debug(ASOCKETS) writeln(e.msg);
            }
        }

        if (listeners.length==0)
            throw new Exception("Unable to bind service");

        listening = true;

        updateFlags();

        return port;
    }

    /// Local addresses of all current listeners.
    @property Address[] localAddresses()
    {
        Address[] result;
        foreach (listener; listeners)
            result ~= listener.localAddress;
        return result;
    }

    /// Whether listen() has succeeded and close() has not been called since.
    @property bool isListening()
    {
        return listening;
    }

    /// Stop listening on this socket.
    /// Closes every listener, then invokes handleClose if it is set.
    void close()
    {
        foreach (listener;listeners)
            listener.closeListener();
        listeners = null;
        listening = false;
        if (handleClose)
            handleClose();
    }

public:
    /// Callback for when the socket was closed.
    void delegate() handleClose;

    private void delegate(TcpConnection incoming) acceptHandler;
    /// Callback for an incoming connection.
    /// Connections will not be accepted unless this handler is set.
    @property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
    /// ditto
    @property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
    /// The wrapped connection that calls are forwarded to.
    IConnection next;

    /// Wraps `next` and takes over its connect and disconnect callbacks.
    this(IConnection next)
    {
        this.next = next;
        next.handleConnect = &onConnect;
        next.handleDisconnect = &onDisconnect;
    }

    /// Connection state, as reported by the wrapped connection.
    @property ConnectionState state() { return next.state; }

    /// Queue Data for sending.
    void send(Data[] data, int priority)
    {
        next.send(data, priority);
    }

    alias send = IConnection.send; /// ditto

    /// Terminate the connection.
    void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
    {
        next.disconnect(reason, type);
    }

    /// Invoked by the wrapped connection on connect; forwards to the
    /// user's connect handler if one is set.
    protected void onConnect()
    {
        if (connectHandler)
            connectHandler();
    }

    /// Invoked by the wrapped connection when data arrives; forwards
    /// to the user's read handler.
    protected void onReadData(Data data)
    {
        // onReadData should be fired only if readDataHandler is set
        readDataHandler(data);
    }

    /// Invoked by the wrapped connection on disconnect; forwards to the
    /// user's disconnect handler if one is set.
    protected void onDisconnect(string reason, DisconnectType type)
    {
        if (disconnectHandler)
            disconnectHandler(reason, type);
    }

    /// Callback for when a connection has been established.
    @property void handleConnect(ConnectHandler value) { connectHandler = value; }
    private ConnectHandler connectHandler;

    /// Callback setter for when new data is read.
    @property void handleReadData(ReadDataHandler value)
    {
        readDataHandler = value;
        // Subscribe to the wrapped connection's data only while the
        // user has a handler installed.
        next.handleReadData = value ? &onReadData : null ;
    }
    private ReadDataHandler readDataHandler;

    /// Callback setter for when a connection was closed.
    @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
    private DisconnectHandler disconnectHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
    /// The protocol's line delimiter.
    string delimiter = "\r\n";

    /// Wraps `next`; see ConnectionAdapter.
    this(IConnection next)
    {
        super(next);
    }

    /// Append a line to the send buffer.
    /// The delimiter is appended to `line` before it is queued.
    void send(string line)
    {
        //super.send(Data(line ~ delimiter));
        // https://issues.dlang.org/show_bug.cgi?id=13985
        ConnectionAdapter ca = this;
        ca.send(Data(line ~ delimiter));
    }

protected:
    /// The receive buffer.
    Data inBuffer;

    /// Called when data has been received.
    /// Appends `data` to the receive buffer and emits every complete
    /// line found in it; an unterminated tail stays buffered.
    final override void onReadData(Data data)
    {
        import std.string;
        auto oldBufferLength = inBuffer.length;
        if (oldBufferLength)
            inBuffer ~= data;
        else
            inBuffer = data;

        if (delimiter.length == 1)
        {
            import core.stdc.string; // memchr

            char c = delimiter[0];
            // First pass: only the newly appended bytes are scanned.
            auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
            while (p)
            {
                sizediff_t index = p - inBuffer.ptr;
                processLine(index);

                // Subsequent passes scan whatever remains in the buffer
                // from its start.
                p = memchr(inBuffer.ptr, c, inBuffer.length);
            }
        }
        else
        {
            sizediff_t index;
            // TODO: we can start the search at oldBufferLength-delimiter.length+1
            while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
                processLine(index);
        }
    }

    /// Cuts the line ending at `index` (delimiter excluded) off the front
    /// of the receive buffer and passes it on to the read handler.
    final void processLine(size_t index)
    {
        auto line = inBuffer[0..index];
        inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
        super.onReadData(line);
    }

    /// Forwards the disconnect, then discards any buffered partial line.
    override void onDisconnect(string reason, DisconnectType type)
    {
        super.onDisconnect(reason, type);
        inBuffer.clear();
    }
}

// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
    /// Wraps `next`. No idle timer exists until setIdleTimeout is called.
    this(IConnection next)
    {
        debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this);
        super(next);
    }

    /// Stops the idle countdown.
    /// An idle timeout must have been set and must currently be waiting.
    void cancelIdleTimeout()
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
        assert(idleTask !is null);
        assert(idleTask.isWaiting());
        idleTask.cancel();
    }

    /// Schedules the idle task on the main timer again.
    /// The connection must be connected, and an idle timeout must have
    /// been set and must not currently be waiting.
    void resumeIdleTimeout()
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
        assert(state == ConnectionState.connected);
        assert(idleTask !is null);
        assert(!idleTask.isWaiting());
        mainTimer.add(idleTask);
    }

    /// Sets or changes the idle timeout.
    /// The countdown is scheduled immediately if the connection is
    /// connected; otherwise onConnect schedules it.
    /// Params:
    ///   duration = Inactivity period; must be greater than zero.
    final void setIdleTimeout(Duration duration)
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
        assert(duration > Duration.zero);
        if (idleTask is null)
        {
            idleTask = new TimerTask(duration);
            idleTask.handleTask = &onTask_Idle;
        }
        else
        {
            // Take a pending task off the timer before changing its delay.
            if (idleTask.isWaiting())
                idleTask.cancel();
            idleTask.delay = duration;
        }
        if (state == ConnectionState.connected)
            mainTimer.add(idleTask);
    }

    /// Signals activity: fires handleNonIdle (if set) and restarts the
    /// idle countdown if one is currently waiting.
    void markNonIdle()
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
        if (handleNonIdle)
            handleNonIdle();
        // idleTask is optional (see onConnect / onDisconnect): data may
        // arrive before setIdleTimeout was ever called, in which case
        // there is simply nothing to restart.
        if (idleTask && idleTask.isWaiting())
            idleTask.restart();
    }

    /// Callback for when a connection has stopped responding.
    /// If unset, the connection will be disconnected.
    void delegate() handleIdleTimeout;

    /// Callback for when a connection is marked as non-idle
    /// (when data is received).
    void delegate() handleNonIdle;

protected:
    /// Forwards the connect event, then starts the idle countdown if a
    /// timeout has been set.
    override void onConnect()
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
        super.onConnect();
        if (idleTask)
            resumeIdleTimeout();
    }

    /// Treats incoming data as activity, then forwards it.
    override void onReadData(Data data)
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
        markNonIdle();
        super.onReadData(data);
    }

    /// Forwards the disconnect event, then cancels a pending idle countdown.
    override void onDisconnect(string reason, DisconnectType type)
    {
        debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
        super.onDisconnect(reason, type);
        if (idleTask && idleTask.isWaiting())
            idleTask.cancel();
    }

private:
    /// Timer task for the idle countdown; null until setIdleTimeout is called.
    TimerTask idleTask;

    /// Timer callback fired when the idle period has elapsed.
    final void onTask_Idle(Timer timer, TimerTask task)
    {
        // Still disconnecting when the timer fired: disconnect with an error.
        if (state == ConnectionState.disconnecting)
            return disconnect("Delayed disconnect - time-out", DisconnectType.error);

        if (state != ConnectionState.connected)
            return;

        if (handleIdleTimeout)
        {
            handleIdleTimeout();
            // Schedule another countdown if the handler left the
            // connection connected.
            if (state == ConnectionState.connected)
            {
                assert(!idleTask.isWaiting());
                mainTimer.add(idleTask);
            }
        }
        else
            disconnect("Time-out", DisconnectType.error);
    }
}

// ***************************************************************************

unittest
{
    // Checks that the timeout callback has run by the time the loop returns.
    void testTimer()
    {
        bool fired;
        setTimeout({fired = true;}, 10.msecs);
        socketManager.loop();
        assert(fired);
    }

    testTimer();
}