1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import std.exception; 24 import std.socket; 25 import std.string : format; 26 public import std.socket : Address, Socket; 27 28 debug(ASOCKETS) import std.stdio; 29 debug(PRINTDATA) import ae.utils.text : hexDump; 30 private import std.conv : to; 31 32 33 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 34 static import ae.utils.array; 35 36 version(LIBEV) 37 { 38 import deimos.ev; 39 pragma(lib, "ev"); 40 } 41 42 version(Windows) 43 { 44 import core.sys.windows.windows : Sleep; 45 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 46 } 47 else 48 enum USE_SLEEP = false; 49 50 /// Flags that determine socket wake-up events. 51 int eventCounter; 52 53 version(LIBEV) 54 { 55 // Watchers are a GenericSocket field (as declared in SocketMixin). 56 // Use one watcher per read and write event. 57 // Start/stop those as higher-level code declares interest in those events. 58 // Use the "data" ev_io field to store the parent GenericSocket address. 59 // Also use the "data" field as a flag to indicate whether the watcher is active 60 // (data is null when the watcher is stopped). 
struct SocketManager
{
private:
	/// Number of sockets currently registered with this manager.
	size_t count;

	/// libev I/O watcher callback.
	/// Recovers the owning GenericSocket from the watcher's `data` field,
	/// forwards the read or write event to it, then refreshes the timer.
	extern(C)
	static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
	{
		eventCounter++;
		auto owner = cast(GenericSocket)w.data;
		assert(owner, "libev event fired on stopped watcher");
		debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", owner, revents);

		if (revents & EV_READ)
		{
			owner.onReadable();
		}
		else if (revents & EV_WRITE)
		{
			owner.onWritable();
		}
		else
		{
			assert(false, "Unknown event fired from libev");
		}

		// TODO? Need to get proper SocketManager instance to call updateTimer on
		socketManager.updateTimer(false);
	}

	/// The libev timer watcher used to wake up for mainTimer events.
	ev_timer evTimer;
	/// Result of mainTimer.getNextEvent() at the last reschedule.
	/// MonoTime.max means that the libev timer is currently not running.
	MonoTime lastNextEvent = MonoTime.max;

	/// libev timer watcher callback: fires due mainTimer events
	/// and unconditionally reschedules the libev timer.
	extern(C)
	static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
	{
		eventCounter++;
		debug (ASOCKETS) writefln("Timer callback called.");
		mainTimer.prod();

		socketManager.updateTimer(true);
		debug (ASOCKETS) writefln("Timer callback exiting.");
	}

	/// Synchronize the libev timer with mainTimer's next event.
	/// Params:
	///   force = reschedule even if the next event time has not changed
	void updateTimer(bool force)
	{
		auto nextEvent = mainTimer.getNextEvent();
		if (!force && lastNextEvent == nextEvent)
			return; // schedule is already up to date

		debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);

		if (nextEvent == MonoTime.max)
		{
			// Stopping (only needed if the timer was running)
			if (lastNextEvent != MonoTime.max)
				ev_timer_stop(ev_default_loop(0), &evTimer);
		}
		else
		{
			// Fire everything that is already due, so that the
			// delay handed to libev is strictly positive.
			auto timeLeft = mainTimer.getRemainingTime();
			while (timeLeft <= Duration.zero)
			{
				debug (ASOCKETS) writefln("remaining=%s, prodding timer.", timeLeft);
				mainTimer.prod();
				timeLeft = mainTimer.getRemainingTime();
			}

			// libev timestamps are floating-point seconds
			ev_tstamp delay = timeLeft.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
			debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", timeLeft, delay);

			if (lastNextEvent != MonoTime.max)
			{
				// Adjusting
				evTimer.repeat = delay;
				ev_timer_again(ev_default_loop(0), &evTimer);
			}
			else
			{
				// Starting
				ev_timer_init(&evTimer, &timerCallback, 0., delay);
				ev_timer_start(ev_default_loop(0), &evTimer);
			}
		}

		lastNextEvent = nextEvent;
	}

	/// Register a socket with the manager.
	/// Initializes (but does not start) the socket's read and write watchers.
	void register(GenericSocket socket)
	{
		debug (ASOCKETS) writefln("Registering %s", socket);
		debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");

		auto handle = socket.conn.handle;
		assert(handle, "Must have fd before socket registration");

		ev_io_init(&socket.evRead , &ioCallback, handle, EV_READ );
		ev_io_init(&socket.evWrite, &ioCallback, handle, EV_WRITE);
		count++;
	}

	/// Unregister a socket with the manager.
	/// Clearing both notification flags stops any running watcher.
	void unregister(GenericSocket socket)
	{
		debug (ASOCKETS) writefln("Unregistering %s", socket);
		socket.notifyRead  = false;
		socket.notifyWrite = false;
		count--;
	}

public:
	/// Number of registered sockets.
	size_t size()
	{
		return count;
	}

	/// Loop continuously until no sockets are left.
void loop()
{
	auto evLoop = ev_default_loop(0);
	enforce(evLoop, "libev initialization failure");

	// Arm the libev timer for any mainTimer events that are already queued.
	updateTimer(true);
	debug (ASOCKETS) writeln("ev_run");
	ev_run(evLoop, 0);
}
}

