1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import core.stdc.stdint : int32_t; 24 25 import std.exception; 26 import std.socket; 27 import std.string : format; 28 public import std.socket : Address, AddressInfo, Socket; 29 30 debug(ASOCKETS) import std.stdio; 31 debug(PRINTDATA) static import std.stdio; 32 debug(PRINTDATA) import ae.utils.text : hexDump; 33 private import std.conv : to; 34 35 36 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 37 static import ae.utils.array; 38 39 version(LIBEV) 40 { 41 import deimos.ev; 42 pragma(lib, "ev"); 43 } 44 45 version(Windows) 46 { 47 import core.sys.windows.windows : Sleep; 48 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 49 } 50 else 51 enum USE_SLEEP = false; 52 53 /// Counter incremented by the event loop on every wake-up (once per libev callback, or once per select loop iteration). 54 int eventCounter; 55 56 version(LIBEV) 57 { 58 // Watchers are a GenericSocket field (as declared in SocketMixin). 59 // Use one watcher per read and write event. 60 // Start/stop those as higher-level code declares interest in those events. 61 // Use the "data" ev_io field to store the parent GenericSocket address. 62 // Also use the "data" field as a flag to indicate whether the watcher is active 63 // (data is null when the watcher is stopped). 
struct SocketManager
{
private:
	/// Number of sockets currently registered with this manager.
	size_t count;

	/// libev I/O callback. The watcher's `data` field holds the owning
	/// GenericSocket; the event is forwarded to its readable/writable hook.
	extern(C)
	static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
	{
		eventCounter++;
		auto target = cast(GenericSocket)w.data;
		assert(target, "libev event fired on stopped watcher");
		debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", target, revents);

		const wantsRead  = (revents & EV_READ ) != 0;
		const wantsWrite = (revents & EV_WRITE) != 0;

		if (wantsRead)
			target.onReadable();
		else if (wantsWrite)
			target.onWritable();
		else
			assert(false, "Unknown event fired from libev");

		// TODO? Need to get proper SocketManager instance to call updateTimer on
		socketManager.updateTimer(false);
	}

	/// The single libev timer used to drive mainTimer.
	ev_timer evTimer;
	/// The mainTimer deadline the libev timer was last scheduled for
	/// (MonoTime.max while no timer is scheduled).
	MonoTime lastNextEvent = MonoTime.max;

	/// libev timer callback: fires due mainTimer events, then reschedules.
	extern(C)
	static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
	{
		eventCounter++;
		debug (ASOCKETS) writefln("Timer callback called.");
		mainTimer.prod();

		socketManager.updateTimer(true);
		debug (ASOCKETS) writefln("Timer callback exiting.");
	}

	/// Bring the libev timer in line with mainTimer's next deadline.
	/// Does nothing when the deadline is unchanged, unless `force` is set.
	void updateTimer(bool force)
	{
		auto nextEvent = mainTimer.getNextEvent();
		if (!force && lastNextEvent == nextEvent)
			return;

		debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);

		if (nextEvent == MonoTime.max)
		{
			// No deadline left: stop the timer if it was running.
			if (lastNextEvent != MonoTime.max)
				ev_timer_stop(ev_default_loop(0), &evTimer);
		}
		else
		{
			// Fire anything already overdue until a positive delay remains.
			auto remaining = mainTimer.getRemainingTime();
			for (; remaining <= Duration.zero; remaining = mainTimer.getRemainingTime())
			{
				debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
				mainTimer.prod();
			}

			const hnsecsPerSecond = convert!("seconds", "hnsecs")(1);
			ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / hnsecsPerSecond;
			debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);

			if (lastNextEvent != MonoTime.max)
			{
				// Timer already running: adjust it.
				evTimer.repeat = tstamp;
				ev_timer_again(ev_default_loop(0), &evTimer);
			}
			else
			{
				// Timer not running: start it.
				ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
				ev_timer_start(ev_default_loop(0), &evTimer);
			}
		}

		lastNextEvent = nextEvent;
	}

	/// Register a socket with the manager.
	/// Initializes (but does not start) the socket's read and write watchers.
	void register(GenericSocket socket)
	{
		debug (ASOCKETS) writefln("Registering %s", socket);
		debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");

		auto handle = socket.conn.handle;
		assert(handle, "Must have fd before socket registration");

		ev_io_init(&socket.evRead , &ioCallback, handle, EV_READ );
		ev_io_init(&socket.evWrite, &ioCallback, handle, EV_WRITE);
		++count;
	}

	/// Unregister a socket with the manager.
	/// Clears both notification flags, which stops any running watcher.
	void unregister(GenericSocket socket)
	{
		debug (ASOCKETS) writefln("Unregistering %s", socket);
		socket.notifyRead  = false;
		socket.notifyWrite = false;
		--count;
	}

public:
	/// Number of registered sockets.
	size_t size()
	{
		return count;
	}

	/// Loop continuously until no sockets are left.
void loop()
{
	// The returned handle is only checked here; ev_run below asks for
	// the default loop again.
	enforce(ev_default_loop(0), "libev initialization failure");

	updateTimer(true);
	debug (ASOCKETS) writeln("ev_run");
	ev_run(ev_default_loop(0), 0);
}
}

