/**
 * Asynchronous socket abstraction.
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Stéphan Kochen <stephan@kochen.nl>
 *   Vladimir Panteleev <vladimir@thecybershadow.net>
 *   Vincent Povirk <madewokherd@gmail.com>
 *   Simon Arlott
 */

module ae.net.asockets;

import ae.sys.timing;
import ae.utils.math;
public import ae.sys.data;

import std.exception;
import std.socket;
public import std.socket : Address, Socket;

debug(ASOCKETS) import std.stdio;
debug(PRINTDATA) import ae.utils.text : hexDump;
private import std.conv : to;

import std.random : randomShuffle;

// http://d.puremagic.com/issues/show_bug.cgi?id=7016
static import ae.utils.array;

version(LIBEV)
{
    import deimos.ev;
    pragma(lib, "ev");
}

version(Windows)
{
    import core.sys.windows.windows : Sleep;
    enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
}
else
    enum USE_SLEEP = false;

/// Running count of event-loop activity: incremented by the libev I/O and
/// timer callbacks, and once per iteration of the select-based loop.
int eventCounter;

version(LIBEV)
{
    // Watchers are a GenericSocket field (as declared in SocketMixin).
    // Use one watcher per read and write event.
    // Start/stop those as higher-level code declares interest in those events.
    // Use the "data" ev_io field to store the parent GenericSocket address.
    // Also use the "data" field as a flag to indicate whether the watcher is active
    // (data is null when the watcher is stopped).
struct SocketManager
{
private:
    /// Number of sockets currently registered with this manager.
    size_t count;

    /// libev I/O watcher callback. Forwards the event to the socket stored in
    /// the watcher's `data` field, then lets the manager refresh its timer.
    extern(C)
    static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
    {
        eventCounter++;
        auto target = cast(GenericSocket)w.data;
        assert(target, "libev event fired on stopped watcher");
        debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", cast(void*)target, revents);

        if (revents & EV_READ)
        {
            target.onReadable();
        }
        else if (revents & EV_WRITE)
        {
            target.onWritable();
        }
        else
        {
            assert(false, "Unknown event fired from libev");
        }

        // TODO? Need to get proper SocketManager instance to call updateTimer on
        socketManager.updateTimer(false);
    }

    /// The single libev timer watcher driving `mainTimer`.
    ev_timer evTimer;
    /// Next-event time the watcher was last scheduled for
    /// (`MonoTime.max` while the watcher is not running).
    MonoTime lastNextEvent = MonoTime.max;

    /// libev timer watcher callback. Fires due timer tasks and reschedules.
    extern(C)
    static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
    {
        eventCounter++;
        debug (ASOCKETS) writefln("Timer callback called.");
        mainTimer.prod();

        socketManager.updateTimer(true);
        debug (ASOCKETS) writefln("Timer callback exiting.");
    }

    /// Bring the libev timer watcher in line with `mainTimer`'s next event.
    /// Params:
    ///   force = reschedule even when the next event time has not changed
    void updateTimer(bool force)
    {
        auto nextEvent = mainTimer.getNextEvent();
        if (!force && lastNextEvent == nextEvent)
            return; // schedule is already up to date

        debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);

        if (nextEvent == MonoTime.max)
        {
            // Stopping: only touch the watcher if it was actually running.
            if (lastNextEvent != MonoTime.max)
                ev_timer_stop(ev_default_loop(0), &evTimer);
            lastNextEvent = nextEvent;
            return;
        }

        // Fire anything that is already due before computing the wait.
        auto remaining = mainTimer.getRemainingTime();
        while (remaining <= Duration.zero)
        {
            debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining);
            mainTimer.prod();
            remaining = mainTimer.getRemainingTime();
        }

        // Convert the remaining duration to fractional seconds for libev.
        ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
        debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);

        if (lastNextEvent != MonoTime.max)
        {
            // Adjusting an already running watcher
            evTimer.repeat = tstamp;
            ev_timer_again(ev_default_loop(0), &evTimer);
        }
        else
        {
            // Starting
            ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
            ev_timer_start(ev_default_loop(0), &evTimer);
        }
        lastNextEvent = nextEvent;
    }

    /// Register a socket with the manager.
    /// Initializes the socket's read/write watchers; they are started later,
    /// when the socket's notifyRead / notifyWrite properties are set.
    void register(GenericSocket socket)
    {
        debug (ASOCKETS) writefln("Registering %s", cast(void*)socket);
        debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
        auto fd = socket.conn.handle;
        assert(fd, "Must have fd before socket registration");
        ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
        ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
        count++;
    }

    /// Unregister a socket with the manager.
    /// Clears both notification flags, which stops any running watcher.
    void unregister(GenericSocket socket)
    {
        debug (ASOCKETS) writefln("Unregistering %s", cast(void*)socket);
        socket.notifyRead  = false;
        socket.notifyWrite = false;
        count--;
    }