/// libev flavour of the per-socket notification state
/// (mixed into GenericSocket).
private mixin template SocketMixin()
{
	/// One watcher per event kind. A non-null `data` field
	/// marks the watcher as started and points back to this object.
	private ev_io evRead, evWrite;

	/// Start or stop the given watcher so that it matches newValue.
	/// Does nothing if the watcher is already in the requested state.
	private void setWatcherState(ref ev_io ev, bool newValue, int event)
	{
		// Can happen when setting delegates before connecting.
		if (!conn)
			return;

		immutable running = ev.data !is null;
		if (newValue == running)
			return;

		if (newValue)
		{
			// Start
			ev.data = cast(void*)this;
			ev_io_start(ev_default_loop(0), &ev);
		}
		else
		{
			// Stop
			assert(ev.data is cast(void*)this);
			ev.data = null;
			ev_io_stop(ev_default_loop(0), &ev);
		}
	}

	/// Interested in read notifications (onReadable)?
	@property final void notifyRead (bool value)
	{
		setWatcherState(evRead, value, EV_READ);
	}

	/// Interested in write notifications (onWritable)?
	@property final void notifyWrite(bool value)
	{
		setWatcherState(evWrite, value, EV_WRITE);
	}

	debug ~this()
	{
		// The LIBEV SocketManager holds no references to registered sockets.
		// TODO: Add a doubly-linked list?
		assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
	}
}
}
else // Use select
{
struct SocketManager
{
private:
	/// Initial capacity of the SocketSets used by the event loop.
	enum FD_SETSIZE = 1024;

	/// List of all sockets to poll.
	GenericSocket[] sockets;

	/// Debug AA to check for dangling socket references.
	debug GenericSocket[socket_t] socketHandles;

	/// Register a socket with the manager.
void register(GenericSocket conn)
{
	debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1);
	assert(!conn.socket.blocking, "Trying to register a blocking socket");
	sockets ~= conn;

	debug
	{
		// Track the OS handle, to catch double registration
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't register a closed socket");
		assert(fd !in socketHandles, "This socket handle is already registered");
		socketHandles[fd] = conn;
	}
}

/// Unregister a socket with the manager.
void unregister(GenericSocket conn)
{
	debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1);

	debug
	{
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't unregister a closed socket");
		auto registered = fd in socketHandles;
		assert(registered, "This socket handle is not registered");
		assert(*registered is conn, "This socket handle is registered but belongs to another GenericSocket");
		socketHandles.remove(fd);
	}

	foreach (index, candidate; sockets)
	{
		if (candidate !is conn)
			continue;

		// Build a new array instead of shifting elements in place.
		sockets = sockets[0 .. index] ~ sockets[index + 1 .. $];
		return;
	}

	assert(false, "Socket not registered");
}

/// Handlers to run, one per loop iteration, when no socket events fired.
void delegate()[] idleHandlers;

public:
/// Number of registered sockets.
size_t size()
{
	return sockets.length;
}

/// Loop continuously until no sockets are left.
void loop()
{
	debug (ASOCKETS) writeln("Starting event loop.");

	uint capacity = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
	auto readers = new SocketSet(capacity);
	auto writers = new SocketSet(capacity);
	auto errors  = new SocketSet(capacity);

	while (true)
	{
		// Work out how large the sets must be for this iteration.
		uint needed = 0;
		version(Windows)
			needed = cast(uint)sockets.length;
		else
		{
			foreach (s; sockets)
				if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > needed)
					needed = s.socket.handle;
		}
		needed++;

		if (capacity < needed)
		{
			debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", capacity, needed*2);
			capacity = needed * 2;
			readers = new SocketSet(capacity);
			writers = new SocketSet(capacity);
			errors  = new SocketSet(capacity);
		}
		else
		{
			readers.reset();
			writers.reset();
			errors.reset();
		}

		size_t liveCount = 0;
		bool haveActive;
		debug (ASOCKETS) writeln("Populating sets");
		foreach (GenericSocket conn; sockets)
		{
			if (!conn.socket)
				continue;

			liveCount++;
			// Daemon sockets do not keep the loop running.
			if (!conn.daemon)
				haveActive = true;

			debug (ASOCKETS) writef("\t%s:", conn);
			if (conn.notifyRead)
			{
				readers.add(conn.socket);
				debug (ASOCKETS) write(" READ");
			}
			if (conn.notifyWrite)
			{
				writers.add(conn.socket);
				debug (ASOCKETS) write(" WRITE");
			}
			errors.add(conn.socket);
			debug (ASOCKETS) writeln();
		}
		debug (ASOCKETS)
		{
			writefln("Sets populated as follows:");
			printSets(readers, writers, errors);
		}

		debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
			liveCount,
			mainTimer.isWaiting() ? "with" : "no",
			idleHandlers.length,
		);
		if (!haveActive && !mainTimer.isWaiting())
		{
			debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
			break;
		}

		debug (ASOCKETS) { stdout.flush(); stderr.flush(); }

		int events;
		if (idleHandlers.length)
		{
			// Idle handlers are pending: poll with a zero timeout instead of blocking.
			events = liveCount == 0
				? 0
				: Socket.select(readers, writers, errors, 0.seconds);
		}
		else if (USE_SLEEP && liveCount == 0)
		{
			version(Windows)
			{
				// No sockets to select on: sleep until the next timer event.
				auto waitMsecs = mainTimer.getRemainingTime().total!"msecs"();
				debug (ASOCKETS) writeln("Wait duration: ", waitMsecs, " msecs");
				if (waitMsecs <= 0)
					waitMsecs = 1; // Avoid busywait
				else if (waitMsecs > int.max)
					waitMsecs = int.max;
				Sleep(cast(int)waitMsecs);
				events = 0;
			}
			else
				assert(0);
		}
		else if (mainTimer.isWaiting())
		{
			events = Socket.select(readers, writers, errors, mainTimer.getRemainingTime());
		}
		else
		{
			events = Socket.select(readers, writers, errors);
		}

		debug (ASOCKETS) writefln("%d events fired.", events);

		if (events > 0)
		{
			// Handle just one event at a time, as the first
			// handler might invalidate select()'s results.
			handleEvent(readers, writers, errors);
		}
		else if (idleHandlers.length)
		{
			import ae.utils.array;
			auto idle = idleHandlers.shift();

			// Rotate the idle handler queue before running it,
			// in case the handler unregisters itself.
			idleHandlers ~= idle;

			idle();
		}

		// Timers may invalidate our select results, so fire them after processing the latter
		mainTimer.prod();

		eventCounter++;
	}
}