/// libev flavour of the per-socket state: one ev_io watcher per direction.
private mixin template SocketMixin()
{
	private ev_io evRead, evWrite;

	/// Start or stop a watcher so that it matches `newValue`.
	/// The watcher's `data` field doubles as the "is running" flag.
	private void setWatcherState(ref ev_io ev, bool newValue, int event)
	{
		// Can happen when setting delegates before connecting.
		if (!conn)
			return;

		const running = ev.data !is null;
		if (running == newValue)
			return; // already in the requested state

		if (newValue)
		{
			// Start
			ev.data = cast(void*)this;
			ev_io_start(ev_default_loop(0), &ev);
		}
		else
		{
			// Stop
			assert(ev.data is cast(void*)this);
			ev.data = null;
			ev_io_stop(ev_default_loop(0), &ev);
		}
	}

	/// Interested in read notifications (onReadable)?
	@property final void notifyRead(bool value)
	{
		setWatcherState(evRead, value, EV_READ);
	}

	/// Interested in write notifications (onWritable)?
	@property final void notifyWrite(bool value)
	{
		setWatcherState(evWrite, value, EV_WRITE);
	}

	debug ~this()
	{
		// The LIBEV SocketManager holds no references to registered sockets.
		// TODO: Add a doubly-linked list?
		assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
	}
}
}
else // Use select
{
struct SocketManager
{
private:
	/// Initial capacity of the SocketSets created by loop().
	enum FD_SETSIZE = 1024;

	/// List of all sockets to poll.
	GenericSocket[] sockets;

	/// Debug AA to check for dangling socket references.
	debug GenericSocket[socket_t] socketHandles;

	/// Register a socket with the manager.
void register(GenericSocket conn)
{
	debug (ASOCKETS) writefln("Registering %s (%d total)", conn, sockets.length + 1);
	assert(!conn.socket.blocking, "Trying to register a blocking socket");
	sockets ~= conn;

	debug
	{
		// Track the handle so that double registration is caught early.
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't register a closed socket");
		assert(fd !in socketHandles, "This socket handle is already registered");
		socketHandles[fd] = conn;
	}
}

/// Unregister a socket with the manager.
/// Asserts if the socket is not in the list.
void unregister(GenericSocket conn)
{
	debug (ASOCKETS) writefln("Unregistering %s (%d total)", conn, sockets.length - 1);

	debug
	{
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't unregister a closed socket");
		auto owner = fd in socketHandles;
		assert(owner, "This socket handle is not registered");
		assert(*owner is conn, "This socket handle is registered but belongs to another GenericSocket");
		socketHandles.remove(fd);
	}

	foreach (size_t index, GenericSocket candidate; sockets)
	{
		if (candidate !is conn)
			continue;

		// Build a fresh array that omits the matching entry.
		sockets = sockets[0 .. index] ~ sockets[index + 1 .. $];
		return;
	}

	assert(false, "Socket not registered");
}

/// Handlers run by loop() when select() reports no events.
void delegate()[] idleHandlers;

public:
/// Number of registered sockets.
size_t size()
{
	return sockets.length;
}

/// Loop continuously until no sockets are left.
// Main select()-based event loop (first part; the function continues below).
// Each iteration rebuilds the three socket sets from the registered sockets,
// waits for activity, and then handles at most one socket event or runs one
// idle handler. The loop ends when only daemon sockets (or none) remain and
// mainTimer has nothing pending.
void loop()
{
	debug (ASOCKETS) writeln("Starting event loop.");

	SocketSet readset, writeset, errorset;
	size_t sockcount;
	uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
	readset = new SocketSet(setSize);
	writeset = new SocketSet(setSize);
	errorset = new SocketSet(setSize);
	while (true)
	{
		// Work out the capacity the sets need for this iteration:
		// the socket count on Windows, the highest handle value elsewhere
		// (plus one in both cases).
		uint minSize = 0;
		version(Windows)
			minSize = cast(uint)sockets.length;
		else
		{
			foreach (s; sockets)
				if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
					minSize = s.socket.handle;
		}
		minSize++;

		if (setSize < minSize)
		{
			// Too small: reallocate at twice the required size.
			debug (ASOCKETS) writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
			setSize = minSize * 2;
			readset = new SocketSet(setSize);
			writeset = new SocketSet(setSize);
			errorset = new SocketSet(setSize);
		}
		else
		{
			// Large enough: reuse the existing sets.
			readset.reset();
			writeset.reset();
			errorset.reset();
		}

		sockcount = 0;
		// Set when at least one non-daemon socket with a live handle exists.
		bool haveActive;
		debug (ASOCKETS) writeln("Populating sets");
		foreach (GenericSocket conn; sockets)
		{
			if (!conn.socket)
				continue;
			sockcount++;
			if (!conn.daemon)
				haveActive = true;

			debug (ASOCKETS) writef("\t%s:", conn);
			if (conn.notifyRead)
			{
				readset.add(conn.socket);
				debug (ASOCKETS) write(" READ");
			}
			if (conn.notifyWrite)
			{
				writeset.add(conn.socket);
				debug (ASOCKETS) write(" WRITE");
			}
			// Every live socket is watched for errors, whatever its flags.
			errorset.add(conn.socket);
			debug (ASOCKETS) writeln();
		}
		debug (ASOCKETS)
		{
			writefln("Sets populated as follows:");
			printSets(readset, writeset, errorset);
		}

		debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
			sockcount,
			mainTimer.isWaiting() ? "with" : "no",
			idleHandlers.length,
		);
		// Note: pending idle handlers alone do not keep the loop alive.
		if (!haveActive && !mainTimer.isWaiting())
		{
			debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
			break;
		}

		debug (ASOCKETS) { stdout.flush(); stderr.flush(); }

		int events;
		if (idleHandlers.length)
		{
			// With idle handlers queued, select() is called with a zero
			// timeout (or skipped entirely when there are no sockets).
			if (sockcount==0)
				events = 0;
			else
				events = Socket.select(readset, writeset, errorset, 0.seconds);
		}
		else
		if (USE_SLEEP && sockcount==0)
		{
			// Windows only (USE_SLEEP is false elsewhere): with no sockets,
			// Sleep() until the next timer deadline instead of calling select().
			version(Windows)
			{
				auto duration = mainTimer.getRemainingTime().total!"msecs"();
				debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
				if (duration <= 0)
					duration = 1; // Avoid busywait
				else
				if (duration > int.max)
					duration = int.max;
				Sleep(cast(int)duration);
				events = 0;
			}
			else
				assert(0);
		}
		else
		if (mainTimer.isWaiting())
			// Wait no longer than the time until the next timer event.
			events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
		else
			// No timers pending: select() is called without a timeout.
			events = Socket.select(readset, writeset, errorset);

		debug (ASOCKETS) writefln("%d events fired.", events);

		if (events > 0)
		{
			// Handle just one event at a time, as the first
			// handler might invalidate select()'s results.
			handleEvent(readset, writeset, errorset);
		}
		else
		if (idleHandlers.length)
		{
			import ae.utils.array;
			auto handler = idleHandlers.shift();

			// Rotate the idle handler queue before running it,
			// in case the handler unregisters itself.
idleHandlers ~= handler;

handler();
} // if (idleHandlers.length)

// Timers may invalidate our select results, so fire them after processing the latter
mainTimer.prod();

eventCounter++;
} // while (true)
} // loop()

/// Dump which registered sockets are present in each of the three sets.
debug (ASOCKETS)
void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
		{
			writefln("\t\t%s is unset", conn);
			continue;
		}

		if (readset.isSet(conn.socket))
			writefln("\t\t%s is readable", conn);
		if (writeset.isSet(conn.socket))
			writefln("\t\t%s is writable", conn);
		if (errorset.isSet(conn.socket))
			writefln("\t\t%s is errored", conn);
	}
}