public:
    /// Number of registered sockets.
    size_t size()
    {
        return count;
    }

    /// Loop continuously until no sockets are left.
void loop()
{
    // Obtain (and thereby initialize) the default libev loop up front so a
    // failure is reported before anything is scheduled on it.
    auto evLoop = ev_default_loop(0);
    enforce(evLoop, "libev initialization failure");

    updateTimer(true);
    debug (ASOCKETS) writeln("ev_run");
    ev_run(evLoop, 0);
}
} // end of libev struct SocketManager

/// Per-socket state for the libev backend: one I/O watcher per direction.
private mixin template SocketMixin()
{
    private ev_io evRead, evWrite;

    /// Start or stop a watcher so that it matches the requested state.
    /// The watcher's `data` field doubles as the "is running" flag.
    private void setWatcherState(ref ev_io ev, bool newValue, int event)
    {
        if (!conn)
        {
            // Can happen when setting delegates before connecting.
            return;
        }

        immutable running = ev.data !is null;
        if (newValue == running)
            return; // already in the requested state

        if (newValue)
        {
            // Start
            ev.data = cast(void*)this;
            ev_io_start(ev_default_loop(0), &ev);
        }
        else
        {
            // Stop
            assert(ev.data is cast(void*)this);
            ev.data = null;
            ev_io_stop(ev_default_loop(0), &ev);
        }
    }

    /// Interested in read notifications (onReadable)?
    @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
    /// Interested in write notifications (onWritable)?
    @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }

    debug ~this()
    {
        // The LIBEV SocketManager holds no references to registered sockets.
        // TODO: Add a doubly-linked list?
        assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
    }
}
} // end of version(LIBEV)
else // Use select
{
    struct SocketManager
    {
    private:
        /// Initial capacity of the SocketSets allocated by loop().
        enum FD_SETSIZE = 1024;

        /// List of all sockets to poll.
        GenericSocket[] sockets;

        /// Register a socket with the manager.
        void register(GenericSocket conn)
        {
            debug (ASOCKETS) writefln("Registering %s (%d total)", cast(void*)conn, sockets.length + 1);
            sockets ~= conn;
        }

        /// Unregister a socket with the manager.
void unregister(GenericSocket conn)
{
    debug (ASOCKETS) writefln("Unregistering %s (%d total)", cast(void*)conn, sockets.length - 1);
    foreach (size_t index, GenericSocket candidate; sockets)
    {
        if (candidate !is conn)
            continue;
        // Concatenation yields a new array, leaving the old one untouched
        // (loop() may still be iterating over it when a handler disconnects).
        sockets = sockets[0 .. index] ~ sockets[index + 1 .. $];
        return;
    }
    assert(false, "Socket not registered");
}

/// Handlers run, one per loop iteration, when no socket events fired.
void delegate()[] idleHandlers;

public:
/// Number of registered sockets.
size_t size()
{
    return sockets.length;
}

/// Loop continuously until no sockets are left.
/// Exits once there are neither non-daemon sockets nor pending timer events.
void loop()
{
    debug (ASOCKETS) writeln("Starting event loop.");

    SocketSet readset  = new SocketSet(FD_SETSIZE);
    SocketSet writeset = new SocketSet(FD_SETSIZE);
    SocketSet errorset = new SocketSet(FD_SETSIZE);
    size_t sockcount;

    while (true)
    {
        // SocketSet.add() doesn't have an overflow check, so we need to do it manually
        // this is just a debug check, the actual check is done when registering sockets
        // TODO: this is inaccurate on POSIX, "max" means maximum fd value
        immutable tooSmall =
            sockets.length > readset.max  ||
            sockets.length > writeset.max ||
            sockets.length > errorset.max;
        if (tooSmall)
        {
            immutable capacity = to!uint(sockets.length*2);
            readset  = new SocketSet(capacity);
            writeset = new SocketSet(capacity);
            errorset = new SocketSet(capacity);
        }
        else
        {
            readset.reset();
            writeset.reset();
            errorset.reset();
        }

        sockcount = 0;
        bool haveActive;
        debug (ASOCKETS) writeln("Populating sets");
        foreach (GenericSocket conn; sockets)
        {
            if (!conn.socket)
                continue;
            sockcount++;
            if (!conn.daemon)
                haveActive = true;

            debug (ASOCKETS) writef("\t%s:", cast(void*)conn);
            if (conn.notifyRead)
            {
                readset.add(conn.socket);
                debug (ASOCKETS) write(" READ");
            }
            if (conn.notifyWrite)
            {
                writeset.add(conn.socket);
                debug (ASOCKETS) write(" WRITE");
            }
            // Errors are always of interest.
            errorset.add(conn.socket);
            debug (ASOCKETS) writeln();
        }
        debug (ASOCKETS) writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
            sockcount,
            mainTimer.isWaiting() ? "with" : "no",
            idleHandlers.length,
        );
        if (!haveActive && !mainTimer.isWaiting())
        {
            debug (ASOCKETS) writeln("No more sockets or timer events, exiting loop.");
            break;
        }

        int events;
        if (idleHandlers.length)
        {
            // Idle handlers are pending: poll without blocking.
            events = sockcount == 0
                ? 0
                : Socket.select(readset, writeset, errorset, 0.seconds);
        }
        else if (USE_SLEEP && sockcount==0)
        {
            // No sockets to wait on: sleep until the next timer event instead.
            version(Windows)
            {
                auto duration = mainTimer.getRemainingTime().total!"msecs"();
                debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs");
                if (duration <= 0)
                    duration = 1; // Avoid busywait
                else if (duration > int.max)
                    duration = int.max;
                Sleep(cast(int)duration);
                events = 0;
            }
            else
                assert(0);
        }
        else if (mainTimer.isWaiting())
        {
            events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
        }
        else
        {
            events = Socket.select(readset, writeset, errorset);
        }

        debug (ASOCKETS) writefln("%d events fired.", events);

        if (events > 0)
        {
            foreach (GenericSocket conn; sockets)
            {
                // A handler may close the socket, so conn.socket is
                // re-checked before each of the three set lookups.
                if (!conn.socket)
                {
                    debug (ASOCKETS) writefln("\t%s is unset", cast(void*)conn);
                    continue;
                }
                if (readset.isSet(conn.socket))
                {
                    debug (ASOCKETS) writefln("\t%s is readable", cast(void*)conn);
                    conn.onReadable();
                }

                if (!conn.socket)
                {
                    debug (ASOCKETS) writefln("\t%s is unset", cast(void*)conn);
                    continue;
                }
                if (writeset.isSet(conn.socket))
                {
                    debug (ASOCKETS) writefln("\t%s is writable", cast(void*)conn);
                    conn.onWritable();
                }

                if (!conn.socket)
                {
                    debug (ASOCKETS) writefln("\t%s is unset", cast(void*)conn);
                    continue;
                }
                if (errorset.isSet(conn.socket))
                {
                    debug (ASOCKETS) writefln("\t%s is errored", cast(void*)conn);
                    conn.onError("select() error: " ~ conn.socket.getErrorText());
                }
            }
        }
        else if (idleHandlers.length)
        {
            import ae.utils.array;
            auto handler = idleHandlers.shift();

            // Rotate the idle handler queue before running it,
            // in case the handler unregisters itself.
            idleHandlers ~= handler;

            handler();
        }

        // Timers may invalidate our select results, so fire them after processing the latter
        mainTimer.prod();

        eventCounter++;
    }
}
} // end of select-based struct SocketManager

