1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <ae@cy.md> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.dataset : DataVec; 20 import ae.sys.timing; 21 import ae.utils.array : asSlice, asBytes; 22 import ae.utils.math; 23 public import ae.sys.data; 24 25 import core.stdc.stdint : int32_t; 26 27 import std.exception; 28 import std.parallelism : totalCPUs; 29 import std.socket; 30 import std.string : format; 31 public import std.socket : Address, AddressInfo, Socket; 32 33 version (Windows) 34 private import c_socks = core.sys.windows.winsock2; 35 else version (Posix) 36 private import c_socks = core.sys.posix.sys.socket; 37 38 debug(ASOCKETS) import std.stdio : stderr; 39 debug(PRINTDATA) import std.stdio : stderr; 40 debug(PRINTDATA) import ae.utils.text : hexDump; 41 private import std.conv : to; 42 43 44 // https://issues.dlang.org/show_bug.cgi?id=7016 45 static import ae.utils.array; 46 47 version(LIBEV) 48 { 49 import deimos.ev; 50 pragma(lib, "ev"); 51 } 52 53 version(Windows) 54 { 55 import core.sys.windows.windows : Sleep; 56 private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 57 } 58 else 59 private enum USE_SLEEP = false; 60 61 int eventCounter; 62 63 version (LIBEV) 64 { 65 // Watchers are a GenericSocket field (as declared in SocketMixin). 66 // Use one watcher per read and write event. 67 // Start/stop those as higher-level code declares interest in those events. 68 // Use the "data" ev_io field to store the parent GenericSocket address. 69 // Also use the "data" field as a flag to indicate whether the watcher is active 70 // (data is null when the watcher is stopped). 71 72 /// `libev`-based event loop implementation. 73 struct SocketManager 74 { 75 private: 76 size_t count; 77 78 void delegate()[] nextTickHandlers; 79 80 extern(C) 81 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 82 { 83 auto socket = cast(GenericSocket)w.data; 84 assert(socket, "libev event fired on stopped watcher"); 85 debug (ASOCKETS) stderr.writefln("ioCallback(%s, 0x%X)", socket, revents); 86 87 // TODO? Need to get proper SocketManager instance to call updateTimer on 88 socketManager.preEvent(); 89 90 if (revents & EV_READ) 91 socket.onReadable(); 92 else 93 if (revents & EV_WRITE) 94 socket.onWritable(); 95 else 96 assert(false, "Unknown event fired from libev"); 97 98 socketManager.postEvent(false); 99 } 100 101 ev_timer evTimer; 102 MonoTime lastNextEvent = MonoTime.max; 103 104 extern(C) 105 static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/) 106 { 107 debug (ASOCKETS) stderr.writefln("Timer callback called."); 108 109 socketManager.preEvent(); // This also updates socketManager.now 110 mainTimer.prod(socketManager.now); 111 112 socketManager.postEvent(true); 113 debug (ASOCKETS) stderr.writefln("Timer callback exiting."); 114 } 115 116 /// Called upon waking up, before calling any users' event handlers. 117 void preEvent() 118 { 119 eventCounter++; 120 socketManager.now = MonoTime.currTime(); 121 } 122 123 /// Called before going back to sleep, after calling any users' event handlers. 124 void postEvent(bool wokeDueToTimeout) 125 { 126 while (nextTickHandlers.length) 127 { 128 auto thisTickHandlers = nextTickHandlers; 129 nextTickHandlers = null; 130 131 foreach (handler; thisTickHandlers) 132 handler(); 133 } 134 135 socketManager.updateTimer(wokeDueToTimeout); 136 } 137 138 void updateTimer(bool force) 139 { 140 auto nextEvent = mainTimer.getNextEvent(); 141 if (force || lastNextEvent != nextEvent) 142 { 143 debug (ASOCKETS) stderr.writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent); 144 if (nextEvent == MonoTime.max) // Stopping 145 { 146 if (lastNextEvent != MonoTime.max) 147 ev_timer_stop(ev_default_loop(0), &evTimer); 148 } 149 else 150 { 151 auto remaining = mainTimer.getRemainingTime(socketManager.now); 152 while (remaining <= Duration.zero) 153 { 154 debug (ASOCKETS) stderr.writefln("remaining=%s, prodding timer.", remaining); 155 mainTimer.prod(socketManager.now); 156 remaining = mainTimer.getRemainingTime(socketManager.now); 157 } 158 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 159 debug (ASOCKETS) stderr.writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 160 if (lastNextEvent == MonoTime.max) // Starting 161 { 162 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 163 ev_timer_start(ev_default_loop(0), &evTimer); 164 } 165 else // Adjusting 166 { 167 evTimer.repeat = tstamp; 168 ev_timer_again(ev_default_loop(0), &evTimer); 169 } 170 } 171 lastNextEvent = nextEvent; 172 } 173 } 174 175 public: 176 MonoTime now; 177 178 /// Register a socket with the manager. 179 void register(GenericSocket socket) 180 { 181 debug (ASOCKETS) stderr.writefln("Registering %s", socket); 182 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 183 auto fd = socket.conn.handle; 184 assert(fd, "Must have fd before socket registration"); 185 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 186 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 187 count++; 188 } 189 190 /// Unregister a socket with the manager. 191 void unregister(GenericSocket socket) 192 { 193 debug (ASOCKETS) stderr.writefln("Unregistering %s", socket); 194 socket.notifyRead = false; 195 socket.notifyWrite = false; 196 count--; 197 } 198 199 /// Returns the number of registered sockets. 200 size_t size() 201 { 202 return count; 203 } 204 205 /// Loop continuously until no sockets are left. 206 void loop() 207 { 208 auto evLoop = ev_default_loop(0); 209 enforce(evLoop, "libev initialization failure"); 210 211 updateTimer(true); 212 debug (ASOCKETS) stderr.writeln("ev_run"); 213 ev_run(ev_default_loop(0), 0); 214 } 215 } 216 217 private mixin template SocketMixin() 218 { 219 private ev_io evRead, evWrite; 220 221 private final void setWatcherState(ref ev_io ev, bool newValue, int /*event*/) 222 { 223 if (!conn) 224 { 225 // Can happen when setting delegates before connecting. 226 return; 227 } 228 229 if (newValue && !ev.data) 230 { 231 // Start 232 ev.data = cast(void*)this; 233 ev_io_start(ev_default_loop(0), &ev); 234 } 235 else 236 if (!newValue && ev.data) 237 { 238 // Stop 239 assert(ev.data is cast(void*)this); 240 ev.data = null; 241 ev_io_stop(ev_default_loop(0), &ev); 242 } 243 } 244 245 private final bool getWatcherState(ref ev_io ev) { return !!ev.data; } 246 247 // Flags that determine socket wake-up events. 248 249 /// Interested in read notifications (onReadable)? 250 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 251 @property final bool notifyRead () { return getWatcherState(evRead); } /// ditto 252 /// Interested in write notifications (onWritable)? 253 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 254 @property final bool notifyWrite() { return getWatcherState(evWrite); } /// ditto 255 256 debug ~this() @nogc 257 { 258 // The LIBEV SocketManager holds no references to registered sockets. 259 // TODO: Add a doubly-linked list? 260 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 261 } 262 } 263 } 264 else // Use select 265 { 266 /// `select`-based event loop implementation. 267 struct SocketManager 268 { 269 private: 270 enum FD_SETSIZE = 1024; 271 272 /// List of all sockets to poll. 273 GenericSocket[] sockets; 274 275 /// Debug AA to check for dangling socket references. 276 debug GenericSocket[socket_t] socketHandles; 277 278 void delegate()[] nextTickHandlers, idleHandlers; 279 280 public: 281 MonoTime now; 282 283 /// Register a socket with the manager. 284 void register(GenericSocket conn) 285 { 286 debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1); 287 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 288 sockets ~= conn; 289 290 debug 291 { 292 auto handle = conn.socket.handle; 293 assert(handle != socket_t.init, "Can't register a closed socket"); 294 assert(handle !in socketHandles, "This socket handle is already registered"); 295 socketHandles[handle] = conn; 296 } 297 } 298 299 /// Unregister a socket with the manager. 300 void unregister(GenericSocket conn) 301 { 302 debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 303 304 debug 305 { 306 auto handle = conn.socket.handle; 307 assert(handle != socket_t.init, "Can't unregister a closed socket"); 308 auto pconn = handle in socketHandles; 309 assert(pconn, "This socket handle is not registered"); 310 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 311 socketHandles.remove(handle); 312 } 313 314 foreach (size_t i, GenericSocket j; sockets) 315 if (j is conn) 316 { 317 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 318 return; 319 } 320 assert(false, "Socket not registered"); 321 } 322 323 /// Returns the number of registered sockets. 324 size_t size() 325 { 326 return sockets.length; 327 } 328 329 /// Loop continuously until no sockets are left. 330 void loop() 331 { 332 debug (ASOCKETS) stderr.writeln("Starting event loop."); 333 334 SocketSet readset, writeset; 335 size_t sockcount; 336 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 337 readset = new SocketSet(setSize); 338 writeset = new SocketSet(setSize); 339 while (true) 340 { 341 if (nextTickHandlers.length) 342 { 343 auto thisTickHandlers = nextTickHandlers; 344 nextTickHandlers = null; 345 346 foreach (handler; thisTickHandlers) 347 handler(); 348 349 continue; 350 } 351 352 uint minSize = 0; 353 version(Windows) 354 minSize = cast(uint)sockets.length; 355 else 356 { 357 foreach (s; sockets) 358 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 359 minSize = s.socket.handle; 360 } 361 minSize++; 362 363 if (setSize < minSize) 364 { 365 debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 366 setSize = minSize * 2; 367 readset = new SocketSet(setSize); 368 writeset = new SocketSet(setSize); 369 } 370 else 371 { 372 readset.reset(); 373 writeset.reset(); 374 } 375 376 sockcount = 0; 377 bool haveActive; 378 debug (ASOCKETS) stderr.writeln("Populating sets"); 379 foreach (GenericSocket conn; sockets) 380 { 381 if (!conn.socket) 382 continue; 383 sockcount++; 384 385 debug (ASOCKETS) stderr.writef("\t%s:", conn); 386 if (conn.notifyRead) 387 { 388 readset.add(conn.socket); 389 if (!conn.daemonRead) 390 haveActive = true; 391 debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : ""); 392 } 393 if (conn.notifyWrite) 394 { 395 writeset.add(conn.socket); 396 if (!conn.daemonWrite) 397 haveActive = true; 398 debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : ""); 399 } 400 debug (ASOCKETS) stderr.writeln(); 401 } 402 debug (ASOCKETS) 403 { 404 stderr.writefln("Sets populated as follows:"); 405 printSets(readset, writeset); 406 } 407 408 debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...", 409 haveActive ? "" : "not ", 410 sockcount, 411 mainTimer.isWaiting() ? "with" : "no", 412 idleHandlers.length, 413 ); 414 if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length) 415 { 416 debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop."); 417 break; 418 } 419 420 debug (ASOCKETS) stderr.flush(); 421 422 int events; 423 if (idleHandlers.length) 424 { 425 if (sockcount==0) 426 events = 0; 427 else 428 events = Socket.select(readset, writeset, null, 0.seconds); 429 } 430 else 431 if (USE_SLEEP && sockcount==0) 432 { 433 version (Windows) 434 { 435 now = MonoTime.currTime(); 436 auto duration = mainTimer.getRemainingTime(now).total!"msecs"(); 437 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 438 if (duration <= 0) 439 duration = 1; // Avoid busywait 440 else 441 if (duration > int.max) 442 duration = int.max; 443 Sleep(cast(int)duration); 444 events = 0; 445 } 446 else 447 assert(0); 448 } 449 else 450 if (mainTimer.isWaiting()) 451 { 452 // Refresh time before sleeping, to ensure that a 453 // slow event handler does not skew everything else 454 now = MonoTime.currTime(); 455 456 events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime(now)); 457 } 458 else 459 events = Socket.select(readset, writeset, null); 460 461 debug (ASOCKETS) stderr.writefln("%d events fired.", events); 462 463 // Update time after sleeping 464 now = MonoTime.currTime(); 465 466 if (events > 0) 467 { 468 // Handle just one event at a time, as the first 469 // handler might invalidate select()'s results. 470 handleEvent(readset, writeset); 471 } 472 else 473 if (idleHandlers.length) 474 { 475 import ae.utils.array : shift; 476 auto handler = idleHandlers.shift(); 477 478 // Rotate the idle handler queue before running it, 479 // in case the handler unregisters itself. 480 idleHandlers ~= handler; 481 482 handler(); 483 } 484 485 // Timers may invalidate our select results, so fire them after processing the latter 486 mainTimer.prod(now); 487 488 eventCounter++; 489 } 490 } 491 492 debug (ASOCKETS) 493 private void printSets(SocketSet readset, SocketSet writeset) 494 { 495 foreach (GenericSocket conn; sockets) 496 { 497 if (!conn.socket) 498 stderr.writefln("\t\t%s is unset", conn); 499 else 500 { 501 if (readset.isSet(conn.socket)) 502 stderr.writefln("\t\t%s is readable", conn); 503 if (writeset.isSet(conn.socket)) 504 stderr.writefln("\t\t%s is writable", conn); 505 } 506 } 507 } 508 509 private void handleEvent(SocketSet readset, SocketSet writeset) 510 { 511 debug (ASOCKETS) 512 { 513 stderr.writefln("\tSelect results:"); 514 printSets(readset, writeset); 515 } 516 517 foreach (GenericSocket conn; sockets) 518 { 519 if (!conn.socket) 520 continue; 521 522 if (readset.isSet(conn.socket)) 523 { 524 debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn); 525 return conn.onReadable(); 526 } 527 else 528 if (writeset.isSet(conn.socket)) 529 { 530 debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn); 531 return conn.onWritable(); 532 } 533 } 534 535 assert(false, "select() reported events available, but no registered sockets are set"); 536 } 537 } 538 539 // Use UFCS to allow removeIdleHandler to have a predicate with context 540 /// Register a function to be called when the event loop is idle, 541 /// and would otherwise sleep. 542 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 543 { 544 foreach (i, idleHandler; socketManager.idleHandlers) 545 assert(handler !is idleHandler); 546 547 socketManager.idleHandlers ~= handler; 548 } 549 550 /// Unregister a function previously registered with `addIdleHandler`. 551 void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args) 552 { 553 foreach (i, idleHandler; socketManager.idleHandlers) 554 if (pred(idleHandler, args)) 555 { 556 import std.algorithm : remove; 557 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 558 return; 559 } 560 assert(false, "No such idle handler"); 561 } 562 563 private mixin template SocketMixin() 564 { 565 // Flags that determine socket wake-up events. 566 567 /// Interested in read notifications (onReadable)? 568 bool notifyRead; 569 /// Interested in write notifications (onWritable)? 570 bool notifyWrite; 571 } 572 } 573 574 /// The default socket manager. 575 SocketManager socketManager; 576 577 /// Schedule a function to run on the next event loop iteration. 578 /// Can be used to queue logic to run once all current execution frames exit. 579 /// Similar to e.g. process.nextTick in Node. 580 void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow 581 { 582 socketManager.nextTickHandlers ~= dg; 583 } 584 585 /// The current monotonic time. 586 /// Updated by the event loop whenever it is awoken. 587 @property MonoTime now() 588 { 589 if (socketManager.now == MonoTime.init) 590 { 591 // Event loop not yet started. 592 socketManager.now = MonoTime.currTime(); 593 } 594 return socketManager.now; 595 } 596 597 // *************************************************************************** 598 599 /// General methods for an asynchronous socket. 600 abstract class GenericSocket 601 { 602 /// Declares notifyRead and notifyWrite. 603 mixin SocketMixin; 604 605 protected: 606 /// The socket this class wraps. 607 Socket conn; 608 609 // protected: 610 void onReadable() 611 { 612 } 613 614 void onWritable() 615 { 616 } 617 618 void onError(string /*reason*/) 619 { 620 } 621 622 public: 623 /// Retrieve the socket class this class wraps. 624 @property final Socket socket() 625 { 626 return conn; 627 } 628 629 /// allow getting the address of connections that are already disconnected 630 private Address[2] cachedAddress; 631 632 /*private*/ final @property Address _address(bool local)() 633 { 634 if (cachedAddress[local] !is null) 635 return cachedAddress[local]; 636 else 637 if (conn is null) 638 return null; 639 else 640 { 641 Address a; 642 if (conn.addressFamily == AddressFamily.UNSPEC) 643 { 644 // Socket will attempt to construct an UnknownAddress, 645 // which will almost certainly not match the real address length. 646 static if (local) 647 alias getname = c_socks.getsockname; 648 else 649 alias getname = c_socks.getpeername; 650 651 c_socks.socklen_t nameLen = 0; 652 if (getname(conn.handle, null, &nameLen) < 0) 653 throw new SocketOSException("Unable to obtain socket address"); 654 655 auto buf = new ubyte[nameLen]; 656 auto sa = cast(c_socks.sockaddr*)buf.ptr; 657 if (getname(conn.handle, sa, &nameLen) < 0) 658 throw new SocketOSException("Unable to obtain socket address"); 659 a = new UnknownAddressReference(sa, nameLen); 660 } 661 else 662 a = local ? conn.localAddress() : conn.remoteAddress(); 663 return cachedAddress[local] = a; 664 } 665 } 666 667 alias localAddress = _address!true; /// Retrieve this socket's local address. 668 alias remoteAddress = _address!false; /// Retrieve this socket's remote address. 669 670 /*private*/ final @property string _addressStr(bool local)() nothrow 671 { 672 try 673 { 674 auto a = _address!local; 675 if (a is null) 676 return "[null address]"; 677 string host = a.toAddrString(); 678 import std.string : indexOf; 679 if (host.indexOf(':') >= 0) 680 host = "[" ~ host ~ "]"; 681 try 682 { 683 string port = a.toPortString(); 684 return host ~ ":" ~ port; 685 } 686 catch (Exception e) 687 return host; 688 } 689 catch (Exception e) 690 return "[error: " ~ e.msg ~ "]"; 691 } 692 693 alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string. 694 alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string. 695 696 /// Don't block the process from exiting, even if the socket is ready to receive data. 697 /// TODO: Not implemented with libev 698 bool daemonRead; 699 700 /// Don't block the process from exiting, even if the socket is ready to send data. 701 /// TODO: Not implemented with libev 702 bool daemonWrite; 703 704 deprecated alias daemon = daemonRead; 705 706 /// Enable TCP keep-alive on the socket with the given settings. 707 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 708 { 709 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 710 if (enabled) 711 { 712 try 713 conn.setKeepAlive(time, interval); 714 catch (SocketException) 715 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 716 } 717 else 718 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 719 } 720 721 /// Returns a string containing the class name, address, and file descriptor. 722 override string toString() const 723 { 724 import std.string : format, split; 725 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1); 726 } 727 } 728 729 // *************************************************************************** 730 731 /// Classifies the cause of the disconnect. 732 /// Can be used to decide e.g. when it makes sense to reconnect. 733 enum DisconnectType 734 { 735 requested, /// Initiated by the application. 736 graceful, /// The peer gracefully closed the connection. 737 error /// Some abnormal network condition. 738 } 739 740 /// Used to indicate the state of a connection throughout its lifecycle. 741 enum ConnectionState 742 { 743 /// The initial state, or the state after a disconnect was fully processed. 744 disconnected, 745 746 /// Name resolution. Currently done synchronously. 747 resolving, 748 749 /// A connection attempt is in progress. 750 connecting, 751 752 /// A connection is established. 753 connected, 754 755 /// Disconnecting in progress. No data can be sent or received at this point. 756 /// We are waiting for queued data to be actually sent before disconnecting. 757 disconnecting, 758 } 759 760 /// Returns true if this is a connection state for which disconnecting is valid. 761 /// Generally, applications should be aware of the life cycle of their sockets, 762 /// so checking the state of a connection is unnecessary (and a code smell). 763 /// However, unconditionally disconnecting some connected sockets can be useful 764 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle), 765 /// such as in response to a signal. 766 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; } 767 768 /// Common interface for connections and adapters. 769 interface IConnection 770 { 771 /// `send` queues data for sending in one of five queues, indexed 772 /// by a numeric priority. 773 /// `MAX_PRIORITY` is the highest (least urgent) priority index. 774 /// `DEFAULT_PRIORITY` is the default priority 775 enum MAX_PRIORITY = 4; 776 enum DEFAULT_PRIORITY = 2; /// ditto 777 778 /// This is the default value for the `disconnect` `reason` string parameter. 779 static const defaultDisconnectReason = "Software closed the connection"; 780 781 /// Get connection state. 782 @property ConnectionState state(); 783 784 /// Has a connection been established? 785 deprecated final @property bool connected() { return state == ConnectionState.connected; } 786 787 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 788 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 789 790 /// Queue Data for sending. 791 void send(scope Data[] data, int priority = DEFAULT_PRIORITY); 792 793 /// ditto 794 final void send(Data datum, int priority = DEFAULT_PRIORITY) 795 { 796 this.send(datum.asSlice, priority); 797 } 798 799 /// Terminate the connection. 800 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 801 802 /// Callback setter for when a connection has been established 803 /// (if applicable). 804 alias ConnectHandler = void delegate(); 805 @property void handleConnect(ConnectHandler value); /// ditto 806 807 /// Callback setter for when new data is read. 808 alias ReadDataHandler = void delegate(Data data); 809 @property void handleReadData(ReadDataHandler value); /// ditto 810 811 /// Callback setter for when a connection was closed. 812 alias DisconnectHandler = void delegate(string reason, DisconnectType type); 813 @property void handleDisconnect(DisconnectHandler value); /// ditto 814 815 /// Callback setter for when all queued data has been sent. 816 alias BufferFlushedHandler = void delegate(); 817 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 818 } 819 820 // *************************************************************************** 821 822 /// Implementation of `IConnection` using a socket. 823 /// Implements receiving data when readable and sending queued data 824 /// when writable. 825 class Connection : GenericSocket, IConnection 826 { 827 private: 828 ConnectionState _state; 829 final @property ConnectionState state(ConnectionState value) { return _state = value; } 830 831 public: 832 /// Get connection state. 833 override @property ConnectionState state() { return _state; } 834 835 protected: 836 abstract sizediff_t doSend(scope const(void)[] buffer); 837 enum sizediff_t doReceiveEOF = -1; 838 abstract sizediff_t doReceive(scope void[] buffer); 839 840 /// The send buffers. 841 DataVec[MAX_PRIORITY+1] outQueue; 842 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 843 int partiallySent = -1; 844 845 /// Constructor used by a ServerSocket for new connections 846 this(Socket conn) 847 { 848 this(); 849 this.conn = conn; 850 state = conn is null ? ConnectionState.disconnected : ConnectionState.connected; 851 if (conn) 852 socketManager.register(this); 853 updateFlags(); 854 } 855 856 final void updateFlags() 857 { 858 if (state == ConnectionState.connecting) 859 notifyWrite = true; 860 else 861 notifyWrite = writePending; 862 863 notifyRead = state == ConnectionState.connected && readDataHandler; 864 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 865 } 866 867 // We reuse the same buffer across read calls. 868 // It is allocated on the first read, and also 869 // if the user code decides to keep a reference to it. 870 static Data inBuffer; 871 872 /// Called when a socket is readable. 873 override void onReadable() 874 { 875 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 876 if (!inBuffer) 877 inBuffer = Data(0x10000); 878 else 879 inBuffer = inBuffer.ensureUnique(); 880 sizediff_t received; 881 inBuffer.enter((scope contents) { 882 received = doReceive(contents); 883 }); 884 885 if (received == doReceiveEOF) 886 return disconnect("Connection closed", DisconnectType.graceful); 887 888 if (received == Socket.ERROR) 889 { 890 // if (wouldHaveBlocked) 891 // { 892 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 893 // return; 894 // } 895 // else 896 onError("recv() error: " ~ lastSocketError); 897 } 898 else 899 { 900 debug (PRINTDATA) 901 { 902 stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr); 903 stderr.write(hexDump(inBuffer.unsafeContents[0 .. received])); 904 stderr.flush(); 905 } 906 907 if (state == ConnectionState.disconnecting) 908 { 909 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this); 910 } 911 else 912 if (!readDataHandler) 913 { 914 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this); 915 } 916 else 917 { 918 auto data = inBuffer[0 .. received]; 919 readDataHandler(data); 920 } 921 } 922 } 923 924 /// Called when an error occurs on the socket. 925 override void onError(string reason) 926 { 927 if (state == ConnectionState.disconnecting) 928 { 929 debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 930 return close(); 931 } 932 933 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 934 disconnect("Socket error: " ~ reason, DisconnectType.error); 935 } 936 937 this() 938 { 939 } 940 941 public: 942 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 943 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 944 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 945 { 946 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 947 assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state)); 948 949 if (writePending) 950 { 951 if (type==DisconnectType.requested) 952 { 953 assert(conn, "Attempting to disconnect on an uninitialized socket"); 954 // queue disconnect after all data is sent 955 debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 956 state = ConnectionState.disconnecting; 957 //setIdleTimeout(30.seconds); 958 if (disconnectHandler) 959 disconnectHandler(reason, type); 960 updateFlags(); 961 return; 962 } 963 else 964 discardQueues(); 965 } 966 967 debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 968 969 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 970 close(); 971 else 972 { 973 assert(conn is null, "Registered but %s socket".format(state)); 974 if (state == ConnectionState.resolving) 975 state = ConnectionState.disconnected; 976 } 977 978 if (disconnectHandler) 979 disconnectHandler(reason, type); 980 updateFlags(); 981 } 982 983 private final void close() 984 { 985 assert(conn, "Attempting to close an unregistered socket"); 986 socketManager.unregister(this); 987 conn.close(); 988 conn = null; 989 outQueue[] = DataVec.init; 990 state = ConnectionState.disconnected; 991 } 992 993 /// Append data to the send buffer. 994 void send(scope Data[] data, int priority = DEFAULT_PRIORITY) 995 { 996 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 997 outQueue[priority] ~= data; 998 notifyWrite = true; // Fast updateFlags() 999 1000 debug (PRINTDATA) 1001 { 1002 stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr); 1003 foreach (datum; data) 1004 if (datum.length) 1005 stderr.write(hexDump(datum.unsafeContents)); 1006 else 1007 stderr.writeln("(empty Data)"); 1008 stderr.flush(); 1009 } 1010 } 1011 1012 /// ditto 1013 alias send = IConnection.send; 1014 1015 /// Cancel all queued `Data` packets with the given priority. 1016 /// Does not cancel any partially-sent `Data`. 1017 final void clearQueue(int priority) 1018 { 1019 if (priority == partiallySent) 1020 { 1021 assert(outQueue[priority].length > 0); 1022 outQueue[priority].length = 1; 1023 } 1024 else 1025 outQueue[priority] = null; 1026 updateFlags(); 1027 } 1028 1029 /// Clears all queues, even partially sent content. 1030 private final void discardQueues() 1031 { 1032 foreach (priority; 0..MAX_PRIORITY+1) 1033 outQueue[priority] = null; 1034 partiallySent = -1; 1035 updateFlags(); 1036 } 1037 1038 /// Returns true if any queues have pending data. 1039 @property 1040 final bool writePending() 1041 { 1042 foreach (ref queue; outQueue) 1043 if (queue.length) 1044 return true; 1045 return false; 1046 } 1047 1048 /// Returns true if there are any queued `Data` which have not yet 1049 /// begun to be sent. 1050 final bool queuePresent(int priority = DEFAULT_PRIORITY) 1051 { 1052 if (priority == partiallySent) 1053 { 1054 assert(outQueue[priority].length > 0); 1055 return outQueue[priority].length > 1; 1056 } 1057 else 1058 return outQueue[priority].length > 0; 1059 } 1060 1061 /// Returns the number of queued `Data` at the given priority. 1062 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 1063 { 1064 return outQueue[priority].length; 1065 } 1066 1067 /// Returns the number of queued bytes at the given priority. 1068 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 1069 { 1070 size_t bytes; 1071 foreach (datum; outQueue[priority]) 1072 bytes += datum.length; 1073 return bytes; 1074 } 1075 1076 // public: 1077 private ConnectHandler connectHandler; 1078 /// Callback for when a connection has been established. 1079 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 1080 1081 private ReadDataHandler readDataHandler; 1082 /// Callback for incoming data. 1083 /// Data will not be received unless this handler is set. 1084 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 1085 1086 private DisconnectHandler disconnectHandler; 1087 /// Callback for when a connection was closed. 1088 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 1089 1090 private BufferFlushedHandler bufferFlushedHandler; 1091 /// Callback setter for when all queued data has been sent. 1092 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 1093 } 1094 1095 /// Implements a stream connection. 1096 /// Queued `Data` is allowed to be fragmented. 1097 class StreamConnection : Connection 1098 { 1099 protected: 1100 this() 1101 { 1102 super(); 1103 } 1104 1105 /// Called when a socket is writable. 1106 override void onWritable() 1107 { 1108 //scope(success) updateFlags(); 1109 onWritableImpl(); 1110 updateFlags(); 1111 } 1112 1113 // Work around scope(success) breaking debugger stack traces 1114 final private void onWritableImpl() 1115 { 1116 debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state); 1117 if (state == ConnectionState.connecting) 1118 { 1119 int32_t error; 1120 conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error); 1121 if (error) 1122 return disconnect(formatSocketError(error), DisconnectType.error); 1123 1124 state = ConnectionState.connected; 1125 1126 //debug writefln("[%s] Connected", remoteAddressStr); 1127 try 1128 setKeepAlive(); 1129 catch (Exception e) 1130 return disconnect(e.msg, DisconnectType.error); 1131 if (connectHandler) 1132 connectHandler(); 1133 return; 1134 } 1135 //debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length); 1136 1137 foreach (sendPartial; [true, false]) 1138 foreach (int priority, ref queue; outQueue) 1139 while (queue.length && (!sendPartial || priority == partiallySent)) 1140 { 1141 assert(partiallySent == -1 || partiallySent == priority); 1142 1143 ptrdiff_t sent = 0; 1144 if (!queue.front.empty) 1145 { 1146 queue.front.enter((scope contents) { 1147 sent = doSend(contents); 1148 }); 1149 debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, queue.front.length); 1150 } 1151 else 1152 { 1153 debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this); 1154 } 1155 1156 if (sent == Socket.ERROR) 1157 { 1158 if (wouldHaveBlocked()) 1159 return; 1160 else 1161 return onError("send() error: " ~ lastSocketError); 1162 } 1163 else 1164 if (sent < queue.front.length) 1165 { 1166 if (sent > 0) 1167 { 1168 queue.front = queue.front[sent..queue.front.length]; 1169 partiallySent = priority; 1170 } 1171 return; 1172 } 1173 else 1174 { 1175 assert(sent == queue.front.length); 1176 //debug writefln("[%s] Sent data:", remoteAddressStr); 1177 //debug writefln("%s", hexDump(queue.front.contents[0..sent])); 1178 queue.front.clear(); 1179 queue.popFront(); 1180 partiallySent = -1; 1181 if (queue.length == 0) 1182 queue = null; 1183 } 1184 } 1185 1186 // outQueue is now empty 1187 if (bufferFlushedHandler) 1188 bufferFlushedHandler(); 1189 if (state == ConnectionState.disconnecting) 1190 { 1191 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1192 close(); 1193 } 1194 } 1195 1196 public: 1197 this(Socket conn) 1198 { 1199 super(conn); 1200 } /// 1201 } 1202 1203 // *************************************************************************** 1204 1205 /// A POSIX file stream. 1206 /// Allows adding a file (e.g. stdin/stdout) to the socket manager. 1207 /// Does not dup the given file descriptor, so "disconnecting" this connection 1208 /// will close it. 1209 version (Posix) 1210 class FileConnection : StreamConnection 1211 { 1212 this(int fileno) 1213 { 1214 auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC); 1215 conn.blocking = false; 1216 super(conn); 1217 } /// 1218 1219 protected: 1220 import core.sys.posix.unistd : read, write; 1221 1222 override sizediff_t doSend(scope const(void)[] buffer) 1223 { 1224 return write(socket.handle, buffer.ptr, buffer.length); 1225 } 1226 1227 override sizediff_t doReceive(scope void[] buffer) 1228 { 1229 auto bytesRead = read(socket.handle, buffer.ptr, buffer.length); 1230 if (bytesRead == 0) 1231 return doReceiveEOF; 1232 return bytesRead; 1233 } 1234 } 1235 1236 /// Separates reading and writing, e.g. for stdin/stdout. 1237 class Duplex : IConnection 1238 { 1239 /// 1240 IConnection reader, writer; 1241 1242 this(IConnection reader, IConnection writer) 1243 { 1244 this.reader = reader; 1245 this.writer = writer; 1246 reader.handleConnect = &onConnect; 1247 writer.handleConnect = &onConnect; 1248 reader.handleDisconnect = &onDisconnect; 1249 writer.handleDisconnect = &onDisconnect; 1250 } /// 1251 1252 @property ConnectionState state() 1253 { 1254 if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting) 1255 return ConnectionState.disconnecting; 1256 else 1257 return reader.state < writer.state ? reader.state : writer.state; 1258 } /// 1259 1260 /// Queue Data for sending. 1261 void send(scope Data[] data, int priority) 1262 { 1263 writer.send(data, priority); 1264 } 1265 1266 alias send = IConnection.send; /// ditto 1267 1268 /// Terminate the connection. 1269 /// Note: this isn't quite fleshed out - applications may want to 1270 /// wait and send some more data even after stdin is closed, but 1271 /// such an interface can't be fitted into an IConnection 1272 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1273 { 1274 if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting) 1275 reader.disconnect(reason, type); 1276 if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting) 1277 writer.disconnect(reason, type); 1278 debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1279 } 1280 1281 protected void onConnect() 1282 { 1283 if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected) 1284 connectHandler(); 1285 } 1286 1287 protected void onDisconnect(string reason, DisconnectType type) 1288 { 1289 debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1290 if (disconnectHandler) 1291 { 1292 disconnectHandler(reason, type); 1293 disconnectHandler = null; // don't call it twice for the other connection 1294 } 1295 // It is our responsibility to disconnect the other connection 1296 // Use DisconnectType.requested to ensure that any written data is flushed 1297 disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested); 1298 } 1299 1300 /// Callback for when a connection has been established. 1301 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1302 private ConnectHandler connectHandler; 1303 1304 /// Callback setter for when new data is read. 1305 @property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; } 1306 1307 /// Callback setter for when a connection was closed. 1308 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1309 private DisconnectHandler disconnectHandler; 1310 1311 /// Callback setter for when all queued data has been written. 1312 @property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; } 1313 } 1314 1315 unittest { if (false) new Duplex(null, null); } 1316 1317 // *************************************************************************** 1318 1319 /// An asynchronous socket-based connection. 1320 class SocketConnection : StreamConnection 1321 { 1322 protected: 1323 AddressInfo[] addressQueue; 1324 bool datagram; 1325 1326 this(Socket conn, bool datagram = false) 1327 { 1328 super(conn); 1329 this.datagram = datagram; 1330 } 1331 1332 override sizediff_t doSend(scope const(void)[] buffer) 1333 { 1334 return conn.send(buffer); 1335 } 1336 1337 override sizediff_t doReceive(scope void[] buffer) 1338 { 1339 auto bytesReceived = conn.receive(buffer); 1340 if (bytesReceived == 0 && !datagram) 1341 return doReceiveEOF; 1342 return bytesReceived; 1343 } 1344 1345 final void tryNextAddress() 1346 { 1347 assert(state == ConnectionState.connecting); 1348 auto addressInfo = addressQueue[0]; 1349 addressQueue = addressQueue[1..$]; 1350 1351 try 1352 { 1353 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1354 conn.blocking = false; 1355 1356 socketManager.register(this); 1357 updateFlags(); 1358 debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString()); 1359 conn.connect(addressInfo.address); 1360 } 1361 catch (SocketException e) 1362 return onError("Connect error: " ~ e.msg); 1363 } 1364 1365 /// Called when an error occurs on the socket. 1366 override void onError(string reason) 1367 { 1368 if (state == ConnectionState.connecting && addressQueue.length) 1369 { 1370 socketManager.unregister(this); 1371 conn.close(); 1372 conn = null; 1373 1374 return tryNextAddress(); 1375 } 1376 1377 super.onError(reason); 1378 } 1379 1380 public: 1381 /// Default constructor 1382 this() 1383 { 1384 debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this); 1385 } 1386 1387 /// Start establishing a connection. 1388 final void connect(AddressInfo[] addresses) 1389 { 1390 assert(addresses.length, "No addresses specified"); 1391 1392 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1393 assert(!conn); 1394 1395 addressQueue = addresses; 1396 state = ConnectionState.connecting; 1397 tryNextAddress(); 1398 } 1399 } 1400 1401 /// An asynchronous TCP connection. 1402 class TcpConnection : SocketConnection 1403 { 1404 protected: 1405 this(Socket conn) 1406 { 1407 super(conn, false); 1408 } 1409 1410 public: 1411 /// Default constructor 1412 this() 1413 { 1414 debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this); 1415 } 1416 1417 /// 1418 alias connect = SocketConnection.connect; // raise overload 1419 1420 /// Start establishing a connection. 1421 final void connect(string host, ushort port) 1422 { 1423 assert(host.length, "Empty host"); 1424 assert(port, "No port specified"); 1425 1426 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1427 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1428 1429 state = ConnectionState.resolving; 1430 1431 AddressInfo[] addressInfos; 1432 try 1433 { 1434 auto addresses = getAddress(host, port); 1435 enforce(addresses.length, "No addresses found"); 1436 debug (ASOCKETS) 1437 { 1438 stderr.writefln("Resolved to %s addresses:", addresses.length); 1439 foreach (address; addresses) 1440 stderr.writefln("- %s", address.toString()); 1441 } 1442 1443 if (addresses.length > 1) 1444 { 1445 import std.random : randomShuffle; 1446 randomShuffle(addresses); 1447 } 1448 1449 foreach (address; addresses) 1450 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1451 } 1452 catch (SocketException e) 1453 return onError("Lookup error: " ~ e.msg); 1454 1455 state = ConnectionState.disconnected; 1456 connect(addressInfos); 1457 } 1458 } 1459 1460 // *************************************************************************** 1461 1462 /// An asynchronous connection server for socket-based connections. 1463 class SocketServer 1464 { 1465 protected: 1466 /// Class that actually performs listening on a certain address family 1467 final class Listener : GenericSocket 1468 { 1469 this(Socket conn) 1470 { 1471 debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this); 1472 this.conn = conn; 1473 socketManager.register(this); 1474 } 1475 1476 /// Called when a socket is readable. 1477 override void onReadable() 1478 { 1479 debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this); 1480 Socket acceptSocket = conn.accept(); 1481 acceptSocket.blocking = false; 1482 if (handleAccept) 1483 { 1484 auto connection = createConnection(acceptSocket); 1485 debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr); 1486 connection.setKeepAlive(); 1487 //assert(connection.connected); 1488 //connection.connected = true; 1489 acceptHandler(connection); 1490 } 1491 else 1492 acceptSocket.close(); 1493 } 1494 1495 /// Called when a socket is writable. 1496 override void onWritable() 1497 { 1498 } 1499 1500 /// Called when an error occurs on the socket. 1501 override void onError(string reason) 1502 { 1503 close(); // call parent 1504 } 1505 1506 void closeListener() 1507 { 1508 assert(conn); 1509 socketManager.unregister(this); 1510 conn.close(); 1511 conn = null; 1512 } 1513 } 1514 1515 SocketConnection createConnection(Socket socket) 1516 { 1517 return new SocketConnection(socket, datagram); 1518 } 1519 1520 /// Whether the socket is listening. 1521 bool listening; 1522 /// Listener instances 1523 Listener[] listeners; 1524 bool datagram; 1525 1526 final void updateFlags() 1527 { 1528 foreach (listener; listeners) 1529 listener.notifyRead = handleAccept !is null; 1530 } 1531 1532 public: 1533 /// Start listening on this socket. 1534 final void listen(AddressInfo[] addressInfos) 1535 { 1536 foreach (ref addressInfo; addressInfos) 1537 { 1538 try 1539 { 1540 Socket conn = new Socket(addressInfo); 1541 conn.blocking = false; 1542 if (addressInfo.family == AddressFamily.INET6) 1543 conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); 1544 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 1545 1546 conn.bind(addressInfo.address); 1547 conn.listen(totalCPUs * 2); 1548 1549 listeners ~= new Listener(conn); 1550 } 1551 catch (SocketException e) 1552 { 1553 debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString()); 1554 debug(ASOCKETS) stderr.writeln(e.msg); 1555 } 1556 } 1557 1558 if (listeners.length==0) 1559 throw new Exception("Unable to bind service"); 1560 1561 listening = true; 1562 1563 updateFlags(); 1564 } 1565 1566 this() 1567 { 1568 this(false); 1569 } /// 1570 1571 this(bool datagram) 1572 { 1573 this.datagram = datagram; 1574 } /// 1575 1576 /// Creates a Server with the given sockets. 1577 /// The sockets must have already had `bind` and `listen` called on them. 1578 this(Socket[] sockets...) 1579 { 1580 foreach (socket; sockets) 1581 listeners ~= new Listener(socket); 1582 } 1583 1584 /// Returns all listening addresses. 1585 final @property Address[] localAddresses() 1586 { 1587 Address[] result; 1588 foreach (listener; listeners) 1589 result ~= listener.localAddress; 1590 return result; 1591 } 1592 1593 /// Returns `true` if the server is listening for incoming connections. 1594 final @property bool isListening() 1595 { 1596 return listening; 1597 } 1598 1599 /// Stop listening on this socket. 1600 final void close() 1601 { 1602 foreach (listener;listeners) 1603 listener.closeListener(); 1604 listeners = null; 1605 listening = false; 1606 if (handleClose) 1607 handleClose(); 1608 } 1609 1610 /// Create a SocketServer using the handle passed on standard input, 1611 /// for which `listen` had already been called. Used by 1612 /// e.g. FastCGI and systemd sockets with "Listen = yes". 1613 static SocketServer fromStdin() 1614 { 1615 socket_t socket; 1616 version (Windows) 1617 { 1618 import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE; 1619 socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE); 1620 } 1621 else 1622 socket = cast(socket_t)0; 1623 1624 auto s = new Socket(socket, AddressFamily.UNSPEC); 1625 s.blocking = false; 1626 return new SocketServer(s); 1627 } 1628 1629 /// Callback for when the socket was closed. 1630 void delegate() handleClose; 1631 1632 private void delegate(SocketConnection incoming) acceptHandler; 1633 /// Callback for an incoming connection. 1634 /// Connections will not be accepted unless this handler is set. 1635 @property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; } 1636 /// ditto 1637 @property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); } 1638 } 1639 1640 /// An asynchronous TCP connection server. 1641 class TcpServer : SocketServer 1642 { 1643 protected: 1644 override SocketConnection createConnection(Socket socket) 1645 { 1646 return new TcpConnection(socket); 1647 } 1648 1649 public: 1650 this() 1651 { 1652 } /// 1653 1654 this(Socket[] sockets...) 1655 { 1656 super(sockets); 1657 } /// Construct from the given sockets. 1658 1659 /// 1660 alias listen = SocketServer.listen; // raise overload 1661 1662 /// Start listening on this socket. 1663 final ushort listen(ushort port, string addr = null) 1664 { 1665 debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port); 1666 //assert(!listening, "Attempting to listen on a listening socket"); 1667 1668 auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); 1669 1670 debug (ASOCKETS) 1671 { 1672 stderr.writefln("Resolved to %s addresses:", addressInfos.length); 1673 foreach (ref addressInfo; addressInfos) 1674 stderr.writefln("- %s", addressInfo); 1675 } 1676 1677 // listen on random ports only on IPv4 for now 1678 if (port == 0) 1679 { 1680 foreach_reverse (i, ref addressInfo; addressInfos) 1681 if (addressInfo.family != AddressFamily.INET) 1682 addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$]; 1683 } 1684 1685 listen(addressInfos); 1686 1687 foreach (listener; listeners) 1688 { 1689 auto address = listener.conn.localAddress(); 1690 if (address.addressFamily == AddressFamily.INET) 1691 port = to!ushort(address.toPortString()); 1692 } 1693 1694 return port; 1695 } 1696 1697 deprecated("Use SocketServer.fromStdin") 1698 static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; } 1699 1700 /// Delegate to be called when a connection is accepted. 1701 @property final void handleAccept(void delegate(TcpConnection incoming) value) { super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c)); } 1702 } 1703 1704 // *************************************************************************** 1705 1706 /// Base class for connection-less socket protocols, i.e. those for 1707 /// which we must use sendto instead of connect/send. 1708 /// These generally correspond to stateless / datagram-based 1709 /// protocols, like UDP. 1710 /// This module's class hierarchy is mostly oriented towards 1711 /// stateful, stream-based protocols; to represent connectionless 1712 /// protocols, this class encapsulates a socket with a fixed 1713 /// destination (sendto) address, and optionally bound to a local 1714 /// address. 1715 /// Currently received packets' address is not exposed. 1716 class ConnectionlessSocketConnection : Connection 1717 { 1718 protected: 1719 this(Socket conn) 1720 { 1721 super(conn); 1722 } 1723 1724 /// Called when a socket is writable. 1725 override void onWritable() 1726 { 1727 //scope(success) updateFlags(); 1728 onWritableImpl(); 1729 updateFlags(); 1730 } 1731 1732 // Work around scope(success) breaking debugger stack traces 1733 final private void onWritableImpl() 1734 { 1735 foreach (priority, ref queue; outQueue) 1736 while (queue.length) 1737 { 1738 ptrdiff_t sent; 1739 queue.front.enter((scope contents) { 1740 sent = conn.sendTo(contents, remoteAddress); 1741 }); 1742 1743 if (sent == Socket.ERROR) 1744 { 1745 if (wouldHaveBlocked()) 1746 return; 1747 else 1748 return onError("send() error: " ~ lastSocketError); 1749 } 1750 else 1751 if (sent < queue.front.length) 1752 { 1753 return onError("Sent only %d/%d bytes of the datagram!".format(sent, queue.front.length)); 1754 } 1755 else 1756 { 1757 assert(sent == queue.front.length); 1758 //debug writefln("[%s] Sent data:", remoteAddressStr); 1759 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1760 queue.front.clear(); 1761 queue.popFront(); 1762 if (queue.length == 0) 1763 queue = null; 1764 } 1765 } 1766 1767 // outQueue is now empty 1768 if (bufferFlushedHandler) 1769 bufferFlushedHandler(); 1770 if (state == ConnectionState.disconnecting) 1771 { 1772 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1773 close(); 1774 } 1775 } 1776 1777 override sizediff_t doSend(scope const(void)[] buffer) 1778 { 1779 assert(false); // never called (called only from overridden methods) 1780 } 1781 1782 override sizediff_t doReceive(scope void[] buffer) 1783 { 1784 return conn.receive(buffer); 1785 } 1786 1787 public: 1788 /// Default constructor 1789 this() 1790 { 1791 debug (ASOCKETS) stderr.writefln("New ConnectionlessSocketConnection @ %s", cast(void*)this); 1792 } 1793 1794 /// Initialize with the given `AddressFamily`, without binding to an address. 1795 final void initialize(AddressFamily family, SocketType type, ProtocolType protocol) 1796 { 1797 initializeImpl(family, type, protocol); 1798 if (connectHandler) 1799 connectHandler(); 1800 } 1801 1802 private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol) 1803 { 1804 assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state)); 1805 assert(!conn); 1806 1807 conn = new Socket(family, type, protocol); 1808 conn.blocking = false; 1809 socketManager.register(this); 1810 state = ConnectionState.connected; 1811 updateFlags(); 1812 } 1813 1814 /// Bind to a local address in order to receive packets sent there. 1815 final ushort bind(AddressInfo addressInfo) 1816 { 1817 initialize(addressInfo.family, addressInfo.type, addressInfo.protocol); 1818 conn.bind(addressInfo.address); 1819 1820 auto address = conn.localAddress(); 1821 auto port = to!ushort(address.toPortString()); 1822 1823 if (connectHandler) 1824 connectHandler(); 1825 1826 return port; 1827 } 1828 1829 // public: 1830 /// Where to send packets to. 1831 Address remoteAddress; 1832 } 1833 1834 /// An asynchronous UDP stream. 1835 /// UDP does not have connections, so this class encapsulates a socket 1836 /// with a fixed destination (sendto) address, and optionally bound to 1837 /// a local address. 1838 /// Currently received packets' address is not exposed. 1839 class UdpConnection : ConnectionlessSocketConnection 1840 { 1841 protected: 1842 this(Socket conn) 1843 { 1844 super(conn); 1845 } 1846 1847 public: 1848 /// Default constructor 1849 this() 1850 { 1851 debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this); 1852 } 1853 1854 /// Initialize with the given `AddressFamily`, without binding to an address. 1855 final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM) 1856 { 1857 super.initialize(family, type, ProtocolType.UDP); 1858 } 1859 1860 /// Bind to a local address in order to receive packets sent there. 1861 final ushort bind(string host, ushort port) 1862 { 1863 assert(host.length, "Empty host"); 1864 1865 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1866 1867 state = ConnectionState.resolving; 1868 1869 AddressInfo addressInfo; 1870 try 1871 { 1872 auto addresses = getAddress(host, port); 1873 enforce(addresses.length, "No addresses found"); 1874 debug (ASOCKETS) 1875 { 1876 stderr.writefln("Resolved to %s addresses:", addresses.length); 1877 foreach (address; addresses) 1878 stderr.writefln("- %s", address.toString()); 1879 } 1880 1881 Address address; 1882 if (addresses.length > 1) 1883 { 1884 import std.random : uniform; 1885 address = addresses[uniform(0, $)]; 1886 } 1887 else 1888 address = addresses[0]; 1889 addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host); 1890 } 1891 catch (SocketException e) 1892 { 1893 onError("Lookup error: " ~ e.msg); 1894 return 0; 1895 } 1896 1897 state = ConnectionState.disconnected; 1898 return super.bind(addressInfo); 1899 } 1900 } 1901 1902 /// 1903 unittest 1904 { 1905 auto server = new UdpConnection(); 1906 server.bind("127.0.0.1", 0); 1907 1908 auto client = new UdpConnection(); 1909 client.initialize(server.localAddress.addressFamily); 1910 1911 string[] packets = ["", "Hello", "there"]; 1912 client.remoteAddress = server.localAddress; 1913 client.send({ 1914 DataVec data; 1915 foreach (packet; packets) 1916 data ~= Data(packet.asBytes); 1917 return data; 1918 }()[]); 1919 1920 server.handleReadData = (Data data) 1921 { 1922 assert(data.unsafeContents == packets[0]); 1923 packets = packets[1..$]; 1924 if (!packets.length) 1925 { 1926 server.close(); 1927 client.close(); 1928 } 1929 }; 1930 socketManager.loop(); 1931 assert(!packets.length); 1932 } 1933 1934 // *************************************************************************** 1935 1936 /// Base class for a connection adapter. 1937 /// By itself, does nothing. 1938 class ConnectionAdapter : IConnection 1939 { 1940 /// The next connection in the chain (towards the raw transport). 1941 IConnection next; 1942 1943 this(IConnection next) 1944 { 1945 this.next = next; 1946 next.handleConnect = &onConnect; 1947 next.handleDisconnect = &onDisconnect; 1948 next.handleBufferFlushed = &onBufferFlushed; 1949 } /// 1950 1951 @property ConnectionState state() { return next.state; } /// 1952 1953 /// Queue Data for sending. 1954 void send(scope Data[] data, int priority) 1955 { 1956 next.send(data, priority); 1957 } 1958 1959 alias send = IConnection.send; /// ditto 1960 1961 /// Terminate the connection. 1962 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1963 { 1964 next.disconnect(reason, type); 1965 } 1966 1967 protected void onConnect() 1968 { 1969 if (connectHandler) 1970 connectHandler(); 1971 } 1972 1973 protected void onReadData(Data data) 1974 { 1975 // onReadData should be fired only if readDataHandler is set 1976 assert(readDataHandler, "onReadData caled with null readDataHandler"); 1977 readDataHandler(data); 1978 } 1979 1980 protected void onDisconnect(string reason, DisconnectType type) 1981 { 1982 if (disconnectHandler) 1983 disconnectHandler(reason, type); 1984 } 1985 1986 protected void onBufferFlushed() 1987 { 1988 if (bufferFlushedHandler) 1989 bufferFlushedHandler(); 1990 } 1991 1992 /// Callback for when a connection has been established. 1993 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1994 protected ConnectHandler connectHandler; 1995 1996 /// Callback setter for when new data is read. 1997 @property void handleReadData(ReadDataHandler value) 1998 { 1999 readDataHandler = value; 2000 next.handleReadData = value ? &onReadData : null ; 2001 } 2002 protected ReadDataHandler readDataHandler; 2003 2004 /// Callback setter for when a connection was closed. 2005 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 2006 protected DisconnectHandler disconnectHandler; 2007 2008 /// Callback setter for when all queued data has been written. 2009 @property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; } 2010 protected BufferFlushedHandler bufferFlushedHandler; 2011 } 2012 2013 // *************************************************************************** 2014 2015 /// Adapter for connections with a line-based protocol. 2016 /// Splits data stream into delimiter-separated lines. 2017 class LineBufferedAdapter : ConnectionAdapter 2018 { 2019 /// The protocol's line delimiter. 2020 string delimiter = "\r\n"; 2021 2022 /// Maximum line length (0 means unlimited). 2023 size_t maxLength = 0; 2024 2025 this(IConnection next) 2026 { 2027 super(next); 2028 } /// 2029 2030 /// Append a line to the send buffer. 2031 void send(string line) 2032 { 2033 //super.send(Data(line ~ delimiter)); 2034 // https://issues.dlang.org/show_bug.cgi?id=13985 2035 ConnectionAdapter ca = this; 2036 ca.send(Data(line.asBytes ~ delimiter.asBytes)); 2037 } 2038 2039 protected: 2040 /// The receive buffer. 2041 Data inBuffer; 2042 2043 /// Called when data has been received. 2044 final override void onReadData(Data data) 2045 { 2046 import std.string : indexOf; 2047 auto startIndex = inBuffer.length; 2048 if (inBuffer.length) 2049 inBuffer ~= data; 2050 else 2051 inBuffer = data; 2052 2053 assert(delimiter.length >= 1); 2054 if (startIndex >= delimiter.length) 2055 startIndex -= delimiter.length - 1; 2056 else 2057 startIndex = 0; 2058 2059 auto index = inBuffer[startIndex .. $].indexOf(delimiter.asBytes); 2060 while (index >= 0) 2061 { 2062 if (!processLine(startIndex + index)) 2063 break; 2064 2065 startIndex = 0; 2066 index = inBuffer.indexOf(delimiter.asBytes); 2067 } 2068 2069 if (maxLength && inBuffer.length > maxLength) 2070 disconnect("Line too long", DisconnectType.error); 2071 } 2072 2073 // `index` is the index of the delimiter's first character. 2074 final bool processLine(size_t index) 2075 { 2076 if (maxLength && index > maxLength) 2077 { 2078 disconnect("Line too long", DisconnectType.error); 2079 return false; 2080 } 2081 auto line = inBuffer[0..index]; 2082 inBuffer = inBuffer[index+delimiter.length..inBuffer.length]; 2083 super.onReadData(line); 2084 return true; 2085 } 2086 2087 override void onDisconnect(string reason, DisconnectType type) 2088 { 2089 super.onDisconnect(reason, type); 2090 inBuffer.clear(); 2091 } 2092 } 2093 2094 // *************************************************************************** 2095 2096 /// Fires an event handler or disconnects connections 2097 /// after a period of inactivity. 2098 class TimeoutAdapter : ConnectionAdapter 2099 { 2100 this(IConnection next) 2101 { 2102 debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this); 2103 super(next); 2104 } /// 2105 2106 /// Set the `Duration` indicating the period of inactivity after which to take action. 2107 final void setIdleTimeout(Duration timeout) 2108 { 2109 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this); 2110 assert(timeout > Duration.zero); 2111 this.timeout = timeout; 2112 2113 // Configure idleTask 2114 if (idleTask is null) 2115 { 2116 idleTask = new TimerTask(); 2117 idleTask.handleTask = &onTask_Idle; 2118 } 2119 else if (idleTask.isWaiting()) 2120 idleTask.cancel(); 2121 2122 mainTimer.add(idleTask, now + timeout); 2123 } 2124 2125 /// Manually mark this connection as non-idle, restarting the idle timer. 2126 /// `handleNonIdle` will be called, if set. 2127 void markNonIdle() 2128 { 2129 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this); 2130 if (handleNonIdle) 2131 handleNonIdle(); 2132 if (idleTask && idleTask.isWaiting()) 2133 idleTask.restart(now + timeout); 2134 } 2135 2136 /// Stop the idle timer. 2137 void cancelIdleTimeout() 2138 { 2139 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this); 2140 assert(idleTask !is null); 2141 assert(idleTask.isWaiting()); 2142 idleTask.cancel(); 2143 } 2144 2145 /// Restart the idle timer. 2146 void resumeIdleTimeout() 2147 { 2148 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this); 2149 assert(idleTask !is null); 2150 assert(!idleTask.isWaiting()); 2151 mainTimer.add(idleTask, now + timeout); 2152 } 2153 2154 /// Returns the point in time when the idle handler is scheduled 2155 /// to be called, or null if it is not scheduled. 2156 /*Nullable!MonoTime*/auto when()() 2157 { 2158 import std.typecons : Nullable; 2159 return idleTask.isWaiting() 2160 ? Nullable!MonoTime(idleTask.when) 2161 : Nullable!MonoTime.init; 2162 } 2163 2164 /// Callback for when a connection has stopped responding. 2165 /// If unset, the connection will be disconnected. 2166 void delegate() handleIdleTimeout; 2167 2168 /// Callback for when a connection is marked as non-idle 2169 /// (when data is received). 2170 void delegate() handleNonIdle; 2171 2172 protected: 2173 override void onConnect() 2174 { 2175 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this); 2176 markNonIdle(); 2177 super.onConnect(); 2178 } 2179 2180 override void onReadData(Data data) 2181 { 2182 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this); 2183 markNonIdle(); 2184 super.onReadData(data); 2185 } 2186 2187 override void onDisconnect(string reason, DisconnectType type) 2188 { 2189 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this); 2190 if (idleTask && idleTask.isWaiting()) 2191 idleTask.cancel(); 2192 super.onDisconnect(reason, type); 2193 } 2194 2195 private: 2196 TimerTask idleTask; // non-null if an idle timeout has been set 2197 Duration timeout; 2198 2199 final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/) 2200 { 2201 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this); 2202 if (state == ConnectionState.disconnecting) 2203 return disconnect("Delayed disconnect - time-out", DisconnectType.error); 2204 2205 if (state == ConnectionState.disconnected) 2206 return; 2207 2208 if (handleIdleTimeout) 2209 { 2210 resumeIdleTimeout(); // reschedule (by default) 2211 handleIdleTimeout(); 2212 } 2213 else 2214 disconnect("Time-out", DisconnectType.error); 2215 } 2216 } 2217 2218 // *************************************************************************** 2219 2220 unittest 2221 { 2222 void testTimer() 2223 { 2224 bool fired; 2225 setTimeout({fired = true;}, 10.msecs); 2226 socketManager.loop(); 2227 assert(fired); 2228 } 2229 2230 testTimer(); 2231 }