/// Dispatch exactly one event: the first registered socket found in any
/// set gets its matching handler called (read, then write, then error).
void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	debug (ASOCKETS)
	{
		writefln("\tSelect results:");
		printSets(readset, writeset, errorset);
	}

	foreach (GenericSocket conn; sockets)
	{
		auto sock = conn.socket;
		if (!sock)
			continue;

		if (readset.isSet(sock))
		{
			debug (ASOCKETS) writefln("\t%s - calling onReadable", conn);
			return conn.onReadable();
		}

		if (writeset.isSet(sock))
		{
			debug (ASOCKETS) writefln("\t%s - calling onWritable", conn);
			return conn.onWritable();
		}

		if (errorset.isSet(sock))
		{
			debug (ASOCKETS) writefln("\t%s - calling onError", conn);
			return conn.onError("select() error: " ~ sock.getErrorText());
		}
	}

	assert(false, "select() reported events available, but no registered sockets are set");
}
} // struct SocketManager

// Use UFCS to allow removeIdleHandler to have a predicate with context
/// Append an idle handler; asserts that it is not already registered.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
	foreach (existing; socketManager.idleHandlers)
		assert(handler !is existing);

	socketManager.idleHandlers ~= handler;
}

/// Default predicate for removeIdleHandler: identity comparison.
static bool isFun(T)(T a, T b) { return a is b; }

/// Remove the first idle handler for which `pred(handler, args)` holds.
/// Asserts if there is no such handler.
void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
	foreach (index, candidate; socketManager.idleHandlers)
	{
		if (!pred(candidate, args))
			continue;

		import std.algorithm;
		socketManager.idleHandlers = socketManager.idleHandlers.remove(index);
		return;
	}
	assert(false, "No such idle handler");
}

/// select() flavour of the per-socket state: two plain flags read by loop().
private mixin template SocketMixin()
{
	/// Interested in read notifications (onReadable)?
	bool notifyRead;
	/// Interested in write notifications (onWritable)?
	bool notifyWrite;
}
} // version(LIBEV) ... else

/// The default socket manager.
SocketManager socketManager;

// ***************************************************************************

/// General methods for an asynchronous socket.
private abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Event hooks called by the socket manager; no-ops by default.
	void onReadable() {}

	/// ditto
	void onWritable() {}

	/// ditto
	void onError(string reason) {}