/// Debug helper: print the first set (read, write, error) each socket is in.
debug (ASOCKETS)
void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
		{
			writefln("\t\t%s is unset", conn);
		}
		else if (readset.isSet(conn.socket))
		{
			writefln("\t\t%s is readable", conn);
		}
		else if (writeset.isSet(conn.socket))
		{
			writefln("\t\t%s is writable", conn);
		}
		else if (errorset.isSet(conn.socket))
		{
			writefln("\t\t%s is errored", conn);
		}
	}
}

/// Dispatch exactly one event from the given select() results:
/// the first registered socket found in any set gets a single callback
/// (read takes precedence over write, write over error).
void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	debug (ASOCKETS)
	{
		writefln("\tSelect results:");
		printSets(readset, writeset, errorset);
	}

	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			continue;

		if (readset.isSet(conn.socket))
		{
			debug (ASOCKETS) writefln("\t%s - calling onReadable", conn);
			conn.onReadable();
			return;
		}

		if (writeset.isSet(conn.socket))
		{
			debug (ASOCKETS) writefln("\t%s - calling onWritable", conn);
			conn.onWritable();
			return;
		}

		if (errorset.isSet(conn.socket))
		{
			debug (ASOCKETS) writefln("\t%s - calling onError", conn);
			conn.onError("select() error: " ~ conn.socket.getErrorText());
			return;
		}
	}

	assert(false, "select() reported events available, but no registered sockets are set");
}
}

// Use UFCS to allow removeIdleHandler to have a predicate with context

/// Append an idle handler to the manager's queue.
/// The same delegate must not be added twice.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
	foreach (existing; socketManager.idleHandlers)
		assert(handler !is existing);

	socketManager.idleHandlers ~= handler;
}

/// Default removeIdleHandler predicate: identity comparison.
static bool isFun(T)(T a, T b)
{
	return a is b;
}

void
removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
	// Remove the first idle handler for which pred(handler, args) holds.
	// It is an error if no handler matches.
	foreach (index, candidate; socketManager.idleHandlers)
	{
		if (!pred(candidate, args))
			continue;

		import std.algorithm;
		socketManager.idleHandlers = socketManager.idleHandlers.remove(index);
		return;
	}

	assert(false, "No such idle handler");
}

/// select flavour of the per-socket notification state
/// (mixed into GenericSocket); the event loop reads these flags.
private mixin template SocketMixin()
{
	/// Interested in read notifications (onReadable)?
	bool notifyRead;
	/// Interested in write notifications (onWritable)?
	bool notifyWrite;
}
}

/// The default socket manager.
SocketManager socketManager;

// ***************************************************************************