// Use UFCS to allow removeIdleHandler to have a predicate with context
/// Append an idle handler; the same delegate must not be added twice.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
    foreach (existing; socketManager.idleHandlers)
        assert(handler !is existing);

    socketManager.idleHandlers ~= handler;
}

/// Default predicate for removeIdleHandler: identity comparison.
static bool isFun(T)(T a, T b) { return a is b; }

/// Remove the first idle handler for which `pred(handler, args)` holds.
/// Asserts if no handler matches.
void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
    foreach (position, candidate; socketManager.idleHandlers)
        if (pred(candidate, args))
        {
            import std.algorithm;
            socketManager.idleHandlers = socketManager.idleHandlers.remove(position);
            return;
        }
    assert(false, "No such idle handler");
}

/// Per-socket state for the select backend: plain interest flags read by loop().
private mixin template SocketMixin()
{
    /// Interested in read notifications (onReadable)?
    bool notifyRead;
    /// Interested in write notifications (onWritable)?
    bool notifyWrite;
}
} // end of version(LIBEV) else

/// Why a connection was closed.
enum DisconnectType
{
    Requested, // initiated by the application
    Graceful,  // peer gracefully closed the connection
    Error      // abnormal network condition
}

/// General methods for an asynchronous socket
private abstract class GenericSocket
{
    /// Declares notifyRead and notifyWrite.
    mixin SocketMixin;

protected:
    /// The socket this class wraps.
    Socket conn;

protected:
    /// Retrieve the socket class this class wraps.
@property final Socket socket()
{
    return conn;
}

/// Called by the socket manager when the socket is readable. No-op by default.
void onReadable()
{
}

/// Called by the socket manager when the socket is writable. No-op by default.
void onWritable()
{
}

/// Called by the socket manager when the socket is in an error state. No-op by default.
void onError(string reason)
{
}

public:
/// allow getting the address of connections that are already disconnected
private Address cachedLocalAddress, cachedRemoteAddress;

/// Don't block the process from exiting.
/// TODO: Not implemented with libev
bool daemon;

/// Local address of the socket; cached on first successful lookup.
/// Returns null if nothing is cached and there is no underlying socket.
final @property Address localAddress()
{
    if (cachedLocalAddress is null && conn !is null)
        cachedLocalAddress = conn.localAddress();
    return cachedLocalAddress;
}

/// Remote address of the socket; cached on first successful lookup.
/// Returns null if nothing is cached and there is no underlying socket.
final @property Address remoteAddress()
{
    if (cachedRemoteAddress is null && conn !is null)
        cachedRemoteAddress = conn.remoteAddress();
    return cachedRemoteAddress;
}

/// Enable or disable TCP keep-alive on the underlying socket.
/// Params:
///   enabled  = whether keep-alive should be on
///   time     = passed to Socket.setKeepAlive
///   interval = passed to Socket.setKeepAlive
final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
{
    assert(conn, "Attempting to set keep-alive on an uninitialized socket");
    if (!enabled)
    {
        conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
        return;
    }

    try
        conn.setKeepAlive(time, interval);
    catch (SocketFeatureException)
    {
        // Fall back to the plain on/off option when the timed variant is unavailable.
        conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
    }
}
} // end of class GenericSocket