public:
	/// allow getting the address of connections that are already disconnected
	private Address cachedLocalAddress, cachedRemoteAddress;

	/// Don't block the process from exiting.
	/// TODO: Not implemented with libev
	bool daemon;

	/// Local address of the socket; cached after the first successful query.
	/// Returns null when nothing is cached and there is no socket.
	final @property Address localAddress()
	{
		if (cachedLocalAddress !is null)
			return cachedLocalAddress;
		if (conn is null)
			return null;
		return cachedLocalAddress = conn.localAddress();
	}

	/// Non-throwing string form of localAddress.
	final @property string localAddressStr() nothrow
	{
		try
		{
			if (auto addr = localAddress)
				return addr.toString();
			return "[null address]";
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Remote address of the socket; cached after the first successful query.
	/// Returns null when nothing is cached and there is no socket.
	final @property Address remoteAddress()
	{
		if (cachedRemoteAddress !is null)
			return cachedRemoteAddress;
		if (conn is null)
			return null;
		return cachedRemoteAddress = conn.remoteAddress();
	}

	/// Non-throwing string form of remoteAddress.
	final @property string remoteAddressStr() nothrow
	{
		try
		{
			if (auto addr = remoteAddress)
				return addr.toString();
			return "[null address]";
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Enable or disable keep-alive on the wrapped socket.
	/// If setting `time`/`interval` fails, falls back to the plain
	/// KEEPALIVE socket option.
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");

		if (!enabled)
		{
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
			return;
		}

		try
			conn.setKeepAlive(time, interval);
		catch (SocketException)
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
	}

	/// Short description: unqualified class name, object address and handle.
	override string toString() const
	{
		import std.string;
		auto shortName = this.classinfo.name.split(".")[$-1];
		return "%s {this=%s, fd=%s}".format(shortName, cast(void*)this, conn ? conn.handle : -1);
	}
}

// ***************************************************************************

enum DisconnectType
{
	requested, // initiated by the application
	graceful,  // peer gracefully closed the connection
	error      // abnormal network condition
}

enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
disconnecting,
}

/// Common interface for connections and adapters.
interface IConnection
{
	/// Highest valid send priority index.
	enum MAX_PRIORITY = 4;
	/// Priority used when none is given.
	enum DEFAULT_PRIORITY = 2;

	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected()
	{
		return state == ConnectionState.connected;
	}

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting()
	{
		return state == ConnectionState.disconnecting;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Wrap the single item in a one-element array, and reset the
		// array once the call returns.
		Data[1] single;
		single[0] = datum;
		this.send(single, priority);
		single[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias BufferFlushedHandler = void delegate();
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}

// ***************************************************************************

class Connection : GenericSocket, IConnection
{
private:
	/// Blocks of data larger than this value are passed as unmanaged memory
	/// (in Data objects). Blocks smaller than this value will be reallocated
	/// on the managed heap. The disadvantage of placing large objects on the
	/// managed heap is false pointers; the disadvantage of using Data for
	/// small objects is wasted slack space due to the page size alignment
	/// requirement.
	enum UNMANAGED_THRESHOLD = 256;

	ConnectionState _state;

	/// Private state setter; returns the value just stored.
	final @property ConnectionState state(ConnectionState value)
	{
		return _state = value;
	}

public:
	/// Get connection state.
	override @property ConnectionState state()
	{
		return _state;
	}

protected:
	/// Transport primitives supplied by subclasses.
	abstract sizediff_t doSend(in void[] buffer);
	abstract sizediff_t doReceive(void[] buffer); /// ditto

	/// The send buffers.
	Data[][MAX_PRIORITY+1] outQueue;
	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
	int partiallySent = -1;

	/// Constructor used by a ServerSocket for new connections
	this(Socket conn)
	{
		this();
		this.conn = conn;
		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
		if (conn)
			socketManager.register(this);
		updateFlags();
	}

	/// Recompute the read/write notification flags from the current state.
	final void updateFlags()
	{
		// While connecting, writability signals the connection result;
		// otherwise it is wanted only while data is queued.
		notifyWrite = state == ConnectionState.connecting || writePending;

		notifyRead = state == ConnectionState.connected && readDataHandler;
	}

	/// Called when a socket is readable.
override void onReadable()
{
	// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
	static ubyte[0x10000] inBuffer = void;
	auto received = doReceive(inBuffer);

	// A zero-length read means the peer closed the connection.
	if (received == 0)
		return disconnect("Connection closed", DisconnectType.graceful);

	if (received == Socket.ERROR)
	{
		// NOTE: a wouldHaveBlocked check used to sit here but is
		// commented out in the original code, so every receive error
		// is reported:
		//	if (wouldHaveBlocked)
		//	{
		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
		//		return;
		//	}
		onError("recv() error: " ~ lastSocketError);
		return;
	}

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
		std.stdio.write(hexDump(inBuffer[0 .. received]));
		std.stdio.stdout.flush();
	}

	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", this);
		return;
	}

	if (!readDataHandler)
	{
		debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", this);
		return;
	}

	// Currently, unlike the D1 version of this module,
	// we will always reallocate read network data.
	// This disfavours code which doesn't need to store
	// read data after processing it, but otherwise
	// makes things simpler and safer all around.
	auto chunk = inBuffer[0 .. received];
	if (received < UNMANAGED_THRESHOLD)
		readDataHandler(Data(chunk.dup));   // Copy to the managed heap
	else
		readDataHandler(Data(chunk, true)); // Copy to unmanaged memory
}

/// Called when an error occurs on the socket.
override void onError(string reason)
{
	if (state == ConnectionState.disconnecting)
	{
		// The disconnect handler has already run for a deferred disconnect;
		// just tear the socket down.
		// Pass the arguments straight to writefln: pre-formatting the message
		// and then using it as a format string breaks on a '%' in `reason`.
		debug (ASOCKETS) writefln("Socket error while disconnecting @ %s: %s", cast(void*)this, reason);
		return close();
	}

	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
	disconnect("Socket error: " ~ reason, DisconnectType.error);
}

/// Create a connection object without a socket.
this()
{
}

public:
/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
/// Params:
///   reason = human-readable reason, passed on to the disconnect handler
///   type   = who or what caused the disconnect; queued data is flushed only
///            for DisconnectType.requested and discarded for the other types
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
{
	//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));

	if (writePending)
	{
		if (type==DisconnectType.requested)
		{
			assert(conn, "Attempting to disconnect on an uninitialized socket");
			// queue disconnect after all data is sent
			debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
			state = ConnectionState.disconnecting;
			//setIdleTimeout(30.seconds);
			if (disconnectHandler)
				disconnectHandler(reason, type);
			updateFlags();
			return;
		}
		else
			discardQueues();
	}

	debug (ASOCKETS) writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

	if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
		close();
	else
	{
		assert(conn is null, "Registered but %s socket".format(state));
		// No socket exists, so there is nothing to close. This is reached
		// while resolving, and also while connecting when the socket could
		// not be created at all. Both must end up disconnected; previously
		// the connecting case was left stuck in the connecting state.
		if (state == ConnectionState.resolving || state == ConnectionState.connecting)
			state = ConnectionState.disconnected;
	}

	if (disconnectHandler)
		disconnectHandler(reason, type);
	updateFlags();
}

/// Unregister and close the socket, drop all queued data and mark the
/// connection as disconnected. Does not call the disconnect handler.
private final void close()
{
	assert(conn, "Attempting to close an unregistered socket");
	socketManager.unregister(this);
	conn.close();
	conn = null;
	outQueue[] = null;
	// The queues are gone, so no item can be partially sent any more.
	// Leaving a stale index here would trip the queue assertions if this
	// object is connected and used again.
	partiallySent = -1;
	state = ConnectionState.disconnected;
}

/// Append data to the send buffer.
/// Params:
///   data     = items to queue, in order
///   priority = index of the queue to append to (0 .. MAX_PRIORITY)
void send(Data[] data, int priority = DEFAULT_PRIORITY)
{
	assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
	outQueue[priority] ~= data;
	notifyWrite = true; // Fast updateFlags()

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
		foreach (datum; data)
			if (datum.length)
				std.stdio.write(hexDump(datum.contents));
			else
				std.stdio.writeln("(empty Data)");
		std.stdio.stdout.flush();
	}
}

/// ditto
alias send = IConnection.send;

/// Drop the data queued at the given priority. An item that has already
/// been partially sent is kept, so that it can be completed.
final void clearQueue(int priority)
{
	if (priority == partiallySent)
	{
		assert(outQueue[priority].length > 0);
		outQueue[priority] = outQueue[priority][0..1];
	}
	else
		outQueue[priority] = null;
	updateFlags();
}

/// Clears all queues, even partially sent content.
private final void discardQueues()
{
	outQueue[] = null;
	partiallySent = -1;
	updateFlags();
}

/// Is there any data waiting to be sent, at any priority?
@property
final bool writePending()
{
	foreach (ref pending; outQueue)
	{
		if (pending.length != 0)
			return true;
	}
	return false;
}

/// Is there data at this priority that has not started sending yet?
/// A partially sent first item does not count.
final bool queuePresent(int priority = DEFAULT_PRIORITY)
{
	if (priority != partiallySent)
		return outQueue[priority].length > 0;

	assert(outQueue[priority].length > 0);
	return outQueue[priority].length > 1;
}

/// Number of items queued at this priority.
final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
{
	return outQueue[priority].length;
}

/// Total length of all items queued at this priority.
final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
{
	size_t total = 0;
	foreach (item; outQueue[priority])
		total += item.length;
	return total;
}

public:
private ConnectHandler connectHandler;
/// Callback for when a connection has been established.
@property final void handleConnect(ConnectHandler value)
{
	connectHandler = value;
	updateFlags();
}

private ReadDataHandler readDataHandler;
/// Callback for incoming data.
/// Data will not be received unless this handler is set.
@property final void handleReadData(ReadDataHandler value)
{
	readDataHandler = value;
	updateFlags();
}

private DisconnectHandler disconnectHandler;
/// Callback for when a connection was closed.
@property final void handleDisconnect(DisconnectHandler value)
{
	disconnectHandler = value;
	updateFlags();
}

/// Callback setter for when all queued data has been sent.
private BufferFlushedHandler bufferFlushedHandler;
@property final void handleBufferFlushed(BufferFlushedHandler value)
{
	bufferFlushedHandler = value;
	updateFlags();
}
}