/// General methods for an asynchronous socket.
private abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Event hooks; the base implementations do nothing.
	void onReadable()
	{
	}

	/// ditto
	void onWritable()
	{
	}

	/// ditto
	void onError(string reason)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	private Address cachedLocalAddress, cachedRemoteAddress;

	/// Don't block the process from exiting.
	/// TODO: Not implemented with libev
	bool daemon;

	/// Local address of the wrapped socket.
	/// The first successful lookup is cached; returns null if there is
	/// neither a cached value nor a socket.
	final @property Address localAddress()
	{
		if (cachedLocalAddress !is null)
			return cachedLocalAddress;
		if (conn is null)
			return null;

		cachedLocalAddress = conn.localAddress();
		return cachedLocalAddress;
	}

	/// Non-throwing textual form of localAddress.
	final @property string localAddressStr() nothrow
	{
		try
		{
			auto address = localAddress;
			if (address is null)
				return "[null address]";
			return address.toString();
		}
		catch (Exception e)
		{
			return "[error: " ~ e.msg ~ "]";
		}
	}

	/// Remote address of the wrapped socket.
	/// The first successful lookup is cached; returns null if there is
	/// neither a cached value nor a socket.
	final @property Address remoteAddress()
	{
		if (cachedRemoteAddress !is null)
			return cachedRemoteAddress;
		if (conn is null)
			return null;

		cachedRemoteAddress = conn.remoteAddress();
		return cachedRemoteAddress;
	}

	/// Non-throwing textual form of remoteAddress.
	final @property string remoteAddressStr() nothrow
	{
		try
		{
			auto address = remoteAddress;
			if (address is null)
				return "[null address]";
			return address.toString();
		}
		catch (Exception e)
		{
			return "[error: " ~ e.msg ~ "]";
		}
	}

	/// Enable or disable keep-alive on the wrapped socket.
	/// If setting the time/interval parameters is not supported,
	/// falls back to just switching the KEEPALIVE option on.
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");

		if (!enabled)
		{
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
			return;
		}

		try
		{
			conn.setKeepAlive(time, interval);
		}
		catch (SocketFeatureException)
		{
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
		}
	}

	/// Unqualified class name, object address and socket handle
	/// (-1 when there is no socket).
	override string toString() const
	{
		import std.string;
		auto shortName = this.classinfo.name.split(".")[$-1];
		return "%s {this=%s, fd=%s}".format(shortName, cast(void*)this, conn ? conn.handle : -1);
	}
}

// ***************************************************************************

/// Why a connection was closed.
enum DisconnectType
{
	requested, // initiated by the application
	graceful,  // peer gracefully closed the connection
	error      // abnormal network condition
}

/// Life-cycle states of a connection.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
disconnecting,
}

/// Common interface for connections and adapters.
interface IConnection
{
	/// Highest valid priority value accepted by send.
	enum MAX_PRIORITY = 4;
	/// Priority used by send when none is specified.
	enum DEFAULT_PRIORITY = 2;

	/// Reason reported by disconnect when none is specified.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Wrap the single datum in a one-element array for the array overload.
		Data[1] data;
		data[0] = datum;
		// Forward the caller's priority; previously it was dropped, so
		// single-datum sends always used DEFAULT_PRIORITY.
		this.send(data[], priority);
		// Reset the temporary so it does not keep referencing the datum.
		data[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias void delegate() ConnectHandler;
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias void delegate(Data data) ReadDataHandler;
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
	@property void handleDisconnect(DisconnectHandler value); /// ditto
}

// ***************************************************************************

class StreamConnection : GenericSocket, IConnection
{
private:
	/// Blocks of data larger than this value are passed as unmanaged memory
	/// (in Data objects).
/// Blocks smaller than this value will be reallocated
/// on the managed heap. The disadvantage of placing large objects on the
/// managed heap is false pointers; the disadvantage of using Data for
/// small objects is wasted slack space due to the page size alignment
/// requirement.
enum UNMANAGED_THRESHOLD = 256;

/// Queue of addresses to try connecting to.
Address[] addressQueue;

/// Backing field for the state property.
ConnectionState _state;

/// Private state setter; returns the new value.
final @property ConnectionState state(ConnectionState value)
{
	_state = value;
	return _state;
}

public:
/// Get connection state.
override @property ConnectionState state()
{
	return _state;
}

protected:
/// Transport primitives supplied by subclasses.
/// onReadable / onWritableImpl treat a return value of Socket.ERROR as failure.
abstract sizediff_t doSend(in void[] buffer);
/// ditto
abstract sizediff_t doReceive(void[] buffer);

/// The send buffers.
Data[][MAX_PRIORITY+1] outQueue;
/// Whether the first item from each queue has been partially sent (and thus can't be cancelled).
bool[MAX_PRIORITY+1] partiallySent;

/// Constructor used by a ServerSocket for new connections
this(Socket conn)
{
	this();
	this.conn = conn;

	if (conn is null)
	{
		state = ConnectionState.disconnected;
	}
	else
	{
		// A non-null socket is considered already connected.
		state = ConnectionState.connected;
		socketManager.register(this);
	}

	updateFlags();
}

/// Recompute notifyRead / notifyWrite from the current state,
/// the send queues and the read handler.
final void updateFlags()
{
	// While connecting, writability signals that the attempt completed;
	// otherwise we only care about it when there is data to flush.
	notifyWrite = state == ConnectionState.connecting || writePending;

	notifyRead = state == ConnectionState.connected && readDataHandler;
}

/// Called when a socket is readable.
override void onReadable()
{
	// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
	static ubyte[0x10000] inBuffer;
	auto received = doReceive(inBuffer);

	// A zero-length read means that the peer closed the connection.
	if (received == 0)
		return disconnect("Connection closed", DisconnectType.graceful);

	// NOTE: would-block results are not special-cased here;
	// every receive error is reported through onError.
	if (received == Socket.ERROR)
		return onError("recv() error: " ~ lastSocketError);

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
		std.stdio.write(hexDump(inBuffer[0 .. received]));
		std.stdio.stdout.flush();
	}

	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this);
		return;
	}

	if (!readDataHandler)
	{
		debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this);
		return;
	}

	// Currently, unlike the D1 version of this module,
	// we will always reallocate read network data.
	// This disfavours code which doesn't need to store
	// read data after processing it, but otherwise
	// makes things simpler and safer all around.
	auto chunk = inBuffer[0 .. received];
	if (received < UNMANAGED_THRESHOLD)
	{
		// Copy to the managed heap
		readDataHandler(Data(chunk.dup));
	}
	else
	{
		// Copy to unmanaged memory
		readDataHandler(Data(chunk, true));
	}
}