/// An asynchronous client socket.
class ClientSocket : GenericSocket
{
private:
    /// Task that fires when the connection has been idle for too long.
    TimerTask idleTask;

    /// Blocks of data larger than this value are passed as unmanaged memory
    /// (in Data objects). Blocks smaller than this value will be reallocated
    /// on the managed heap. The disadvantage of placing large objects on the
    /// managed heap is false pointers; the disadvantage of using Data for
    /// small objects is wasted slack space due to the page size alignment
    /// requirement.
enum UNMANAGED_THRESHOLD = 256;

/// Queue of addresses to try connecting to.
Address[] addressQueue;

public:
/// Whether the socket is connected.
bool connected;

enum MAX_PRIORITY = 4;
enum DEFAULT_PRIORITY = 2;

protected:
/// The send buffers.
Data[][MAX_PRIORITY+1] outQueue;
/// Whether the first item from each queue has been partially sent (and thus can't be cancelled).
bool[MAX_PRIORITY+1] partiallySent;
/// Whether a disconnect is pending after all data is sent
bool disconnecting;

/// Constructor used by a ServerSocket for new connections
this(Socket conn)
{
    this();
    this.conn = conn;
    connected = !(conn is null);
    if (connected)
        socketManager.register(this);
    updateFlags();
}

/// Recompute notifyRead / notifyWrite from the current connection state.
final void updateFlags()
{
    // While connecting, writability signals that the connection attempt finished.
    if (!connected)
        notifyWrite = true;
    else
        notifyWrite = writePending;

    notifyRead = connected && handleReadData;
}

/// Called when a socket is readable.
override void onReadable()
{
    // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
    static ubyte[0x10000] inBuffer;
    auto received = conn.receive(inBuffer);

    if (received == 0)
        return disconnect("Connection closed", DisconnectType.Graceful);

    if (received == Socket.ERROR)
    {
        if (wouldHaveBlocked)
        {
            debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", cast(void*)this);
            return;
        }
        else
            onError("recv() error: " ~ lastSocketError);
    }
    else
    {
        debug (PRINTDATA)
        {
            std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
            std.stdio.write(hexDump(inBuffer[0 .. received]));
            std.stdio.stdout.flush();
        }

        if (disconnecting)
        {
            debug (ASOCKETS) writefln("\t\t%s: Discarding received data because we are disconnecting", cast(void*)this);
        }
        else
        if (!handleReadData)
        {
            debug (ASOCKETS) writefln("\t\t%s: Discarding received data because there is no data handler", cast(void*)this);
        }
        else
        {
            // Currently, unlike the D1 version of this module,
            // we will always reallocate read network data.
            // This disfavours code which doesn't need to store
            // read data after processing it, but otherwise
            // makes things simpler and safer all around.

            if (received < UNMANAGED_THRESHOLD)
            {
                // Copy to the managed heap
                _handleReadData(this, Data(inBuffer[0 .. received].dup));
            }
            else
            {
                // Copy to unmanaged memory
                _handleReadData(this, Data(inBuffer[0 .. received], true));
            }
        }
    }
}

/// Called when a socket is writable.
/// The first writable event after connect() marks the connection as
/// established; later ones drain the send queues in priority order.
override void onWritable()
{
    scope(success) updateFlags();

    if (!connected)
    {
        connected = true;
        //debug writefln("[%s] Connected", remoteAddress);
        try
            setKeepAlive();
        catch (Exception e)
            return disconnect(e.msg, DisconnectType.Error);
        if (idleTask !is null)
            mainTimer.add(idleTask);
        if (handleConnect)
            handleConnect(this);
        return;
    }
    //debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

    foreach (priority, ref queue; outQueue)
        while (queue.length)
        {
            auto pdata = queue.ptr; // pointer to first data

            ptrdiff_t sent = 0;
            if (pdata.length)
            {
                sent = conn.send(pdata.contents);
                debug (ASOCKETS) writefln("\t\t%s: sent %d/%d bytes", cast(void*)this, sent, pdata.length);
            }
            else
            {
                debug (ASOCKETS) writefln("\t\t%s: empty Data object", cast(void*)this);
            }

            if (sent == Socket.ERROR)
            {
                if (wouldHaveBlocked())
                    return;
                else
                    return onError("send() error: " ~ lastSocketError);
            }
            else
            if (sent < pdata.length)
            {
                // Partial write: keep the unsent tail at the head of the queue.
                if (sent > 0)
                {
                    *pdata = (*pdata)[sent..pdata.length];
                    partiallySent[priority] = true;
                }
                return;
            }
            else
            {
                assert(sent == pdata.length);
                //debug writefln("[%s] Sent data:", remoteAddress);
                //debug writefln("%s", hexDump(pdata.contents[0..sent]));
                pdata.clear();
                queue = queue[1..$];
                partiallySent[priority] = false;
                if (queue.length == 0)
                    queue = null;
            }
        }

    // outQueue is now empty
    if (handleBufferFlushed)
        handleBufferFlushed(this);
    if (disconnecting)
        disconnect("Delayed disconnect - buffer flushed", DisconnectType.Requested);
}

/// Called when an error occurs on the socket.
/// While still connecting, falls through to the next queued address (if any)
/// instead of disconnecting.
override void onError(string reason)
{
    if (!connected && addressQueue.length)
        return tryNextAddress();
    disconnect("Socket error: " ~ reason, DisconnectType.Error);
}

/// Idle timer handler: disconnects, or defers to handleIdleTimeout if set.
final void onTask_Idle(Timer timer, TimerTask task)
{
    if (!connected)
        return;

    if (disconnecting)
        return disconnect("Delayed disconnect - time-out", DisconnectType.Error);

    if (handleIdleTimeout)
    {
        handleIdleTimeout(this);
        // Re-arm the timer unless the handler closed the connection.
        if (connected && !disconnecting)
        {
            assert(!idleTask.isWaiting());
            mainTimer.add(idleTask);
        }
    }
    else
        disconnect("Time-out", DisconnectType.Error);
}

/// Pop the next address off addressQueue and start a non-blocking
/// connection attempt to it. Failures are routed through onError, which
/// retries with the following address while any remain.
final void tryNextAddress()
{
    auto address = addressQueue[0];
    addressQueue = addressQueue[1..$];

    // A previous attempt may have left a registered socket behind. Release
    // it first, otherwise it stays open and this object ends up registered
    // with the socket manager a second time.
    if (conn)
    {
        socketManager.unregister(this);
        conn.close();
        conn = null;
    }

    try
    {
        conn = new Socket(address.addressFamily(), SocketType.STREAM, ProtocolType.TCP);
        conn.blocking = false;

        socketManager.register(this);
        updateFlags();
        debug (ASOCKETS) writefln("Attempting connection to %s", address.toString());
        conn.connect(address);
    }
    catch (SocketException e)
        return onError("Connect error: " ~ e.msg);
}

public:
/// Default constructor
this()
{
    debug (ASOCKETS) writefln("New ClientSocket @ %s", cast(void*)this);
}

/// Start establishing a connection.
/// Resolves `host`, shuffles the resulting addresses when there is more than
/// one, and begins connecting to the first. Lookup failures are reported
/// through onError.
/// Throws: Exception if this object already has a socket or is connected.
final void connect(string host, ushort port)
{
    if (conn || connected)
        throw new Exception("Socket object is already connected");

    try
    {
        addressQueue = getAddress(host, port);
        enforce(addressQueue.length, "No addresses found");
        if (addressQueue.length > 1)
            randomShuffle(addressQueue);
    }
    catch (SocketException e)
    {
        return onError("Lookup error: " ~ e.msg);
    }

    tryNextAddress();
}

static const DefaultDisconnectReason = "Software closed the connection";

/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
/// Only a Requested disconnect waits; any other type discards the queues and
/// closes immediately. handleDisconnect is invoked once the socket is closed.
void disconnect(string reason = DefaultDisconnectReason, DisconnectType type = DisconnectType.Requested)
{
    scope(success) updateFlags();

    immutable unsentData = writePending;
    if (unsentData && type == DisconnectType.Requested)
    {
        assert(conn, "Attempting to disconnect on an uninitialized socket");
        // queue disconnect after all data is sent
        debug (ASOCKETS) writefln("[%s] Queueing disconnect: %s", remoteAddress, reason);
        assert(!disconnecting, "Attempting to disconnect on a disconnecting socket");
        disconnecting = true;
        // Bound how long we are willing to wait for the flush.
        setIdleTimeout(30.seconds);
        return;
    }
    if (unsentData)
        discardQueues();

    debug (ASOCKETS) writefln("[%s] Disconnecting: %s", remoteAddress, reason);
    if (conn is null)
    {
        assert(!connected);
    }
    else
    {
        socketManager.unregister(this);
        conn.close();
        conn = null;
        outQueue[] = null;
        connected = disconnecting = false;
    }

    if (idleTask && idleTask.isWaiting())
        idleTask.cancel();
    if (handleDisconnect)
        handleDisconnect(this, reason, type);
}

/// Append data to the send buffer.
final void send(Data datum, int priority = DEFAULT_PRIORITY)
{
    Data[1] data;
    data[0] = datum;
    // Forward the caller's priority; without it every single-datum send
    // would land in the DEFAULT_PRIORITY queue regardless of the argument.
    send(data, priority);
    // Drop the temporary's reference to the Data once it has been queued.
    data[] = Data.init;
}

/// ditto
/// Params:
///   data     = blocks to queue, in order
///   priority = index of the queue to append to (0 .. MAX_PRIORITY)
void send(Data[] data, int priority = DEFAULT_PRIORITY)
{
    assert(connected, "Attempting to send on a disconnected socket");
    assert(!disconnecting, "Attempting to send on a disconnecting socket");
    outQueue[priority] ~= data;
    notifyWrite = true; // Fast updateFlags()

    debug (PRINTDATA)
    {
        std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
        foreach (datum; data)
            if (datum.length)
                std.stdio.write(hexDump(datum.contents));
            else
                std.stdio.writeln("(empty Data)");
        std.stdio.stdout.flush();
    }
}

/// Drop everything queued at `priority` that has not started going out.
/// A partially sent head item is kept so the stream is not corrupted.
final void clearQueue(int priority)
{
    if (partiallySent[priority])
    {
        assert(outQueue[priority].length > 0);
        outQueue[priority] = outQueue[priority][0..1];
    }
    else
        outQueue[priority] = null;
    updateFlags();
}

/// Clears all queues, even partially sent content.
private final void discardQueues()
{
    outQueue[] = null;
    partiallySent[] = false;
    updateFlags();
}

/// Whether any send queue still holds data.
@property
final bool writePending()
{
    foreach (priority; 0..MAX_PRIORITY+1)
    {
        if (outQueue[priority].length)
            return true;
    }
    return false;
}

/// Whether the queue at `priority` holds data that could still be cancelled,
/// i.e. anything beyond a partially sent head item.
final bool queuePresent(int priority)
{
    immutable queued = outQueue[priority].length;
    if (!partiallySent[priority])
        return queued > 0;

    assert(queued > 0);
    return queued > 1;
}

/// Suspend the idle timer. The timer must exist and be running.
void cancelIdleTimeout()
{
    assert(idleTask !is null);
    assert(idleTask.isWaiting());
    idleTask.cancel();
}

/// Restart a suspended idle timer. The socket must be connected.
void resumeIdleTimeout()
{
    assert(connected);
    assert(idleTask !is null);
    assert(!idleTask.isWaiting());
    mainTimer.add(idleTask);
}

/// Set (or change) the idle timeout. The timer is started right away only
/// when the socket is already connected.
final void setIdleTimeout(Duration duration)
{
    assert(duration > Duration.zero);
    if (idleTask !is null)
    {
        // Reuse the existing task, stopping it first if it is running.
        if (idleTask.isWaiting())
            idleTask.cancel();
        idleTask.delay = duration;
    }
    else
    {
        idleTask = new TimerTask(duration);
        idleTask.handleTask = &onTask_Idle;
    }

    if (connected)
        mainTimer.add(idleTask);
}

/// Note activity on the connection: restarts the idle timer if it is running.
void markNonIdle()
{
    assert(idleTask !is null);
    if (idleTask.isWaiting())
        idleTask.restart();
}

/// Whether the socket is connected and no disconnect is pending.
final bool isConnected()
{
    if (disconnecting)
        return false;
    return connected;
}

public:
/// Callback for when a connection has been established.
void delegate(ClientSocket sender) handleConnect;
/// Callback for when a connection was closed.
void delegate(ClientSocket sender, string reason, DisconnectType type) handleDisconnect;
/// Callback for when a connection has stopped responding.
void delegate(ClientSocket sender) handleIdleTimeout;
/// Callback for when the send buffer has been flushed.
void delegate(ClientSocket sender) handleBufferFlushed;

/// Signature of the incoming-data callback.
alias void delegate(ClientSocket sender, Data data) ReadDataHandler;

private ReadDataHandler _handleReadData;
/// Callback for incoming data.
/// Data will not be received unless this handler is set.
@property final ReadDataHandler handleReadData()
{
    return _handleReadData;
}
/// ditto
@property final void handleReadData(ReadDataHandler value)
{
    _handleReadData = value;
    updateFlags();
}
} // end of class ClientSocket