class StreamConnection : Connection
{
protected:
	/// Create a stream connection without a socket.
	this()
	{
		super();
	}

	/// Called when a socket is writable.
// Runs the write logic, then refreshes the notification flags.
override void onWritable()
{
	//scope(success) updateFlags();
	onWritableImpl();
	updateFlags();
}

// Work around scope(success) breaking debugger stack traces
//
// While connecting, the first writable event carries the connection result:
// the pending socket error is checked, keep-alive is enabled and the connect
// handler is called. Otherwise, queued data is sent until the queues are
// empty or the socket stops accepting data.
final private void onWritableImpl()
{
	if (state == ConnectionState.connecting)
	{
		// A non-zero SO_ERROR value means the connection attempt failed.
		int32_t error;
		conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
		if (error)
			return disconnect(formatSocketError(error), DisconnectType.error);

		state = ConnectionState.connected;

		//debug writefln("[%s] Connected", remoteAddress);
		try
			setKeepAlive();
		catch (Exception e)
			return disconnect(e.msg, DisconnectType.error);
		if (connectHandler)
			connectHandler();
		return;
	}
	//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

	// Two passes over the queues: the first pass (sendPartial == true) only
	// services the queue holding a partially sent item, so that item is
	// completed first; the second pass sends every queue in index order.
	foreach (sendPartial; [true, false])
		foreach (int priority, ref queue; outQueue)
			while (queue.length && (!sendPartial || priority == partiallySent))
			{
				assert(partiallySent == -1 || partiallySent == priority);

				auto pdata = queue.ptr; // pointer to first data

				ptrdiff_t sent = 0;
				if (pdata.length)
				{
					sent = doSend(pdata.contents);
					debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
				}
				else
				{
					// Nothing to send; the item is dequeued below (sent == length == 0).
					debug (ASOCKETS) writefln("\t\t%s: empty Data object", this);
				}

				if (sent == Socket.ERROR)
				{
					// Would-block is not an error: return and wait for the
					// next writable event.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// Short write: keep the unsent tail at the head of the
					// queue and remember which queue it belongs to.
					if (sent > 0)
					{
						*pdata = (*pdata)[sent..pdata.length];
						partiallySent = priority;
					}
					return;
				}
				else
				{
					// Item fully sent: release it and advance the queue.
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					partiallySent = -1;
					if (queue.length == 0)
						queue = null;
				}
			}

	// outQueue is now empty
	if (bufferFlushedHandler)
		bufferFlushedHandler();
	// A disconnect that was waiting for the queues to drain can now complete.
	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}
}