/// Called when a socket is writable.
override void onWritable()
{
	// Not using scope(success) here; see the note on onWritableImpl.
	onWritableImpl();
	updateFlags();
}

// Work around scope(success) breaking debugger stack traces

/// Completes a pending connection attempt, or flushes as much queued
/// data as the socket accepts, in queue order.
final private void onWritableImpl()
{
	if (state == ConnectionState.connecting)
	{
		// Writability while connecting means that the connection is established.
		state = ConnectionState.connected;

		try
		{
			setKeepAlive();
		}
		catch (Exception e)
		{
			disconnect(e.msg, DisconnectType.error);
			return;
		}

		if (connectHandler)
			connectHandler();
		return;
	}

	foreach (priority, ref queue; outQueue)
	{
		while (queue.length)
		{
			auto head = queue.ptr; // pointer to first data

			ptrdiff_t sent = 0;
			if (head.length)
			{
				sent = doSend(head.contents);
				debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, head.length);
			}
			else
			{
				debug (ASOCKETS) writefln("\t\t%s: empty Data object", this);
			}

			if (sent == Socket.ERROR)
			{
				// Would-block: try again on the next writable event.
				if (wouldHaveBlocked())
					return;
				return onError("send() error: " ~ lastSocketError);
			}

			if (sent < head.length)
			{
				// Short write: keep the unsent remainder at the head of the queue.
				if (sent > 0)
				{
					*head = (*head)[sent .. head.length];
					partiallySent[priority] = true;
				}
				return;
			}

			// The whole item was written; drop it from the queue.
			assert(sent == head.length);
			head.clear();
			queue = queue[1 .. $];
			partiallySent[priority] = false;
			if (queue.length == 0)
				queue = null;
		}
	}

	// outQueue is now empty
	if (handleBufferFlushed)
		handleBufferFlushed();

	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}
}

/// Called when an error occurs on the socket.
/// While a delayed disconnect is pending, the socket is simply closed;
/// otherwise the connection is torn down with DisconnectType.error.
override void onError(string reason)
{
	if (state == ConnectionState.disconnecting)
	{
		// Pass the values as arguments instead of pre-formatting them, so that
		// a '%' inside `reason` is not interpreted as a format specifier.
		debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s", cast(void*)this, reason);
		return close();
	}

	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
	disconnect("Socket error: " ~ reason, DisconnectType.error);
}

/// Create an unconnected instance.
this()
{
}

public:
/// Close a connection.
/// If there is queued data waiting to be sent and the disconnect was
/// requested by the application, the socket is only closed once that data
/// has been flushed; for any other disconnect type the queues are discarded.
/// In both cases the disconnect handler (if set) is invoked from within this call.
/// Params:
///   reason = human-readable reason, passed on to the disconnect handler
///   type   = what caused the disconnect
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
{
	//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));

	if (writePending)
	{
		if (type==DisconnectType.requested)
		{
			assert(conn, "Attempting to disconnect on an uninitialized socket");
			// queue disconnect after all data is sent
			debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
			state = ConnectionState.disconnecting;
			//setIdleTimeout(30.seconds);
			if (disconnectHandler)
				disconnectHandler(reason, type);
			updateFlags();
			return;
		}
		else
			discardQueues();
	}

	debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

	if (state == ConnectionState.connecting || state == ConnectionState.connected)
		close();
	else
	{
		assert(conn is null, "Registered but %s socket".format(state));
		// There is no socket to close while resolving, so close() (which would
		// reset the state) is not called. Reset it here, before the handler runs;
		// otherwise the object stays in `resolving` and can never connect again.
		state = ConnectionState.disconnected;
	}

	if (disconnectHandler)
		disconnectHandler(reason, type);
	updateFlags();
}

private final void
close()
{
	// Unregister and close the socket, drop all queued data
	// and return to the disconnected state.
	assert(conn, "Attempting to close an unregistered socket");
	socketManager.unregister(this);
	conn.close();
	conn = null;
	outQueue[] = null;
	// Keep partiallySent consistent with the now-empty queues. close() can be
	// reached from onError with a half-sent item still queued, and a stale flag
	// would make clearQueue / queuePresent assert or misreport afterwards.
	partiallySent[] = false;
	state = ConnectionState.disconnected;
}