/// An asynchronous server socket.
final class GenericServerSocket(T : ClientSocket)
{
private:
    /// Class that actually performs listening on a certain address family
    final class Listener : GenericSocket
    {
        /// Wrap an already bound and listening socket and register it.
        this(Socket conn)
        {
            debug (ASOCKETS) writefln("New Listener @ %s", cast(void*)this);
            this.conn = conn;
            socketManager.register(this);
        }

        /// Called when a socket is readable.
        /// Accepts the pending connection and hands it to the accept
        /// handler, or closes it right away if no handler is set.
        override void onReadable()
        {
            Socket acceptSocket = conn.accept();
            acceptSocket.blocking = false;
            if (!handleAccept)
            {
                acceptSocket.close();
                return;
            }

            T connection = new T(acceptSocket);
            connection.setKeepAlive();
            //assert(connection.connected);
            //connection.connected = true;
            _handleAccept(connection);
        }

        /// Called when a socket is writable.
        override void onWritable()
        {
        }

        /// Called when an error occurs on the socket.
        override void onError(string reason)
        {
            close(); // call parent
        }

        /// Unregister and close this listener's socket.
        void closeListener()
        {
            assert(conn);
            socketManager.unregister(this);
            conn.close();
            conn = null;
        }
    }

    /// Whether the socket is listening.
bool listening;
/// Listener instances
Listener[] listeners;

/// Listeners only ask for read notifications while an accept handler is set.
final void updateFlags()
{
    foreach (listener; listeners)
        listener.notifyRead = handleAccept !is null;
}

public:
/// Debugging aids
ushort port;
string addr;

/// Start listening on this socket.
/// Creates one listener per usable address returned for `addr`/`port`;
/// addresses that fail to bind are skipped.
/// Params:
///   port = port to listen on; 0 requests a random port (IPv4 only)
///   addr = local address to bind to
/// Returns: the port being listened on (the assigned one when `port` was 0).
/// Throws: Exception if no address could be bound.
ushort listen(ushort port, string addr = null)
{
    //debug writefln("Listening on %s:%d", addr, port);
    assert(!listening, "Attempting to listen on a listening socket");

    auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

    foreach (ref addressInfo; addressInfos)
    {
        if (addressInfo.family != AddressFamily.INET && port == 0)
            continue;  // listen on random ports only on IPv4 for now

        try
        {
            Socket conn = new Socket(addressInfo);
            // Close the socket right away if any setup step below throws,
            // instead of leaving the half-configured socket open.
            scope(failure) conn.close();

            conn.blocking = false;
            if (addressInfo.family == AddressFamily.INET6)
                conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
            conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

            conn.bind(addressInfo.address);
            conn.listen(8);

            // Pick up the actual port (relevant when a random port was requested).
            if (addressInfo.family == AddressFamily.INET)
                port = to!ushort(conn.localAddress().toPortString());

            listeners ~= new Listener(conn);
        }
        catch (SocketException e)
        {
            debug(ASOCKETS) writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
            debug(ASOCKETS) writeln(e.msg);
        }
    }

    if (listeners.length==0)
        throw new Exception("Unable to bind service");

    this.port = port;
    this.addr = addr;
    listening = true;

    updateFlags();

    return port;
}

/// Local addresses of all active listeners.
@property Address[] localAddresses()
{
    Address[] result;
    foreach (listener; listeners)
        result ~= listener.localAddress;
    return result;
}

@property bool isListening()
{
    return listening;
}

/// Stop listening on this socket.
/// Closes every listener, then invokes handleClose if it is set.
void close()
{
    foreach (active; listeners)
    {
        active.closeListener();
    }
    listeners = null;
    listening = false;

    if (handleClose)
    {
        handleClose();
    }
}

public:
/// Callback for when the socket was closed.
void delegate() handleClose;

private void delegate(T incoming) _handleAccept;
/// Callback for an incoming connection.
/// Connections will not be accepted unless this handler is set.
@property final void delegate(T incoming) handleAccept()
{
    return _handleAccept;
}
/// ditto
@property final void handleAccept(void delegate(T incoming) value)
{
    _handleAccept = value;
    updateFlags();
}
} // end of class GenericServerSocket