public:
// Wrap an existing socket; see the Connection(Socket) constructor.
this(Socket conn)
{
	super(conn);
}
}

// ***************************************************************************

/// A POSIX file stream.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// Does not dup the given file descriptor, so "disconnecting" this connection
/// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	// Wraps the descriptor in a Socket object and switches it to
	// non-blocking mode before registering it.
	this(int fileno)
	{
		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		conn.blocking = false;
		super(conn);
	}

protected:
	import core.sys.posix.unistd : read, write;

	// Uses write() on the raw handle rather than the socket send call.
	override sizediff_t doSend(in void[] buffer)
	{
		return write(socket.handle, buffer.ptr, buffer.length);
	}

	// Uses read() on the raw handle rather than the socket receive call.
	override sizediff_t doReceive(void[] buffer)
	{
		return read(socket.handle, buffer.ptr, buffer.length);
	}
}

// ***************************************************************************

/// An asynchronous TCP connection.
class TcpConnection : StreamConnection
{
protected:
	/// Queue of addresses to try connecting to.
AddressInfo[] addressQueue;

/// Wrap an already connected socket (used for accepted connections).
this(Socket conn)
{
	super(conn);
}

/// Send through the wrapped socket.
override sizediff_t doSend(in void[] buffer)
{
	return conn.send(buffer);
}

/// Receive through the wrapped socket.
override sizediff_t doReceive(void[] buffer)
{
	return conn.receive(buffer);
}

/// Take the next address off addressQueue and start a non-blocking
/// connection attempt to it. Failures are routed through onError, which
/// moves on to the next queued address, if any.
final void tryNextAddress()
{
	assert(state == ConnectionState.connecting);
	auto addressInfo = addressQueue[0];
	addressQueue = addressQueue[1..$];

	try
	{
		// Prepare the socket completely before publishing it in `conn`,
		// so that a failure here never leaves behind a socket that is set
		// but not registered with the socket manager.
		auto sock = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
		try
			sock.blocking = false;
		catch (SocketException e)
		{
			sock.close(); // never published; don't leak the handle
			throw e;
		}
		conn = sock;

		socketManager.register(this);
		updateFlags();
		debug (ASOCKETS) writefln("Attempting connection to %s", addressInfo.address.toString());
		conn.connect(addressInfo.address);
	}
	catch (SocketException e)
		return onError("Connect error: " ~ e.msg);
}

/// Called when an error occurs on the socket.
/// While connecting with further addresses queued, the failed attempt is
/// discarded and the next address is tried; otherwise the error is handled
/// by the base class.
override void onError(string reason)
{
	if (state == ConnectionState.connecting && addressQueue.length)
	{
		// `conn` is null when the socket for the failed attempt could not
		// even be created (for example, an unsupported address family).
		// In that case there is nothing to unregister or close.
		if (conn)
		{
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}

		return tryNextAddress();
	}

	super.onError(reason);
}

public:
/// Default constructor
this()
{
	debug (ASOCKETS) writefln("New TcpConnection @ %s", cast(void*)this);
}

/// Start establishing a connection.
final void connect(string host, ushort port)
	{
		import std.random : randomShuffle;

		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		// Name resolution happens synchronously, inside this call.
		state = ConnectionState.resolving;

		AddressInfo[] candidates;
		try
		{
			auto resolved = getAddress(host, port);
			enforce(resolved.length, "No addresses found");
			debug (ASOCKETS)
			{
				writefln("Resolved to %s addresses:", resolved.length);
				foreach (addr; resolved)
					writefln("- %s", addr.toString());
			}

			// Randomize the order in which the resolved addresses are tried.
			if (resolved.length > 1)
				randomShuffle(resolved);

			foreach (addr; resolved)
				candidates ~= AddressInfo(addr.addressFamily, SocketType.STREAM, ProtocolType.TCP, addr, host);
		}
		catch (SocketException e)
			return onError("Lookup error: " ~ e.msg);

		// The AddressInfo[] overload expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		connect(candidates);
	}