/// Append data to the send buffer.
/// Params:
///   data     = blocks to queue, in order
///   priority = index of the queue to append to (0 .. MAX_PRIORITY)
void send(Data[] data, int priority = DEFAULT_PRIORITY)
{
	assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
	outQueue[priority] ~= data;
	notifyWrite = true; // Fast updateFlags()

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
		foreach (datum; data)
			if (datum.length)
				std.stdio.write(hexDump(datum.contents));
			else
				std.stdio.writeln("(empty Data)");
		std.stdio.stdout.flush();
	}
}

/// ditto
alias send = IConnection.send;

/// Cancel everything queued at the given priority that has not been
/// started yet. A partially sent first item is kept.
final void clearQueue(int priority)
{
	if (partiallySent[priority])
	{
		assert(outQueue[priority].length > 0);
		outQueue[priority] = outQueue[priority][0..1];
	}
	else
		outQueue[priority] = null;
	updateFlags();
}

/// Clears all queues, even partially sent content.
private final void discardQueues()
{
	foreach (priority; 0..MAX_PRIORITY+1)
	{
		outQueue[priority] = null;
		partiallySent[priority] = false;
	}
	updateFlags();
}

/// Is there any queued data, at any priority, that is still to be sent?
@property
final bool writePending()
{
	foreach (queue; outQueue)
		if (queue.length)
			return true;
	return false;
}

/// Is there cancellable data queued at the given priority?
/// A partially sent first item does not count.
final bool queuePresent(int priority)
{
	if (partiallySent[priority])
	{
		assert(outQueue[priority].length > 0);
		return outQueue[priority].length > 1;
	}
	else
		return outQueue[priority].length > 0;
}

public:
private ConnectHandler connectHandler;
/// Callback for when a connection has been established.
@property final void handleConnect(ConnectHandler value)
{
	connectHandler = value;
	updateFlags();
}

/// Callback for when the send buffer has been flushed.
void delegate() handleBufferFlushed;

private ReadDataHandler readDataHandler;
/// Callback for incoming data.
/// Data will not be received unless this handler is set.
@property final void handleReadData(ReadDataHandler value)
{
	readDataHandler = value;
	updateFlags();
}

private DisconnectHandler disconnectHandler;
/// Callback for when a connection was closed.
@property final void handleDisconnect(DisconnectHandler value)
{
	disconnectHandler = value;
	updateFlags();
}
}

// ***************************************************************************

/// A POSIX file stream.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// Does not dup the given file descriptor, so "disconnecting" this connection
/// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	/// Wrap an existing file descriptor and switch it to non-blocking mode.
	this(int fileno)
	{
		auto wrapper = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		wrapper.blocking = false;
		super(wrapper);
	}

protected:
	import core.sys.posix.unistd : read, write;

	/// Write through the raw descriptor with write(2).
	override sizediff_t doSend(in void[] buffer)
	{
		auto fd = socket.handle;
		return write(fd, buffer.ptr, buffer.length);
	}

	/// Read through the raw descriptor with read(2).
	override sizediff_t doReceive(void[] buffer)
	{
		auto fd = socket.handle;
		return read(fd, buffer.ptr, buffer.length);
	}
}

// ***************************************************************************

/// An asynchronous TCP connection.
class TcpConnection : StreamConnection
{
protected:
	/// Wrap an existing socket (see StreamConnection's constructor).
	this(Socket conn)
	{
		super(conn);
	}

	/// Send through the wrapped socket.
	override sizediff_t doSend(in void[] buffer)
	{
		return conn.send(buffer);
	}

	/// Receive through the wrapped socket.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

	/// Take the next address off addressQueue and start a
	/// non-blocking connection attempt to it.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);

		auto target = addressQueue[0];
		addressQueue = addressQueue[1 .. $];

		try
		{
			conn = new Socket(target.addressFamily(), SocketType.STREAM, ProtocolType.TCP);
			conn.blocking = false;

			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) writefln("Attempting connection to %s", target.toString());
			conn.connect(target);
		}
		catch (SocketException e)
		{
			onError("Connect error: " ~ e.msg);
			return;
		}
	}

	/// Called when an error occurs on the socket.
	/// While connecting with untried addresses left, the failed socket is
	/// dropped and the next address is tried instead of reporting the error.
	override void onError(string reason)
	{
		immutable canRetry = state == ConnectionState.connecting && addressQueue.length;
		if (!canRetry)
			return super.onError(reason);

		socketManager.unregister(this);
		conn.close();
		conn = null;

		tryNextAddress();
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection.
final void connect(string host, ushort port)
{
	assert(host.length, "Empty host");
	assert(port, "No port specified");

	debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
	assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
	assert(!conn);

	state = ConnectionState.resolving;

	try
	{
		// Name resolution is done synchronously.
		addressQueue = getAddress(host, port);
		enforce(addressQueue.length, "No addresses found");
		debug (ASOCKETS)
		{
			writefln("Resolved to %s addresses:", addressQueue.length);
			foreach (candidate; addressQueue)
				writefln("- %s", candidate.toString());
		}

		state = ConnectionState.connecting;

		// Try the resolved addresses in random order.
		if (addressQueue.length > 1)
		{
			import std.random : randomShuffle;
			randomShuffle(addressQueue);
		}
	}
	catch (SocketException e)
	{
		onError("Lookup error: " ~ e.msg);
		return;
	}

	tryNextAddress();
}

}

