1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <ae@cy.md> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.dataset : DataVec; 20 import ae.sys.timing; 21 import ae.utils.array : asSlice, asBytes, queuePush, queuePop; 22 import ae.utils.math; 23 public import ae.sys.data; 24 25 import core.stdc.stdint : int32_t; 26 27 import std.exception; 28 import std.parallelism : totalCPUs; 29 import std.socket; 30 import std.string : format; 31 public import std.socket : Address, AddressInfo, Socket; 32 33 version (Windows) 34 private import c_socks = core.sys.windows.winsock2; 35 else version (Posix) 36 private import c_socks = core.sys.posix.sys.socket; 37 38 debug(ASOCKETS) import std.stdio : stderr; 39 debug(PRINTDATA) import std.stdio : stderr; 40 debug(PRINTDATA) import ae.utils.text : hexDump; 41 private import std.conv : to; 42 43 44 // https://issues.dlang.org/show_bug.cgi?id=7016 45 static import ae.utils.array; 46 47 version(LIBEV) 48 { 49 import deimos.ev; 50 pragma(lib, "ev"); 51 } 52 53 version(Windows) 54 { 55 import core.sys.windows.windows : Sleep; 56 private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 57 } 58 else 59 private enum USE_SLEEP = false; 60 61 int eventCounter; 62 63 version (LIBEV) 64 { 65 // Watchers are a GenericSocket field (as declared in SocketMixin). 66 // Use one watcher per read and write event. 67 // Start/stop those as higher-level code declares interest in those events. 68 // Use the "data" ev_io field to store the parent GenericSocket address. 69 // Also use the "data" field as a flag to indicate whether the watcher is active 70 // (data is null when the watcher is stopped). 71 72 /// `libev`-based event loop implementation. 73 struct SocketManager 74 { 75 private: 76 size_t count; 77 78 void delegate()[] nextTickHandlers; 79 80 extern(C) 81 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 82 { 83 auto socket = cast(GenericSocket)w.data; 84 assert(socket, "libev event fired on stopped watcher"); 85 debug (ASOCKETS) stderr.writefln("ioCallback(%s, 0x%X)", socket, revents); 86 87 // TODO? Need to get proper SocketManager instance to call updateTimer on 88 socketManager.preEvent(); 89 90 if (revents & EV_READ) 91 socket.onReadable(); 92 else 93 if (revents & EV_WRITE) 94 socket.onWritable(); 95 else 96 assert(false, "Unknown event fired from libev"); 97 98 socketManager.postEvent(false); 99 } 100 101 ev_timer evTimer; 102 MonoTime lastNextEvent = MonoTime.max; 103 104 extern(C) 105 static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/) 106 { 107 debug (ASOCKETS) stderr.writefln("Timer callback called."); 108 109 socketManager.preEvent(); // This also updates socketManager.now 110 mainTimer.prod(socketManager.now); 111 112 socketManager.postEvent(true); 113 debug (ASOCKETS) stderr.writefln("Timer callback exiting."); 114 } 115 116 /// Called upon waking up, before calling any users' event handlers. 117 void preEvent() 118 { 119 eventCounter++; 120 socketManager.now = MonoTime.currTime(); 121 } 122 123 /// Called before going back to sleep, after calling any users' event handlers. 124 void postEvent(bool wokeDueToTimeout) 125 { 126 while (nextTickHandlers.length) 127 { 128 auto thisTickHandlers = nextTickHandlers; 129 nextTickHandlers = null; 130 131 foreach (handler; thisTickHandlers) 132 handler(); 133 } 134 135 socketManager.updateTimer(wokeDueToTimeout); 136 } 137 138 void updateTimer(bool force) 139 { 140 auto nextEvent = mainTimer.getNextEvent(); 141 if (force || lastNextEvent != nextEvent) 142 { 143 debug (ASOCKETS) stderr.writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent); 144 if (nextEvent == MonoTime.max) // Stopping 145 { 146 if (lastNextEvent != MonoTime.max) 147 ev_timer_stop(ev_default_loop(0), &evTimer); 148 } 149 else 150 { 151 auto remaining = mainTimer.getRemainingTime(socketManager.now); 152 while (remaining <= Duration.zero) 153 { 154 debug (ASOCKETS) stderr.writefln("remaining=%s, prodding timer.", remaining); 155 mainTimer.prod(socketManager.now); 156 remaining = mainTimer.getRemainingTime(socketManager.now); 157 } 158 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 159 debug (ASOCKETS) stderr.writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 160 if (lastNextEvent == MonoTime.max) // Starting 161 { 162 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 163 ev_timer_start(ev_default_loop(0), &evTimer); 164 } 165 else // Adjusting 166 { 167 evTimer.repeat = tstamp; 168 ev_timer_again(ev_default_loop(0), &evTimer); 169 } 170 } 171 lastNextEvent = nextEvent; 172 } 173 } 174 175 public: 176 MonoTime now; 177 178 /// Register a socket with the manager. 179 void register(GenericSocket socket) 180 { 181 debug (ASOCKETS) stderr.writefln("Registering %s", socket); 182 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 183 auto fd = socket.conn.handle; 184 assert(fd, "Must have fd before socket registration"); 185 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 186 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 187 count++; 188 } 189 190 /// Unregister a socket with the manager. 191 void unregister(GenericSocket socket) 192 { 193 debug (ASOCKETS) stderr.writefln("Unregistering %s", socket); 194 socket.notifyRead = false; 195 socket.notifyWrite = false; 196 count--; 197 } 198 199 /// Returns the number of registered sockets. 200 size_t size() 201 { 202 return count; 203 } 204 205 /// Loop continuously until no sockets are left. 206 void loop() 207 { 208 auto evLoop = ev_default_loop(0); 209 enforce(evLoop, "libev initialization failure"); 210 211 updateTimer(true); 212 debug (ASOCKETS) stderr.writeln("ev_run"); 213 ev_run(ev_default_loop(0), 0); 214 } 215 } 216 217 private mixin template SocketMixin() 218 { 219 private ev_io evRead, evWrite; 220 221 private final void setWatcherState(ref ev_io ev, bool newValue, int /*event*/) 222 { 223 if (!conn) 224 { 225 // Can happen when setting delegates before connecting. 226 return; 227 } 228 229 if (newValue && !ev.data) 230 { 231 // Start 232 ev.data = cast(void*)this; 233 ev_io_start(ev_default_loop(0), &ev); 234 } 235 else 236 if (!newValue && ev.data) 237 { 238 // Stop 239 assert(ev.data is cast(void*)this); 240 ev.data = null; 241 ev_io_stop(ev_default_loop(0), &ev); 242 } 243 } 244 245 private final bool getWatcherState(ref ev_io ev) { return !!ev.data; } 246 247 // Flags that determine socket wake-up events. 248 249 /// Interested in read notifications (onReadable)? 250 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 251 @property final bool notifyRead () { return getWatcherState(evRead); } /// ditto 252 /// Interested in write notifications (onWritable)? 253 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 254 @property final bool notifyWrite() { return getWatcherState(evWrite); } /// ditto 255 256 debug ~this() @nogc 257 { 258 // The LIBEV SocketManager holds no references to registered sockets. 259 // TODO: Add a doubly-linked list? 260 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 261 } 262 } 263 } 264 else // Use select 265 { 266 /// `select`-based event loop implementation. 267 struct SocketManager 268 { 269 private: 270 enum FD_SETSIZE = 1024; 271 272 /// List of all sockets to poll. 273 GenericSocket[] sockets; 274 275 /// Debug AA to check for dangling socket references. 276 debug GenericSocket[socket_t] socketHandles; 277 278 void delegate()[] nextTickHandlers, idleHandlers; 279 280 public: 281 MonoTime now; 282 283 /// Register a socket with the manager. 284 void register(GenericSocket conn) 285 { 286 debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1); 287 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 288 sockets ~= conn; 289 290 debug 291 { 292 auto handle = conn.socket.handle; 293 assert(handle != socket_t.init, "Can't register a closed socket"); 294 assert(handle !in socketHandles, "This socket handle is already registered"); 295 socketHandles[handle] = conn; 296 } 297 } 298 299 /// Unregister a socket with the manager. 300 void unregister(GenericSocket conn) 301 { 302 debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 303 304 debug 305 { 306 auto handle = conn.socket.handle; 307 assert(handle != socket_t.init, "Can't unregister a closed socket"); 308 auto pconn = handle in socketHandles; 309 assert(pconn, "This socket handle is not registered"); 310 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 311 socketHandles.remove(handle); 312 } 313 314 foreach (size_t i, GenericSocket j; sockets) 315 if (j is conn) 316 { 317 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 318 return; 319 } 320 assert(false, "Socket not registered"); 321 } 322 323 /// Returns the number of registered sockets. 324 size_t size() 325 { 326 return sockets.length; 327 } 328 329 /// Loop continuously until no sockets are left. 330 void loop() 331 { 332 debug (ASOCKETS) stderr.writeln("Starting event loop."); 333 334 SocketSet readset, writeset; 335 size_t sockcount; 336 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 337 readset = new SocketSet(setSize); 338 writeset = new SocketSet(setSize); 339 while (true) 340 { 341 if (nextTickHandlers.length) 342 { 343 auto thisTickHandlers = nextTickHandlers; 344 nextTickHandlers = null; 345 346 foreach (handler; thisTickHandlers) 347 handler(); 348 349 continue; 350 } 351 352 uint minSize = 0; 353 version(Windows) 354 minSize = cast(uint)sockets.length; 355 else 356 { 357 foreach (s; sockets) 358 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 359 minSize = s.socket.handle; 360 } 361 minSize++; 362 363 if (setSize < minSize) 364 { 365 debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 366 setSize = minSize * 2; 367 readset = new SocketSet(setSize); 368 writeset = new SocketSet(setSize); 369 } 370 else 371 { 372 readset.reset(); 373 writeset.reset(); 374 } 375 376 sockcount = 0; 377 bool haveActive; 378 debug (ASOCKETS) stderr.writeln("Populating sets"); 379 foreach (GenericSocket conn; sockets) 380 { 381 if (!conn.socket) 382 continue; 383 sockcount++; 384 385 debug (ASOCKETS) stderr.writef("\t%s:", conn); 386 if (conn.notifyRead) 387 { 388 readset.add(conn.socket); 389 if (!conn.daemonRead) 390 haveActive = true; 391 debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : ""); 392 } 393 if (conn.notifyWrite) 394 { 395 writeset.add(conn.socket); 396 if (!conn.daemonWrite) 397 haveActive = true; 398 debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : ""); 399 } 400 debug (ASOCKETS) stderr.writeln(); 401 } 402 debug (ASOCKETS) 403 { 404 stderr.writefln("Sets populated as follows:"); 405 printSets(readset, writeset); 406 } 407 408 debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...", 409 haveActive ? "" : "not ", 410 sockcount, 411 mainTimer.isWaiting() ? "with" : "no", 412 idleHandlers.length, 413 ); 414 if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length) 415 { 416 debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop."); 417 break; 418 } 419 420 debug (ASOCKETS) stderr.flush(); 421 422 int events; 423 if (idleHandlers.length) 424 { 425 if (sockcount==0) 426 events = 0; 427 else 428 events = Socket.select(readset, writeset, null, 0.seconds); 429 } 430 else 431 if (USE_SLEEP && sockcount==0) 432 { 433 version (Windows) 434 { 435 now = MonoTime.currTime(); 436 auto duration = mainTimer.getRemainingTime(now).total!"msecs"(); 437 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 438 if (duration <= 0) 439 duration = 1; // Avoid busywait 440 else 441 if (duration > int.max) 442 duration = int.max; 443 Sleep(cast(int)duration); 444 events = 0; 445 } 446 else 447 assert(0); 448 } 449 else 450 if (mainTimer.isWaiting()) 451 { 452 // Refresh time before sleeping, to ensure that a 453 // slow event handler does not skew everything else 454 now = MonoTime.currTime(); 455 456 events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime(now)); 457 } 458 else 459 events = Socket.select(readset, writeset, null); 460 461 debug (ASOCKETS) stderr.writefln("%d events fired.", events); 462 463 // Update time after sleeping 464 now = MonoTime.currTime(); 465 466 if (events > 0) 467 { 468 // Handle just one event at a time, as the first 469 // handler might invalidate select()'s results. 470 handleEvent(readset, writeset); 471 } 472 else 473 if (idleHandlers.length) 474 { 475 import ae.utils.array : shift; 476 auto handler = idleHandlers.shift(); 477 478 // Rotate the idle handler queue before running it, 479 // in case the handler unregisters itself. 480 idleHandlers ~= handler; 481 482 handler(); 483 } 484 485 // Timers may invalidate our select results, so fire them after processing the latter 486 mainTimer.prod(now); 487 488 eventCounter++; 489 } 490 } 491 492 debug (ASOCKETS) 493 private void printSets(SocketSet readset, SocketSet writeset) 494 { 495 foreach (GenericSocket conn; sockets) 496 { 497 if (!conn.socket) 498 stderr.writefln("\t\t%s is unset", conn); 499 else 500 { 501 if (readset.isSet(conn.socket)) 502 stderr.writefln("\t\t%s is readable", conn); 503 if (writeset.isSet(conn.socket)) 504 stderr.writefln("\t\t%s is writable", conn); 505 } 506 } 507 } 508 509 private void handleEvent(SocketSet readset, SocketSet writeset) 510 { 511 debug (ASOCKETS) 512 { 513 stderr.writefln("\tSelect results:"); 514 printSets(readset, writeset); 515 } 516 517 foreach (GenericSocket conn; sockets) 518 { 519 if (!conn.socket) 520 continue; 521 522 if (writeset.isSet(conn.socket)) 523 { 524 debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn); 525 return conn.onWritable(); 526 } 527 if (readset.isSet(conn.socket)) 528 { 529 debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn); 530 return conn.onReadable(); 531 } 532 } 533 534 assert(false, "select() reported events available, but no registered sockets are set"); 535 } 536 } 537 538 // Use UFCS to allow removeIdleHandler to have a predicate with context 539 /// Register a function to be called when the event loop is idle, 540 /// and would otherwise sleep. 541 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 542 { 543 foreach (i, idleHandler; socketManager.idleHandlers) 544 assert(handler !is idleHandler); 545 546 socketManager.idleHandlers ~= handler; 547 } 548 549 /// Unregister a function previously registered with `addIdleHandler`. 550 void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args) 551 { 552 foreach (i, idleHandler; socketManager.idleHandlers) 553 if (pred(idleHandler, args)) 554 { 555 import std.algorithm : remove; 556 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 557 return; 558 } 559 assert(false, "No such idle handler"); 560 } 561 562 private mixin template SocketMixin() 563 { 564 // Flags that determine socket wake-up events. 565 566 /// Interested in read notifications (onReadable)? 567 bool notifyRead; 568 /// Interested in write notifications (onWritable)? 569 bool notifyWrite; 570 } 571 } 572 573 /// The default socket manager. 574 SocketManager socketManager; 575 576 /// Schedule a function to run on the next event loop iteration. 577 /// Can be used to queue logic to run once all current execution frames exit. 578 /// Similar to e.g. process.nextTick in Node. 579 void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow 580 { 581 socketManager.nextTickHandlers ~= dg; 582 } 583 584 /// The current monotonic time. 585 /// Updated by the event loop whenever it is awoken. 586 @property MonoTime now() 587 { 588 if (socketManager.now == MonoTime.init) 589 { 590 // Event loop not yet started. 591 socketManager.now = MonoTime.currTime(); 592 } 593 return socketManager.now; 594 } 595 596 // *************************************************************************** 597 598 /// General methods for an asynchronous socket. 599 abstract class GenericSocket 600 { 601 /// Declares notifyRead and notifyWrite. 602 mixin SocketMixin; 603 604 protected: 605 /// The socket this class wraps. 606 Socket conn; 607 608 // protected: 609 void onReadable() 610 { 611 } 612 613 void onWritable() 614 { 615 } 616 617 void onError(string /*reason*/) 618 { 619 } 620 621 public: 622 /// Retrieve the socket class this class wraps. 623 @property final Socket socket() 624 { 625 return conn; 626 } 627 628 /// allow getting the address of connections that are already disconnected 629 private Address[2] cachedAddress; 630 631 /*private*/ final @property Address _address(bool local)() 632 { 633 if (cachedAddress[local] !is null) 634 return cachedAddress[local]; 635 else 636 if (conn is null) 637 return null; 638 else 639 { 640 Address a; 641 if (conn.addressFamily == AddressFamily.UNSPEC) 642 { 643 // Socket will attempt to construct an UnknownAddress, 644 // which will almost certainly not match the real address length. 645 static if (local) 646 alias getname = c_socks.getsockname; 647 else 648 alias getname = c_socks.getpeername; 649 650 c_socks.socklen_t nameLen = 0; 651 if (getname(conn.handle, null, &nameLen) < 0) 652 throw new SocketOSException("Unable to obtain socket address"); 653 654 auto buf = new ubyte[nameLen]; 655 auto sa = cast(c_socks.sockaddr*)buf.ptr; 656 if (getname(conn.handle, sa, &nameLen) < 0) 657 throw new SocketOSException("Unable to obtain socket address"); 658 a = new UnknownAddressReference(sa, nameLen); 659 } 660 else 661 a = local ? conn.localAddress() : conn.remoteAddress(); 662 return cachedAddress[local] = a; 663 } 664 } 665 666 alias localAddress = _address!true; /// Retrieve this socket's local address. 667 alias remoteAddress = _address!false; /// Retrieve this socket's remote address. 668 669 /*private*/ final @property string _addressStr(bool local)() nothrow 670 { 671 try 672 { 673 auto a = _address!local; 674 if (a is null) 675 return "[null address]"; 676 string host = a.toAddrString(); 677 import std.string : indexOf; 678 if (host.indexOf(':') >= 0) 679 host = "[" ~ host ~ "]"; 680 try 681 { 682 string port = a.toPortString(); 683 return host ~ ":" ~ port; 684 } 685 catch (Exception e) 686 return host; 687 } 688 catch (Exception e) 689 return "[error: " ~ e.msg ~ "]"; 690 } 691 692 alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string. 693 alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string. 694 695 /// Don't block the process from exiting, even if the socket is ready to receive data. 696 /// TODO: Not implemented with libev 697 bool daemonRead; 698 699 /// Don't block the process from exiting, even if the socket is ready to send data. 700 /// TODO: Not implemented with libev 701 bool daemonWrite; 702 703 deprecated alias daemon = daemonRead; 704 705 /// Enable TCP keep-alive on the socket with the given settings. 706 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 707 { 708 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 709 if (enabled) 710 { 711 try 712 conn.setKeepAlive(time, interval); 713 catch (SocketException) 714 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 715 } 716 else 717 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 718 } 719 720 /// Returns a string containing the class name, address, and file descriptor. 721 override string toString() const 722 { 723 import std.string : format, split; 724 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1); 725 } 726 } 727 728 // *************************************************************************** 729 730 /// Classifies the cause of the disconnect. 731 /// Can be used to decide e.g. when it makes sense to reconnect. 732 enum DisconnectType 733 { 734 requested, /// Initiated by the application. 735 graceful, /// The peer gracefully closed the connection. 736 error /// Some abnormal network condition. 737 } 738 739 /// Used to indicate the state of a connection throughout its lifecycle. 740 enum ConnectionState 741 { 742 /// The initial state, or the state after a disconnect was fully processed. 743 disconnected, 744 745 /// Name resolution. Currently done synchronously. 746 resolving, 747 748 /// A connection attempt is in progress. 749 connecting, 750 751 /// A connection is established. 752 connected, 753 754 /// Disconnecting in progress. No data can be sent or received at this point. 755 /// We are waiting for queued data to be actually sent before disconnecting. 756 disconnecting, 757 } 758 759 /// Returns true if this is a connection state for which disconnecting is valid. 760 /// Generally, applications should be aware of the life cycle of their sockets, 761 /// so checking the state of a connection is unnecessary (and a code smell). 762 /// However, unconditionally disconnecting some connected sockets can be useful 763 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle), 764 /// such as in response to a signal. 765 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; } 766 767 /// Common interface for connections and adapters. 768 interface IConnection 769 { 770 /// `send` queues data for sending in one of five queues, indexed 771 /// by a numeric priority. 772 /// `MAX_PRIORITY` is the highest (least urgent) priority index. 773 /// `DEFAULT_PRIORITY` is the default priority 774 enum MAX_PRIORITY = 4; 775 enum DEFAULT_PRIORITY = 2; /// ditto 776 777 /// This is the default value for the `disconnect` `reason` string parameter. 778 static const defaultDisconnectReason = "Software closed the connection"; 779 780 /// Get connection state. 781 @property ConnectionState state(); 782 783 /// Has a connection been established? 784 deprecated final @property bool connected() { return state == ConnectionState.connected; } 785 786 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 787 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 788 789 /// Queue Data for sending. 790 void send(scope Data[] data, int priority = DEFAULT_PRIORITY); 791 792 /// ditto 793 final void send(Data datum, int priority = DEFAULT_PRIORITY) 794 { 795 this.send(datum.asSlice, priority); 796 } 797 798 /// Terminate the connection. 799 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 800 801 /// Callback setter for when a connection has been established 802 /// (if applicable). 803 alias ConnectHandler = void delegate(); 804 @property void handleConnect(ConnectHandler value); /// ditto 805 806 /// Callback setter for when new data is read. 807 alias ReadDataHandler = void delegate(Data data); 808 @property void handleReadData(ReadDataHandler value); /// ditto 809 810 /// Callback setter for when a connection was closed. 811 alias DisconnectHandler = void delegate(string reason, DisconnectType type); 812 @property void handleDisconnect(DisconnectHandler value); /// ditto 813 814 /// Callback setter for when all queued data has been sent. 815 alias BufferFlushedHandler = void delegate(); 816 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 817 } 818 819 // *************************************************************************** 820 821 /// Implementation of `IConnection` using a socket. 822 /// Implements receiving data when readable and sending queued data 823 /// when writable. 824 class Connection : GenericSocket, IConnection 825 { 826 private: 827 ConnectionState _state; 828 final @property ConnectionState state(ConnectionState value) { return _state = value; } 829 830 public: 831 /// Get connection state. 832 override @property ConnectionState state() { return _state; } 833 834 protected: 835 abstract sizediff_t doSend(scope const(void)[] buffer); 836 enum sizediff_t doReceiveEOF = -1; 837 abstract sizediff_t doReceive(scope void[] buffer); 838 839 /// The send buffers. 840 DataVec[MAX_PRIORITY+1] outQueue; 841 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 842 int partiallySent = -1; 843 844 /// Constructor used by a ServerSocket for new connections 845 this(Socket conn) 846 { 847 this(); 848 this.conn = conn; 849 state = conn is null ? ConnectionState.disconnected : ConnectionState.connected; 850 if (conn) 851 socketManager.register(this); 852 updateFlags(); 853 } 854 855 final void updateFlags() 856 { 857 if (state == ConnectionState.connecting) 858 notifyWrite = true; 859 else 860 notifyWrite = writePending; 861 862 notifyRead = state == ConnectionState.connected && readDataHandler; 863 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 864 } 865 866 // We reuse the same buffer across read calls. 867 // It is allocated on the first read, and also 868 // if the user code decides to keep a reference to it. 869 static Data inBuffer; 870 871 /// Called when a socket is readable. 872 override void onReadable() 873 { 874 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 875 if (!inBuffer) 876 inBuffer = Data(0x10000); 877 else 878 inBuffer = inBuffer.ensureUnique(); 879 sizediff_t received; 880 inBuffer.enter((scope contents) { 881 received = doReceive(contents); 882 }); 883 884 if (received == doReceiveEOF) 885 return disconnect("Connection closed", DisconnectType.graceful); 886 887 if (received == Socket.ERROR) 888 { 889 // if (wouldHaveBlocked) 890 // { 891 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 892 // return; 893 // } 894 // else 895 onError("recv() error: " ~ lastSocketError); 896 } 897 else 898 { 899 debug (PRINTDATA) 900 { 901 stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr); 902 stderr.write(hexDump(inBuffer.unsafeContents[0 .. received])); 903 stderr.flush(); 904 } 905 906 if (state == ConnectionState.disconnecting) 907 { 908 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this); 909 } 910 else 911 if (!readDataHandler) 912 { 913 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this); 914 } 915 else 916 { 917 auto data = inBuffer[0 .. received]; 918 readDataHandler(data); 919 } 920 } 921 } 922 923 /// Called when an error occurs on the socket. 924 override void onError(string reason) 925 { 926 if (state == ConnectionState.disconnecting) 927 { 928 debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 929 return close(); 930 } 931 932 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 933 disconnect("Socket error: " ~ reason, DisconnectType.error); 934 } 935 936 this() 937 { 938 } 939 940 public: 941 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 942 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 943 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 944 { 945 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 946 assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state)); 947 948 if (writePending) 949 { 950 if (type==DisconnectType.requested) 951 { 952 assert(conn, "Attempting to disconnect on an uninitialized socket"); 953 // queue disconnect after all data is sent 954 debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 955 state = ConnectionState.disconnecting; 956 //setIdleTimeout(30.seconds); 957 if (disconnectHandler) 958 disconnectHandler(reason, type); 959 updateFlags(); 960 return; 961 } 962 else 963 discardQueues(); 964 } 965 966 debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 967 968 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 969 close(); 970 else 971 { 972 assert(conn is null, "Registered but %s socket".format(state)); 973 if (state == ConnectionState.resolving) 974 state = ConnectionState.disconnected; 975 } 976 977 if (disconnectHandler) 978 disconnectHandler(reason, type); 979 updateFlags(); 980 } 981 982 private final void close() 983 { 984 assert(conn, "Attempting to close an unregistered socket"); 985 socketManager.unregister(this); 986 conn.close(); 987 conn = null; 988 outQueue[] = DataVec.init; 989 state = ConnectionState.disconnected; 990 } 991 992 /// Append data to the send buffer. 993 void send(scope Data[] data, int priority = DEFAULT_PRIORITY) 994 { 995 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 996 outQueue[priority] ~= data; 997 notifyWrite = true; // Fast updateFlags() 998 999 debug (PRINTDATA) 1000 { 1001 stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr); 1002 foreach (datum; data) 1003 if (datum.length) 1004 stderr.write(hexDump(datum.unsafeContents)); 1005 else 1006 stderr.writeln("(empty Data)"); 1007 stderr.flush(); 1008 } 1009 } 1010 1011 /// ditto 1012 alias send = IConnection.send; 1013 1014 /// Cancel all queued `Data` packets with the given priority. 1015 /// Does not cancel any partially-sent `Data`. 1016 final void clearQueue(int priority) 1017 { 1018 if (priority == partiallySent) 1019 { 1020 assert(outQueue[priority].length > 0); 1021 outQueue[priority].length = 1; 1022 } 1023 else 1024 outQueue[priority] = null; 1025 updateFlags(); 1026 } 1027 1028 /// Clears all queues, even partially sent content. 1029 private final void discardQueues() 1030 { 1031 foreach (priority; 0..MAX_PRIORITY+1) 1032 outQueue[priority] = null; 1033 partiallySent = -1; 1034 updateFlags(); 1035 } 1036 1037 /// Returns true if any queues have pending data. 1038 @property 1039 final bool writePending() 1040 { 1041 foreach (ref queue; outQueue) 1042 if (queue.length) 1043 return true; 1044 return false; 1045 } 1046 1047 /// Returns true if there are any queued `Data` which have not yet 1048 /// begun to be sent. 1049 final bool queuePresent(int priority = DEFAULT_PRIORITY) 1050 { 1051 if (priority == partiallySent) 1052 { 1053 assert(outQueue[priority].length > 0); 1054 return outQueue[priority].length > 1; 1055 } 1056 else 1057 return outQueue[priority].length > 0; 1058 } 1059 1060 /// Returns the number of queued `Data` at the given priority. 1061 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 1062 { 1063 return outQueue[priority].length; 1064 } 1065 1066 /// Returns the number of queued bytes at the given priority. 1067 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 1068 { 1069 size_t bytes; 1070 foreach (datum; outQueue[priority]) 1071 bytes += datum.length; 1072 return bytes; 1073 } 1074 1075 // public: 1076 private ConnectHandler connectHandler; 1077 /// Callback for when a connection has been established. 1078 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 1079 1080 private ReadDataHandler readDataHandler; 1081 /// Callback for incoming data. 1082 /// Data will not be received unless this handler is set. 1083 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 1084 1085 private DisconnectHandler disconnectHandler; 1086 /// Callback for when a connection was closed. 1087 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 1088 1089 private BufferFlushedHandler bufferFlushedHandler; 1090 /// Callback setter for when all queued data has been sent. 1091 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 1092 } 1093 1094 /// Implements a stream connection. 1095 /// Queued `Data` is allowed to be fragmented. 1096 class StreamConnection : Connection 1097 { 1098 protected: 1099 this() 1100 { 1101 super(); 1102 } 1103 1104 /// Called when a socket is writable. 1105 override void onWritable() 1106 { 1107 //scope(success) updateFlags(); 1108 onWritableImpl(); 1109 updateFlags(); 1110 } 1111 1112 // Work around scope(success) breaking debugger stack traces 1113 final private void onWritableImpl() 1114 { 1115 debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state); 1116 if (state == ConnectionState.connecting) 1117 { 1118 int32_t error; 1119 conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error); 1120 if (error) 1121 return disconnect(formatSocketError(error), DisconnectType.error); 1122 1123 state = ConnectionState.connected; 1124 1125 //debug writefln("[%s] Connected", remoteAddressStr); 1126 try 1127 setKeepAlive(); 1128 catch (Exception e) 1129 return disconnect(e.msg, DisconnectType.error); 1130 if (connectHandler) 1131 connectHandler(); 1132 return; 1133 } 1134 //debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length); 1135 1136 foreach (sendPartial; [true, false]) 1137 foreach (int priority, ref queue; outQueue) 1138 while (queue.length && (!sendPartial || priority == partiallySent)) 1139 { 1140 assert(partiallySent == -1 || partiallySent == priority); 1141 1142 ptrdiff_t sent = 0; 1143 if (!queue.front.empty) 1144 { 1145 queue.front.enter((scope contents) { 1146 sent = doSend(contents); 1147 }); 1148 debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, queue.front.length); 1149 } 1150 else 1151 { 1152 debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this); 1153 } 1154 1155 if (sent == Socket.ERROR) 1156 { 1157 if (wouldHaveBlocked()) 1158 return; 1159 else 1160 return onError("send() error: " ~ lastSocketError); 1161 } 1162 else 1163 if (sent < queue.front.length) 1164 { 1165 if (sent > 0) 1166 { 1167 queue.front = queue.front[sent..queue.front.length]; 1168 partiallySent = priority; 1169 } 1170 return; 1171 } 1172 else 1173 { 1174 assert(sent == queue.front.length); 1175 //debug writefln("[%s] Sent data:", remoteAddressStr); 1176 //debug writefln("%s", hexDump(queue.front.contents[0..sent])); 1177 queue.front.clear(); 1178 queue.popFront(); 1179 partiallySent = -1; 1180 if (queue.length == 0) 1181 queue = null; 1182 } 1183 } 1184 1185 // outQueue is now empty 1186 if (bufferFlushedHandler) 1187 bufferFlushedHandler(); 1188 if (state == ConnectionState.disconnecting) 1189 { 1190 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1191 close(); 1192 } 1193 } 1194 1195 public: 1196 this(Socket conn) 1197 { 1198 super(conn); 1199 } /// 1200 } 1201 1202 // *************************************************************************** 1203 1204 /// A POSIX file stream. 1205 /// Allows adding a file (e.g. stdin/stdout) to the socket manager. 1206 /// Does not dup the given file descriptor, so "disconnecting" this connection 1207 /// will close it. 1208 version (Posix) 1209 class FileConnection : StreamConnection 1210 { 1211 this(int fileno) 1212 { 1213 auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC); 1214 conn.blocking = false; 1215 super(conn); 1216 } /// 1217 1218 protected: 1219 import core.sys.posix.unistd : read, write; 1220 1221 override sizediff_t doSend(scope const(void)[] buffer) 1222 { 1223 return write(socket.handle, buffer.ptr, buffer.length); 1224 } 1225 1226 override sizediff_t doReceive(scope void[] buffer) 1227 { 1228 auto bytesRead = read(socket.handle, buffer.ptr, buffer.length); 1229 if (bytesRead == 0) 1230 return doReceiveEOF; 1231 return bytesRead; 1232 } 1233 } 1234 1235 /// Separates reading and writing, e.g. for stdin/stdout. 1236 class Duplex : IConnection 1237 { 1238 /// 1239 IConnection reader, writer; 1240 1241 this(IConnection reader, IConnection writer) 1242 { 1243 this.reader = reader; 1244 this.writer = writer; 1245 reader.handleConnect = &onConnect; 1246 writer.handleConnect = &onConnect; 1247 reader.handleDisconnect = &onDisconnect; 1248 writer.handleDisconnect = &onDisconnect; 1249 } /// 1250 1251 @property ConnectionState state() 1252 { 1253 if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting) 1254 return ConnectionState.disconnecting; 1255 else 1256 return reader.state < writer.state ? reader.state : writer.state; 1257 } /// 1258 1259 /// Queue Data for sending. 1260 void send(scope Data[] data, int priority) 1261 { 1262 writer.send(data, priority); 1263 } 1264 1265 alias send = IConnection.send; /// ditto 1266 1267 /// Terminate the connection. 1268 /// Note: this isn't quite fleshed out - applications may want to 1269 /// wait and send some more data even after stdin is closed, but 1270 /// such an interface can't be fitted into an IConnection 1271 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1272 { 1273 if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting) 1274 reader.disconnect(reason, type); 1275 if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting) 1276 writer.disconnect(reason, type); 1277 debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1278 } 1279 1280 protected void onConnect() 1281 { 1282 if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected) 1283 connectHandler(); 1284 } 1285 1286 protected void onDisconnect(string reason, DisconnectType type) 1287 { 1288 debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1289 if (disconnectHandler) 1290 { 1291 disconnectHandler(reason, type); 1292 disconnectHandler = null; // don't call it twice for the other connection 1293 } 1294 // It is our responsibility to disconnect the other connection 1295 // Use DisconnectType.requested to ensure that any written data is flushed 1296 disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested); 1297 } 1298 1299 /// Callback for when a connection has been established. 1300 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1301 private ConnectHandler connectHandler; 1302 1303 /// Callback setter for when new data is read. 1304 @property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; } 1305 1306 /// Callback setter for when a connection was closed. 1307 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1308 private DisconnectHandler disconnectHandler; 1309 1310 /// Callback setter for when all queued data has been written. 1311 @property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; } 1312 } 1313 1314 debug(ae_unittest) unittest { if (false) new Duplex(null, null); } 1315 1316 // *************************************************************************** 1317 1318 /// An asynchronous socket-based connection. 1319 class SocketConnection : StreamConnection 1320 { 1321 protected: 1322 AddressInfo[] addressQueue; 1323 bool datagram; 1324 1325 this(Socket conn, bool datagram = false) 1326 { 1327 super(conn); 1328 this.datagram = datagram; 1329 } 1330 1331 override sizediff_t doSend(scope const(void)[] buffer) 1332 { 1333 return conn.send(buffer); 1334 } 1335 1336 override sizediff_t doReceive(scope void[] buffer) 1337 { 1338 auto bytesReceived = conn.receive(buffer); 1339 if (bytesReceived == 0 && !datagram) 1340 return doReceiveEOF; 1341 return bytesReceived; 1342 } 1343 1344 final void tryNextAddress() 1345 { 1346 assert(state == ConnectionState.connecting); 1347 auto addressInfo = addressQueue[0]; 1348 addressQueue = addressQueue[1..$]; 1349 1350 try 1351 { 1352 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1353 conn.blocking = false; 1354 1355 socketManager.register(this); 1356 updateFlags(); 1357 debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString()); 1358 conn.connect(addressInfo.address); 1359 } 1360 catch (SocketException e) 1361 return onError("Connect error: " ~ e.msg); 1362 } 1363 1364 /// Called when an error occurs on the socket. 1365 override void onError(string reason) 1366 { 1367 if (state == ConnectionState.connecting && addressQueue.length) 1368 { 1369 socketManager.unregister(this); 1370 conn.close(); 1371 conn = null; 1372 1373 return tryNextAddress(); 1374 } 1375 1376 super.onError(reason); 1377 } 1378 1379 public: 1380 /// Default constructor 1381 this() 1382 { 1383 debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this); 1384 } 1385 1386 /// Start establishing a connection. 1387 final void connect(AddressInfo[] addresses) 1388 { 1389 assert(addresses.length, "No addresses specified"); 1390 1391 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1392 assert(!conn); 1393 1394 addressQueue = addresses; 1395 state = ConnectionState.connecting; 1396 tryNextAddress(); 1397 } 1398 } 1399 1400 /// An asynchronous TCP connection. 1401 class TcpConnection : SocketConnection 1402 { 1403 protected: 1404 this(Socket conn) 1405 { 1406 super(conn, false); 1407 } 1408 1409 public: 1410 /// Default constructor 1411 this() 1412 { 1413 debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this); 1414 } 1415 1416 /// 1417 alias connect = SocketConnection.connect; // raise overload 1418 1419 /// Start establishing a connection. 1420 final void connect(string host, ushort port) 1421 { 1422 assert(host.length, "Empty host"); 1423 assert(port, "No port specified"); 1424 1425 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1426 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1427 1428 state = ConnectionState.resolving; 1429 1430 AddressInfo[] addressInfos; 1431 try 1432 { 1433 auto addresses = getAddress(host, port); 1434 enforce(addresses.length, "No addresses found"); 1435 debug (ASOCKETS) 1436 { 1437 stderr.writefln("Resolved to %s addresses:", addresses.length); 1438 foreach (address; addresses) 1439 stderr.writefln("- %s", address.toString()); 1440 } 1441 1442 if (addresses.length > 1) 1443 { 1444 import std.random : randomShuffle; 1445 randomShuffle(addresses); 1446 } 1447 1448 foreach (address; addresses) 1449 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1450 } 1451 catch (SocketException e) 1452 return onError("Lookup error: " ~ e.msg); 1453 1454 state = ConnectionState.disconnected; 1455 connect(addressInfos); 1456 } 1457 } 1458 1459 // *************************************************************************** 1460 1461 /// An asynchronous connection server for socket-based connections. 1462 class SocketServer 1463 { 1464 protected: 1465 /// Class that actually performs listening on a certain address family 1466 final class Listener : GenericSocket 1467 { 1468 this(Socket conn) 1469 { 1470 debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this); 1471 this.conn = conn; 1472 socketManager.register(this); 1473 } 1474 1475 /// Called when a socket is readable. 1476 override void onReadable() 1477 { 1478 debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this); 1479 Socket acceptSocket = conn.accept(); 1480 acceptSocket.blocking = false; 1481 if (handleAccept) 1482 { 1483 auto connection = createConnection(acceptSocket); 1484 debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr); 1485 connection.setKeepAlive(); 1486 //assert(connection.connected); 1487 //connection.connected = true; 1488 acceptHandler(connection); 1489 } 1490 else 1491 acceptSocket.close(); 1492 } 1493 1494 /// Called when a socket is writable. 1495 override void onWritable() 1496 { 1497 } 1498 1499 /// Called when an error occurs on the socket. 1500 override void onError(string reason) 1501 { 1502 close(); // call parent 1503 } 1504 1505 void closeListener() 1506 { 1507 assert(conn); 1508 socketManager.unregister(this); 1509 conn.close(); 1510 conn = null; 1511 } 1512 } 1513 1514 SocketConnection createConnection(Socket socket) 1515 { 1516 return new SocketConnection(socket, datagram); 1517 } 1518 1519 /// Whether the socket is listening. 1520 bool listening; 1521 /// Listener instances 1522 Listener[] listeners; 1523 bool datagram; 1524 1525 final void updateFlags() 1526 { 1527 foreach (listener; listeners) 1528 listener.notifyRead = handleAccept !is null; 1529 } 1530 1531 public: 1532 /// Start listening on this socket. 1533 final void listen(AddressInfo[] addressInfos) 1534 { 1535 foreach (ref addressInfo; addressInfos) 1536 { 1537 try 1538 { 1539 Socket conn = new Socket(addressInfo); 1540 conn.blocking = false; 1541 if (addressInfo.family == AddressFamily.INET6) 1542 conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); 1543 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 1544 1545 conn.bind(addressInfo.address); 1546 conn.listen(totalCPUs * 2); 1547 1548 listeners ~= new Listener(conn); 1549 } 1550 catch (SocketException e) 1551 { 1552 debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString()); 1553 debug(ASOCKETS) stderr.writeln(e.msg); 1554 } 1555 } 1556 1557 if (listeners.length==0) 1558 throw new Exception("Unable to bind service"); 1559 1560 listening = true; 1561 1562 updateFlags(); 1563 } 1564 1565 this() 1566 { 1567 this(false); 1568 } /// 1569 1570 this(bool datagram) 1571 { 1572 this.datagram = datagram; 1573 } /// 1574 1575 /// Creates a Server with the given sockets. 1576 /// The sockets must have already had `bind` and `listen` called on them. 1577 this(Socket[] sockets...) 1578 { 1579 foreach (socket; sockets) 1580 listeners ~= new Listener(socket); 1581 } 1582 1583 /// Returns all listening addresses. 1584 final @property Address[] localAddresses() 1585 { 1586 Address[] result; 1587 foreach (listener; listeners) 1588 result ~= listener.localAddress; 1589 return result; 1590 } 1591 1592 /// Returns `true` if the server is listening for incoming connections. 1593 final @property bool isListening() 1594 { 1595 return listening; 1596 } 1597 1598 /// Stop listening on this socket. 1599 final void close() 1600 { 1601 foreach (listener;listeners) 1602 listener.closeListener(); 1603 listeners = null; 1604 listening = false; 1605 if (handleClose) 1606 handleClose(); 1607 } 1608 1609 /// Create a SocketServer using the handle passed on standard input, 1610 /// for which `listen` had already been called. Used by 1611 /// e.g. FastCGI and systemd sockets with "Listen = yes". 1612 static SocketServer fromStdin() 1613 { 1614 socket_t socket; 1615 version (Windows) 1616 { 1617 import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE; 1618 socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE); 1619 } 1620 else 1621 socket = cast(socket_t)0; 1622 1623 auto s = new Socket(socket, AddressFamily.UNSPEC); 1624 s.blocking = false; 1625 return new SocketServer(s); 1626 } 1627 1628 /// Callback for when the socket was closed. 1629 void delegate() handleClose; 1630 1631 private void delegate(SocketConnection incoming) acceptHandler; 1632 /// Callback for an incoming connection. 1633 /// Connections will not be accepted unless this handler is set. 1634 @property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; } 1635 /// ditto 1636 @property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); } 1637 } 1638 1639 /// An asynchronous TCP connection server. 1640 class TcpServer : SocketServer 1641 { 1642 protected: 1643 override SocketConnection createConnection(Socket socket) 1644 { 1645 return new TcpConnection(socket); 1646 } 1647 1648 public: 1649 this() 1650 { 1651 } /// 1652 1653 this(Socket[] sockets...) 1654 { 1655 super(sockets); 1656 } /// Construct from the given sockets. 1657 1658 /// 1659 alias listen = SocketServer.listen; // raise overload 1660 1661 /// Start listening on this socket. 1662 final ushort listen(ushort port, string addr = null) 1663 { 1664 debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port); 1665 //assert(!listening, "Attempting to listen on a listening socket"); 1666 1667 auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); 1668 1669 debug (ASOCKETS) 1670 { 1671 stderr.writefln("Resolved to %s addresses:", addressInfos.length); 1672 foreach (ref addressInfo; addressInfos) 1673 stderr.writefln("- %s", addressInfo); 1674 } 1675 1676 // listen on random ports only on IPv4 for now 1677 if (port == 0) 1678 { 1679 foreach_reverse (i, ref addressInfo; addressInfos) 1680 if (addressInfo.family != AddressFamily.INET) 1681 addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$]; 1682 } 1683 1684 listen(addressInfos); 1685 1686 foreach (listener; listeners) 1687 { 1688 auto address = listener.conn.localAddress(); 1689 if (address.addressFamily == AddressFamily.INET) 1690 port = to!ushort(address.toPortString()); 1691 } 1692 1693 return port; 1694 } 1695 1696 deprecated("Use SocketServer.fromStdin") 1697 static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; } 1698 1699 /// Delegate to be called when a connection is accepted. 1700 @property final void handleAccept(void delegate(TcpConnection incoming) value) { super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c)); } 1701 } 1702 1703 // *************************************************************************** 1704 1705 /// Base class for connection-less socket protocols, i.e. those for 1706 /// which we must use sendto instead of connect/send. 1707 /// These generally correspond to stateless / datagram-based 1708 /// protocols, like UDP. 1709 /// This module's class hierarchy is mostly oriented towards 1710 /// stateful, stream-based protocols; to represent connectionless 1711 /// protocols, this class encapsulates a socket with a fixed 1712 /// destination (sendto) address, and optionally bound to a local 1713 /// address. 1714 /// Currently received packets' address is not exposed. 1715 class ConnectionlessSocketConnection : Connection 1716 { 1717 protected: 1718 this(Socket conn) 1719 { 1720 super(conn); 1721 } 1722 1723 /// Called when a socket is writable. 1724 override void onWritable() 1725 { 1726 //scope(success) updateFlags(); 1727 onWritableImpl(); 1728 updateFlags(); 1729 } 1730 1731 // Work around scope(success) breaking debugger stack traces 1732 final private void onWritableImpl() 1733 { 1734 foreach (priority, ref queue; outQueue) 1735 while (queue.length) 1736 { 1737 ptrdiff_t sent; 1738 queue.front.enter((scope contents) { 1739 sent = conn.sendTo(contents, remoteAddress); 1740 }); 1741 1742 if (sent == Socket.ERROR) 1743 { 1744 if (wouldHaveBlocked()) 1745 return; 1746 else 1747 return onError("send() error: " ~ lastSocketError); 1748 } 1749 else 1750 if (sent < queue.front.length) 1751 { 1752 return onError("Sent only %d/%d bytes of the datagram!".format(sent, queue.front.length)); 1753 } 1754 else 1755 { 1756 assert(sent == queue.front.length); 1757 //debug writefln("[%s] Sent data:", remoteAddressStr); 1758 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1759 queue.front.clear(); 1760 queue.popFront(); 1761 if (queue.length == 0) 1762 queue = null; 1763 } 1764 } 1765 1766 // outQueue is now empty 1767 if (bufferFlushedHandler) 1768 bufferFlushedHandler(); 1769 if (state == ConnectionState.disconnecting) 1770 { 1771 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1772 close(); 1773 } 1774 } 1775 1776 override sizediff_t doSend(scope const(void)[] buffer) 1777 { 1778 assert(false); // never called (called only from overridden methods) 1779 } 1780 1781 override sizediff_t doReceive(scope void[] buffer) 1782 { 1783 return conn.receive(buffer); 1784 } 1785 1786 public: 1787 /// Default constructor 1788 this() 1789 { 1790 debug (ASOCKETS) stderr.writefln("New ConnectionlessSocketConnection @ %s", cast(void*)this); 1791 } 1792 1793 /// Initialize with the given `AddressFamily`, without binding to an address. 1794 final void initialize(AddressFamily family, SocketType type, ProtocolType protocol) 1795 { 1796 initializeImpl(family, type, protocol); 1797 if (connectHandler) 1798 connectHandler(); 1799 } 1800 1801 private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol) 1802 { 1803 assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state)); 1804 assert(!conn); 1805 1806 conn = new Socket(family, type, protocol); 1807 conn.blocking = false; 1808 socketManager.register(this); 1809 state = ConnectionState.connected; 1810 updateFlags(); 1811 } 1812 1813 /// Bind to a local address in order to receive packets sent there. 1814 final ushort bind(AddressInfo addressInfo) 1815 { 1816 initialize(addressInfo.family, addressInfo.type, addressInfo.protocol); 1817 conn.bind(addressInfo.address); 1818 1819 auto address = conn.localAddress(); 1820 auto port = to!ushort(address.toPortString()); 1821 1822 if (connectHandler) 1823 connectHandler(); 1824 1825 return port; 1826 } 1827 1828 // public: 1829 /// Where to send packets to. 1830 Address remoteAddress; 1831 } 1832 1833 /// An asynchronous UDP stream. 1834 /// UDP does not have connections, so this class encapsulates a socket 1835 /// with a fixed destination (sendto) address, and optionally bound to 1836 /// a local address. 1837 /// Currently received packets' address is not exposed. 1838 class UdpConnection : ConnectionlessSocketConnection 1839 { 1840 protected: 1841 this(Socket conn) 1842 { 1843 super(conn); 1844 } 1845 1846 public: 1847 /// Default constructor 1848 this() 1849 { 1850 debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this); 1851 } 1852 1853 /// Initialize with the given `AddressFamily`, without binding to an address. 1854 final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM) 1855 { 1856 super.initialize(family, type, ProtocolType.UDP); 1857 } 1858 1859 /// Bind to a local address in order to receive packets sent there. 1860 final ushort bind(string host, ushort port) 1861 { 1862 assert(host.length, "Empty host"); 1863 1864 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1865 1866 state = ConnectionState.resolving; 1867 1868 AddressInfo addressInfo; 1869 try 1870 { 1871 auto addresses = getAddress(host, port); 1872 enforce(addresses.length, "No addresses found"); 1873 debug (ASOCKETS) 1874 { 1875 stderr.writefln("Resolved to %s addresses:", addresses.length); 1876 foreach (address; addresses) 1877 stderr.writefln("- %s", address.toString()); 1878 } 1879 1880 Address address; 1881 if (addresses.length > 1) 1882 { 1883 import std.random : uniform; 1884 address = addresses[uniform(0, $)]; 1885 } 1886 else 1887 address = addresses[0]; 1888 addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host); 1889 } 1890 catch (SocketException e) 1891 { 1892 onError("Lookup error: " ~ e.msg); 1893 return 0; 1894 } 1895 1896 state = ConnectionState.disconnected; 1897 return super.bind(addressInfo); 1898 } 1899 } 1900 1901 /// 1902 debug(ae_unittest) unittest 1903 { 1904 auto server = new UdpConnection(); 1905 server.bind("127.0.0.1", 0); 1906 1907 auto client = new UdpConnection(); 1908 client.initialize(server.localAddress.addressFamily); 1909 1910 string[] packets = ["", "Hello", "there"]; 1911 client.remoteAddress = server.localAddress; 1912 client.send({ 1913 DataVec data; 1914 foreach (packet; packets) 1915 data ~= Data(packet.asBytes); 1916 return data; 1917 }()[]); 1918 1919 server.handleReadData = (Data data) 1920 { 1921 assert(data.unsafeContents == packets[0]); 1922 packets = packets[1..$]; 1923 if (!packets.length) 1924 { 1925 server.close(); 1926 client.close(); 1927 } 1928 }; 1929 socketManager.loop(); 1930 assert(!packets.length); 1931 } 1932 1933 // *************************************************************************** 1934 1935 /// Base class for a connection adapter. 1936 /// By itself, does nothing. 1937 class ConnectionAdapter : IConnection 1938 { 1939 /// The next connection in the chain (towards the raw transport). 1940 IConnection next; 1941 1942 this(IConnection next) 1943 { 1944 this.next = next; 1945 next.handleConnect = &onConnect; 1946 next.handleDisconnect = &onDisconnect; 1947 next.handleBufferFlushed = &onBufferFlushed; 1948 } /// 1949 1950 @property ConnectionState state() { return next.state; } /// 1951 1952 /// Queue Data for sending. 1953 void send(scope Data[] data, int priority) 1954 { 1955 next.send(data, priority); 1956 } 1957 1958 alias send = IConnection.send; /// ditto 1959 1960 /// Terminate the connection. 1961 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1962 { 1963 next.disconnect(reason, type); 1964 } 1965 1966 protected void onConnect() 1967 { 1968 if (connectHandler) 1969 connectHandler(); 1970 } 1971 1972 protected void onReadData(Data data) 1973 { 1974 // onReadData should be fired only if readDataHandler is set 1975 assert(readDataHandler, "onReadData caled with null readDataHandler"); 1976 readDataHandler(data); 1977 } 1978 1979 protected void onDisconnect(string reason, DisconnectType type) 1980 { 1981 if (disconnectHandler) 1982 disconnectHandler(reason, type); 1983 } 1984 1985 protected void onBufferFlushed() 1986 { 1987 if (bufferFlushedHandler) 1988 bufferFlushedHandler(); 1989 } 1990 1991 /// Callback for when a connection has been established. 1992 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1993 protected ConnectHandler connectHandler; 1994 1995 /// Callback setter for when new data is read. 1996 @property void handleReadData(ReadDataHandler value) 1997 { 1998 readDataHandler = value; 1999 next.handleReadData = value ? &onReadData : null ; 2000 } 2001 protected ReadDataHandler readDataHandler; 2002 2003 /// Callback setter for when a connection was closed. 2004 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 2005 protected DisconnectHandler disconnectHandler; 2006 2007 /// Callback setter for when all queued data has been written. 2008 @property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; } 2009 protected BufferFlushedHandler bufferFlushedHandler; 2010 } 2011 2012 // *************************************************************************** 2013 2014 /// Buffers the data received from the next connection, 2015 /// even when this object is not marked as readable (handleReadData unset). 2016 class BufferingAdapter : ConnectionAdapter 2017 { 2018 Data[] buffer; 2019 2020 this(IConnection next) 2021 { 2022 super(next); 2023 next.handleReadData = &onReadData; 2024 } 2025 2026 override void onReadData(Data data) 2027 { 2028 if (readDataHandler) 2029 { 2030 assert(buffer.length == 0); 2031 readDataHandler(data); 2032 } 2033 else 2034 buffer.queuePush(data); 2035 } 2036 2037 override @property void handleReadData(ReadDataHandler value) 2038 { 2039 readDataHandler = value; 2040 while (readDataHandler && buffer.length) 2041 readDataHandler(buffer.queuePop); 2042 } 2043 } 2044 2045 // *************************************************************************** 2046 2047 /// Adapter for connections with a line-based protocol. 2048 /// Splits data stream into delimiter-separated lines. 2049 class LineBufferedAdapter : ConnectionAdapter 2050 { 2051 /// The protocol's line delimiter. 2052 string delimiter = "\r\n"; 2053 2054 /// Maximum line length (0 means unlimited). 2055 size_t maxLength = 0; 2056 2057 this(IConnection next) 2058 { 2059 super(next); 2060 } /// 2061 2062 /// Append a line to the send buffer. 2063 void send(string line) 2064 { 2065 //super.send(Data(line ~ delimiter)); 2066 // https://issues.dlang.org/show_bug.cgi?id=13985 2067 ConnectionAdapter ca = this; 2068 ca.send(Data(line.asBytes ~ delimiter.asBytes)); 2069 } 2070 2071 protected: 2072 /// The receive buffer. 2073 Data inBuffer; 2074 2075 /// Called when data has been received. 2076 final override void onReadData(Data data) 2077 { 2078 import std.string : indexOf; 2079 auto startIndex = inBuffer.length; 2080 if (inBuffer.length) 2081 inBuffer ~= data; 2082 else 2083 inBuffer = data; 2084 2085 assert(delimiter.length >= 1); 2086 if (startIndex >= delimiter.length) 2087 startIndex -= delimiter.length - 1; 2088 else 2089 startIndex = 0; 2090 2091 auto index = inBuffer[startIndex .. $].indexOf(delimiter.asBytes); 2092 while (index >= 0) 2093 { 2094 if (!processLine(startIndex + index)) 2095 break; 2096 2097 startIndex = 0; 2098 index = inBuffer.indexOf(delimiter.asBytes); 2099 } 2100 2101 if (maxLength && inBuffer.length > maxLength) 2102 disconnect("Line too long", DisconnectType.error); 2103 } 2104 2105 // `index` is the index of the delimiter's first character. 2106 final bool processLine(size_t index) 2107 { 2108 if (maxLength && index > maxLength) 2109 { 2110 disconnect("Line too long", DisconnectType.error); 2111 return false; 2112 } 2113 auto line = inBuffer[0..index]; 2114 inBuffer = inBuffer[index+delimiter.length..inBuffer.length]; 2115 super.onReadData(line); 2116 return true; 2117 } 2118 2119 override void onDisconnect(string reason, DisconnectType type) 2120 { 2121 super.onDisconnect(reason, type); 2122 inBuffer.clear(); 2123 } 2124 } 2125 2126 // *************************************************************************** 2127 2128 /// Fires an event handler or disconnects connections 2129 /// after a period of inactivity. 2130 class TimeoutAdapter : ConnectionAdapter 2131 { 2132 this(IConnection next) 2133 { 2134 debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this); 2135 super(next); 2136 } /// 2137 2138 /// Set the `Duration` indicating the period of inactivity after which to take action. 2139 final void setIdleTimeout(Duration timeout) 2140 { 2141 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this); 2142 assert(timeout > Duration.zero); 2143 this.timeout = timeout; 2144 2145 // Configure idleTask 2146 if (idleTask is null) 2147 { 2148 idleTask = new TimerTask(); 2149 idleTask.handleTask = &onTask_Idle; 2150 } 2151 else if (idleTask.isWaiting()) 2152 idleTask.cancel(); 2153 2154 mainTimer.add(idleTask, now + timeout); 2155 } 2156 2157 /// Manually mark this connection as non-idle, restarting the idle timer. 2158 /// `handleNonIdle` will be called, if set. 2159 void markNonIdle() 2160 { 2161 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this); 2162 if (handleNonIdle) 2163 handleNonIdle(); 2164 if (idleTask && idleTask.isWaiting()) 2165 idleTask.restart(now + timeout); 2166 } 2167 2168 /// Stop the idle timer. 2169 void cancelIdleTimeout() 2170 { 2171 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this); 2172 assert(idleTask !is null); 2173 assert(idleTask.isWaiting()); 2174 idleTask.cancel(); 2175 } 2176 2177 /// Restart the idle timer. 2178 void resumeIdleTimeout() 2179 { 2180 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this); 2181 assert(idleTask !is null); 2182 assert(!idleTask.isWaiting()); 2183 mainTimer.add(idleTask, now + timeout); 2184 } 2185 2186 /// Returns the point in time when the idle handler is scheduled 2187 /// to be called, or null if it is not scheduled. 2188 /*Nullable!MonoTime*/auto when()() 2189 { 2190 import std.typecons : Nullable; 2191 return idleTask.isWaiting() 2192 ? Nullable!MonoTime(idleTask.when) 2193 : Nullable!MonoTime.init; 2194 } 2195 2196 /// Callback for when a connection has stopped responding. 2197 /// If unset, the connection will be disconnected. 2198 void delegate() handleIdleTimeout; 2199 2200 /// Callback for when a connection is marked as non-idle 2201 /// (when data is received). 2202 void delegate() handleNonIdle; 2203 2204 protected: 2205 override void onConnect() 2206 { 2207 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this); 2208 markNonIdle(); 2209 super.onConnect(); 2210 } 2211 2212 override void onReadData(Data data) 2213 { 2214 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this); 2215 markNonIdle(); 2216 super.onReadData(data); 2217 } 2218 2219 override void onDisconnect(string reason, DisconnectType type) 2220 { 2221 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this); 2222 if (idleTask && idleTask.isWaiting()) 2223 idleTask.cancel(); 2224 super.onDisconnect(reason, type); 2225 } 2226 2227 private: 2228 TimerTask idleTask; // non-null if an idle timeout has been set 2229 Duration timeout; 2230 2231 final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/) 2232 { 2233 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this); 2234 if (state == ConnectionState.disconnecting) 2235 return disconnect("Delayed disconnect - time-out", DisconnectType.error); 2236 2237 if (state == ConnectionState.disconnected) 2238 return; 2239 2240 if (handleIdleTimeout) 2241 { 2242 resumeIdleTimeout(); // reschedule (by default) 2243 handleIdleTimeout(); 2244 } 2245 else 2246 disconnect("Time-out", DisconnectType.error); 2247 } 2248 } 2249 2250 // *************************************************************************** 2251 2252 debug(ae_unittest) unittest 2253 { 2254 void testTimer() 2255 { 2256 bool fired; 2257 setTimeout({fired = true;}, 10.msecs); 2258 socketManager.loop(); 2259 assert(fired); 2260 } 2261 2262 testTimer(); 2263 }