	/// ditto
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		// Queue every candidate, then kick off the first attempt.
		state = ConnectionState.connecting;
		addressQueue = addresses;
		tryNextAddress();
	}
}

// ***************************************************************************

/// An asynchronous TCP connection server.
final class TcpServer
{
private:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Adopt the given listening socket and register with the socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one pending connection; it is handed to the accept
		/// handler if one is set, and closed right away otherwise.
		override void onReadable()
		{
			debug (ASOCKETS) writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket incoming = conn.accept();
			incoming.blocking = false;

			if (!handleAccept)
			{
				incoming.close();
				return;
			}

			TcpConnection accepted = new TcpConnection(incoming);
			debug (ASOCKETS) writefln("\tAccepted connection %s from %s", accepted, accepted.remoteAddress);
			accepted.setKeepAlive();
			//assert(connection.connected);
			//connection.connected = true;
			acceptHandler(accepted);
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregister from the socket manager, then close and drop the socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Make every listener watch for readability exactly when an accept handler is set.
	final void updateFlags()
	{
		foreach (l; listeners)
		{
			l.notifyRead = handleAccept !is null;
		}
	}

public:
	/// Start listening on this socket.
ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		// Resolve the node/service pair into passive TCP stream endpoints.
		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Walking backwards keeps the indices of the entries still to be
			// visited valid while non-IPv4 entries are being removed.
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		// Return the port an IPv4 listener is actually bound to
		// (this is how a caller learns the port when 0 was requested).
		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	/// ditto
	void listen(AddressInfo[] addressInfos)
	{
		// Each address is tried independently; a SocketException on one of
		// them is not fatal as long as at least one listener exists afterwards.
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				// Don't leave the socket open if configuring, binding or
				// wrapping it fails - nothing else holds a reference to it.
				scope(failure) conn.close();

				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	@property
Address[] localAddresses()
	{
		// One entry per listener, in listener order.
		Address[] collected;
		foreach (i; 0 .. listeners.length)
			collected ~= listeners[i].localAddress;
		return collected;
	}

	@property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	void close()
	{
		foreach (l; listeners)
		{
			l.closeListener();
		}
		listeners = null;
		listening = false;

		// Notify only after the server state has been reset.
		if (handleClose)
			handleClose();
	}

public:
	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(TcpConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(TcpConnection incoming) handleAccept()
	{
		return acceptHandler;
	}
	/// ditto
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		acceptHandler = value;
		updateFlags();
	}
}

// ***************************************************************************

/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	/// Send queued datagrams, one `sendTo` call per queue entry, until every
	/// queue is empty, the socket would block, or an error occurs.
	final private void onWritableImpl()
	{
		foreach (priority, ref pending; outQueue)
		{
			while (pending.length)
			{
				auto head = pending.ptr; // pointer to first data

				auto sent = conn.sendTo(head.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					// Not a real failure: leave the datagram queued for later.
					if (wouldHaveBlocked())
						return;
					return onError("send() error: " ~ lastSocketError);
				}

				// A partially sent datagram is reported as an error.
				if (sent < head.length)
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, head.length));

				assert(sent == head.length);
				//debug writefln("[%s] Sent data:", remoteAddress);
				//debug writefln("%s", hexDump(pdata.contents[0..sent]));
				head.clear();
				pending = pending[1..$];
				if (pending.length == 0)
					pending = null;
			}
		}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();

		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	/// Receive into `buffer` with `conn.receive`; its return value is passed through unchanged.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given AddressFamily, without binding to an address.
final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Create the non-blocking socket, register it with the socket manager
	/// and mark the connection as connected.
	/// Unlike `initialize`, does not invoke the connect handler, so callers
	/// can finish their own setup before announcing the connection.
	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// Returns the local port the socket ended up bound to, or 0 if the
	/// host lookup failed (in which case `onError` has been called).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) writefln("Binding to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			enforce(addresses.length, "No addresses found");
			debug (ASOCKETS)
			{
				writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					writefln("- %s", address.toString());
			}

			// When the host resolves to several addresses, pick one at random.
			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// The AddressInfo overload expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl rather than initialize: the connect handler must
		// fire exactly once, and only after the socket has actually been bound.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		// Read the port back from the socket (it is how a caller learns the
		// port when 0 was requested).
		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

public:
	/// Where to send packets to.
	Address remoteAddress;
}