/// Server socket type for ordinary sockets.
alias GenericServerSocket!(ClientSocket) ServerSocket;

/// Asynchronous class for client sockets with a line-based protocol.
class LineBufferedSocket : ClientSocket
{
private:
    /// The receive buffer.
    Data inBuffer;

public:
    /// The protocol's line delimiter.
    string delimiter = "\r\n";

private:
    /// Called when data has been received.
final void onReadData(ClientSocket sender, Data data)
{
    import std.string;

    // Append to any partial line left over from the previous read.
    auto oldBufferLength = inBuffer.length;
    if (oldBufferLength)
        inBuffer ~= data;
    else
        inBuffer = data;

    bool gotLines;

    if (delimiter.length == 1)
    {
        import core.stdc.string; // memchr

        char c = delimiter[0];
        // The first search can skip the old part of the buffer, which is
        // known to hold no delimiter; later searches restart from the front
        // because processLine() re-slices inBuffer.
        for (auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
             p;
             p = memchr(inBuffer.ptr, c, inBuffer.length))
        {
            sizediff_t index = p - inBuffer.ptr;
            processLine(index);
            gotLines = true;
        }
    }
    else
    {
        // TODO: we can start the search at oldBufferLength-delimiter.length+1
        for (sizediff_t index = indexOf(cast(string)inBuffer.contents, delimiter);
             index >= 0;
             index = indexOf(cast(string)inBuffer.contents, delimiter))
        {
            processLine(index);
            gotLines = true;
        }
    }

    if (gotLines)
        markNonIdle();
}

/// Cut the line ending at `index` (plus its delimiter) off the front of the
/// buffer and pass it to the line handler, if one is set.
final void processLine(size_t index)
{
    auto line = inBuffer[0..index];
    inBuffer = inBuffer[index+delimiter.length..inBuffer.length];

    if (_handleReadLine)
    {
        _handleReadLine(this, cast(string)line.toHeap());
    }
}

/// Receive raw data only while a line handler is installed.
final void updateHandler()
{
    if (_handleReadLine)
        handleReadData = &onReadData;
    else
        handleReadData = null;
}

public:
override void cancelIdleTimeout() { assert(false); }
override void resumeIdleTimeout() { assert(false); }
//override void setIdleTimeout(d_time duration) { assert(false); }
//override void markNonIdle() { assert(false); }

/// Create an unconnected socket with the given idle timeout.
this(Duration idleTimeout)
{
    super.setIdleTimeout(idleTimeout);
}

/// Wrap an existing socket, using an idle timeout of 60 seconds.
this(Socket conn)
{
    super.setIdleTimeout(60.seconds);
    super(conn);
}

/// Cancel a connection.
override final void disconnect(string reason = DefaultDisconnectReason, DisconnectType type = DisconnectType.Requested)
{
    super.disconnect(reason, type);
    // Drop any partially received line.
    inBuffer.clear();
}

/// Append a line to the send buffer.
/// The configured delimiter is added after `line`.
void send(string line)
{
    super.send(Data(line ~ delimiter));
}

public:
/// Signature of the incoming-line callback.
alias void delegate(LineBufferedSocket sender, string line) ReadLineHandler;

private ReadLineHandler _handleReadLine;
/// Callback for an incoming line.
/// Data will not be received unless this handler is set.
@property final ReadLineHandler handleReadLine()
{
    return _handleReadLine;
}
/// ditto
@property final void handleReadLine(ReadLineHandler value)
{
    _handleReadLine = value;
    updateHandler();
}
} // end of class LineBufferedSocket

/// The default socket manager.
// __gshared for ae.sys.shutdown
//__gshared
SocketManager socketManager;

// ***************************************************************************

unittest
{
    // A lone timer must keep the loop alive until it has fired.
    bool fired;
    setTimeout({fired = true;}, 10.msecs);
    socketManager.loop();
    assert(fired);
}