// ***************************************************************************

/// An asynchronous TCP connection server.
final class TcpServer
{
private:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Take over an already set up socket and register it with the socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
1180 override void onReadable() 1181 { 1182 debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this); 1183 Socket acceptSocket = conn.accept(); 1184 acceptSocket.blocking = false; 1185 if (handleAccept) 1186 { 1187 TcpConnection connection = new TcpConnection(acceptSocket); 1188 debug (ASOCKETS) writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress); 1189 connection.setKeepAlive(); 1190 //assert(connection.connected); 1191 //connection.connected = true; 1192 acceptHandler(connection); 1193 } 1194 else 1195 acceptSocket.close(); 1196 } 1197 1198 /// Called when a socket is writable. 1199 override void onWritable() 1200 { 1201 } 1202 1203 /// Called when an error occurs on the socket. 1204 override void onError(string reason) 1205 { 1206 close(); // call parent 1207 } 1208 1209 void closeListener() 1210 { 1211 assert(conn); 1212 socketManager.unregister(this); 1213 conn.close(); 1214 conn = null; 1215 } 1216 } 1217 1218 /// Whether the socket is listening. 1219 bool listening; 1220 /// Listener instances 1221 Listener[] listeners; 1222 1223 final void updateFlags() 1224 { 1225 foreach (listener; listeners) 1226 listener.notifyRead = handleAccept !is null; 1227 } 1228 1229 public: 1230 /// Start listening on this socket. 
1231 ushort listen(ushort port, string addr = null) 1232 { 1233 debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port); 1234 //assert(!listening, "Attempting to listen on a listening socket"); 1235 1236 auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); 1237 1238 foreach (ref addressInfo; addressInfos) 1239 { 1240 if (addressInfo.family != AddressFamily.INET && port == 0) 1241 continue; // listen on random ports only on IPv4 for now 1242 1243 try 1244 { 1245 Socket conn = new Socket(addressInfo); 1246 conn.blocking = false; 1247 if (addressInfo.family == AddressFamily.INET6) 1248 conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); 1249 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 1250 1251 conn.bind(addressInfo.address); 1252 conn.listen(8); 1253 1254 if (addressInfo.family == AddressFamily.INET) 1255 port = to!ushort(conn.localAddress().toPortString()); 1256 1257 listeners ~= new Listener(conn); 1258 } 1259 catch (SocketException e) 1260 { 1261 debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString()); 1262 debug(ASOCKETS) writeln(e.msg); 1263 } 1264 } 1265 1266 if (listeners.length==0) 1267 throw new Exception("Unable to bind service"); 1268 1269 listening = true; 1270 1271 updateFlags(); 1272 1273 return port; 1274 } 1275 1276 @property Address[] localAddresses() 1277 { 1278 Address[] result; 1279 foreach (listener; listeners) 1280 result ~= listener.localAddress; 1281 return result; 1282 } 1283 1284 @property bool isListening() 1285 { 1286 return listening; 1287 } 1288 1289 /// Stop listening on this socket. 
1290 void close() 1291 { 1292 foreach (listener;listeners) 1293 listener.closeListener(); 1294 listeners = null; 1295 listening = false; 1296 if (handleClose) 1297 handleClose(); 1298 } 1299 1300 public: 1301 /// Callback for when the socket was closed. 1302 void delegate() handleClose; 1303 1304 private void delegate(TcpConnection incoming) acceptHandler; 1305 /// Callback for an incoming connection. 1306 /// Connections will not be accepted unless this handler is set. 1307 @property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; } 1308 /// ditto 1309 @property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); } 1310 } 1311 1312 // *************************************************************************** 1313 1314 /// Base class for a connection adapter. 1315 /// By itself, does nothing. 1316 class ConnectionAdapter : IConnection 1317 { 1318 IConnection next; 1319 1320 this(IConnection next) 1321 { 1322 this.next = next; 1323 next.handleConnect = &onConnect; 1324 next.handleDisconnect = &onDisconnect; 1325 } 1326 1327 @property ConnectionState state() { return next.state; } 1328 1329 /// Queue Data for sending. 1330 void send(Data[] data, int priority) 1331 { 1332 next.send(data, priority); 1333 } 1334 1335 alias send = IConnection.send; /// ditto 1336 1337 /// Terminate the connection. 
1338 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1339 { 1340 next.disconnect(reason, type); 1341 } 1342 1343 protected void onConnect() 1344 { 1345 if (connectHandler) 1346 connectHandler(); 1347 } 1348 1349 protected void onReadData(Data data) 1350 { 1351 // onReadData should be fired only if readDataHandler is set 1352 readDataHandler(data); 1353 } 1354 1355 protected void onDisconnect(string reason, DisconnectType type) 1356 { 1357 if (disconnectHandler) 1358 disconnectHandler(reason, type); 1359 } 1360 1361 /// Callback for when a connection has been established. 1362 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1363 private ConnectHandler connectHandler; 1364 1365 /// Callback setter for when new data is read. 1366 @property void handleReadData(ReadDataHandler value) 1367 { 1368 readDataHandler = value; 1369 next.handleReadData = value ? &onReadData : null ; 1370 } 1371 private ReadDataHandler readDataHandler; 1372 1373 /// Callback setter for when a connection was closed. 1374 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1375 private DisconnectHandler disconnectHandler; 1376 } 1377 1378 // *************************************************************************** 1379 1380 /// Adapter for connections with a line-based protocol. 1381 /// Splits data stream into delimiter-separated lines. 1382 class LineBufferedAdapter : ConnectionAdapter 1383 { 1384 /// The protocol's line delimiter. 1385 string delimiter = "\r\n"; 1386 1387 this(IConnection next) 1388 { 1389 super(next); 1390 } 1391 1392 /// Append a line to the send buffer. 1393 void send(string line) 1394 { 1395 //super.send(Data(line ~ delimiter)); 1396 // https://issues.dlang.org/show_bug.cgi?id=13985 1397 ConnectionAdapter ca = this; 1398 ca.send(Data(line ~ delimiter)); 1399 } 1400 1401 protected: 1402 /// The receive buffer. 
1403 Data inBuffer; 1404 1405 /// Called when data has been received. 1406 final override void onReadData(Data data) 1407 { 1408 import std.string; 1409 auto oldBufferLength = inBuffer.length; 1410 if (oldBufferLength) 1411 inBuffer ~= data; 1412 else 1413 inBuffer = data; 1414 1415 if (delimiter.length == 1) 1416 { 1417 import core.stdc.string; // memchr 1418 1419 char c = delimiter[0]; 1420 auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length); 1421 while (p) 1422 { 1423 sizediff_t index = p - inBuffer.ptr; 1424 processLine(index); 1425 1426 p = memchr(inBuffer.ptr, c, inBuffer.length); 1427 } 1428 } 1429 else 1430 { 1431 sizediff_t index; 1432 // TODO: we can start the search at oldBufferLength-delimiter.length+1 1433 while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0) 1434 processLine(index); 1435 } 1436 } 1437 1438 final void processLine(size_t index) 1439 { 1440 auto line = inBuffer[0..index]; 1441 inBuffer = inBuffer[index+delimiter.length..inBuffer.length]; 1442 super.onReadData(line); 1443 } 1444 1445 override void onDisconnect(string reason, DisconnectType type) 1446 { 1447 super.onDisconnect(reason, type); 1448 inBuffer.clear(); 1449 } 1450 } 1451 1452 // *************************************************************************** 1453 1454 /// Fires an event handler or disconnects connections 1455 /// after a period of inactivity. 
class TimeoutAdapter : ConnectionAdapter
{
	this(IConnection next)
	{
		debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stop the idle timer. A timeout must have been set and the timer
	/// must currently be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Start the idle timer again. The connection must be connected, a
	/// timeout must have been set, and the timer must not be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(state == ConnectionState.connected);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Set (or change) the period of inactivity after which the idle event
	/// fires. The timer starts immediately if the connection is connected.
	/// Params:
	///   duration = idle period; must be greater than zero
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);
		if (idleTask is null)
		{
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}
		else
		{
			// Take the task off the timer before changing its delay.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		if (state == ConnectionState.connected)
			mainTimer.add(idleTask);
	}

	/// Record activity: invoke `handleNonIdle` if set, and restart the idle
	/// timer if it is running.
	/// Safe to call before any idle timeout has been set; `onReadData`
	/// calls this for every chunk of data regardless of configuration.
	void markNonIdle()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();
		// idleTask stays null until setIdleTimeout is called.
		if (idleTask && idleTask.isWaiting())
			idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Forward the connect event, then start the idle timer if one is set.
	override void onConnect()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		super.onConnect();
		if (idleTask)
			resumeIdleTimeout();
	}

	/// Mark the connection as non-idle, then forward the data.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Forward the disconnect event, then stop the idle timer if running.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		super.onDisconnect(reason, type);
		if (idleTask && idleTask.isWaiting())
			idleTask.cancel();
	}

private:
	/// Timer task for the idle timeout; null until setIdleTimeout is called.
	TimerTask idleTask;

	/// Timer callback, fired when the idle period elapses.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		// A disconnect was already in progress when the timer fired:
		// disconnect again, this time as an error.
		if (state == ConnectionState.disconnecting)
			return disconnect("Delayed disconnect - time-out", DisconnectType.error);

		if (state != ConnectionState.connected)
			return;

		if (handleIdleTimeout)
		{
			handleIdleTimeout();
			// Re-arm the timer unless the handler ended the connection.
			if (state == ConnectionState.connected)
			{
				assert(!idleTask.isWaiting());
				mainTimer.add(idleTask);
			}
		}
		else
			disconnect("Time-out", DisconnectType.error);
	}
}

// ***************************************************************************

unittest
{
	// A 10 ms timeout must fire while the socket manager's loop runs.
	void testTimer()
	{
		bool fired;
		setTimeout({fired = true;}, 10.msecs);
		socketManager.loop();
		assert(fired);
	}

	testTimer();
}