///
unittest
{
	auto server = new UdpConnection();
	auto serverPort = server.bind("localhost", 0);

	auto client = new UdpConnection();
	client.initialize(server.localAddress.addressFamily);

	string[] packets = ["Hello", "there"];
	client.remoteAddress = server.localAddress;
	client.send({
		Data[] data;
		foreach (packet; packets)
			data ~= Data(packet);
		return data;
	}());

	// Each received datagram must match the next expected packet, in order;
	// both ends are closed once all packets have arrived.
	server.handleReadData = (Data data)
	{
		assert(data.contents == packets[0]);
		packets = packets[1..$];
		if (!packets.length)
		{
			server.close();
			client.close();
		}
	};
	socketManager.loop();
	assert(!packets.length);
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
	/// The wrapped connection that this adapter forwards to.
	IConnection next;

	/// Wrap `next`, taking over its connect, disconnect and buffer-flushed
	/// callbacks so the events reach this adapter's `on*` methods.
	/// (The read-data callback is hooked lazily by `handleReadData`.)
	this(IConnection next)
	{
		this.next = next;
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		// Without this hook a handler installed through this adapter's
		// handleBufferFlushed setter would never be invoked.
		next.handleBufferFlushed = &onBufferFlushed;
	}

	/// The state of the wrapped connection.
	@property ConnectionState state() { return next.state; }

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	// Event entry points. Each one forwards to the handler installed on this
	// adapter; subclasses override them to intercept the event.
	protected
	{
		void onConnect()
		{
			if (connectHandler)
				connectHandler();
		}

		void onReadData(Data data)
		{
			// onReadData should be fired only if readDataHandler is set
			readDataHandler(data);
		}

		void onDisconnect(string reason, DisconnectType type)
		{
			if (disconnectHandler)
				disconnectHandler(reason, type);
		}

		void onBufferFlushed()
		{
			if (bufferFlushedHandler)
				bufferFlushedHandler();
		}
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;

		// Hook the wrapped connection's read callback only while a handler
		// is installed here; clearing the handler clears the hook as well.
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}
	private ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
	private BufferFlushedHandler bufferFlushedHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
string delimiter = "\r\n";

	this(IConnection next)
	{
		super(next);
	}

	/// Append a line to the send buffer.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter base = this;
		base.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends `data` to the receive buffer and emits every complete
	/// (delimiter-terminated) line now present at the front of it.
	final override void onReadData(Data data)
	{
		import std.string;
		auto previousLength = inBuffer.length;
		if (!previousLength)
			inBuffer = data;
		else
			inBuffer ~= data;

		if (delimiter.length == 1)
		{
			import core.stdc.string; // memchr

			char needle = delimiter[0];
			// The first search covers only the newly arrived bytes;
			// later ones rescan what is left of the buffer from its start.
			for (auto hit = memchr(inBuffer.ptr + previousLength, needle, data.length);
				hit;
				hit = memchr(inBuffer.ptr, needle, inBuffer.length))
			{
				sizediff_t offset = hit - inBuffer.ptr;
				processLine(offset);
			}
		}
		else
		{
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while (true)
			{
				sizediff_t offset = indexOf(cast(string)inBuffer.contents, delimiter);
				if (offset < 0)
					break;
				processLine(offset);
			}
		}
	}

	/// Cut the line ending at `index` (delimiter excluded) off the front of
	/// the receive buffer, drop the delimiter, and pass the line on.
	final void processLine(size_t index)
	{
		auto line = inBuffer[0..index];
		auto restStart = index + delimiter.length;
		inBuffer = inBuffer[restStart..inBuffer.length];
		super.onReadData(line);
	}

	/// Forward the disconnect event, then discard any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}

// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
	this(IConnection next)
	{
		debug (ASOCKETS) writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stop the pending idle timer.
	/// An idle timeout must have been set and must currently be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Schedule the idle timer again.
	/// An idle timeout must have been set and must not currently be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Set (or change) the idle timeout to `duration` and start the timer.
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		// Configure idleTask
		if (idleTask !is null)
		{
			// Re-use the existing task, cancelling it first if it is waiting.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}

		mainTimer.add(idleTask);
	}

	/// Report activity: fire `handleNonIdle` (if set) and restart the idle
	/// timer if it is currently waiting.
	void markNonIdle()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();

		if (idleTask && idleTask.isWaiting())
			idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Connecting counts as activity.
	override void onConnect()
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Receiving data counts as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Stop the idle timer (if waiting) before forwarding the disconnect event.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		if (idleTask && idleTask.isWaiting())
			idleTask.cancel();
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set

	/// Timer callback: the idle period elapsed without activity.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		if (state == ConnectionState.disconnecting)
			return disconnect("Delayed disconnect - time-out", DisconnectType.error);

		if (state == ConnectionState.disconnected)
			return;

		// No handler installed: a time-out simply drops the connection.
		if (!handleIdleTimeout)
			return disconnect("Time-out", DisconnectType.error);

		resumeIdleTimeout(); // reschedule (by default)
		handleIdleTimeout();
	}
}

// ***************************************************************************

unittest
{
	// A timeout scheduled before the loop starts must have fired by the
	// time the loop returns.
	void testTimer()
	{
		bool fired = false;
		setTimeout({ fired = true; }, 10.msecs);
		socketManager.loop();
		assert(fired);
	}

	testTimer();
}