1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <ae@cy.md> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import core.stdc.stdint : int32_t; 24 25 import std.exception; 26 import std.parallelism : totalCPUs; 27 import std.socket; 28 import std.string : format; 29 public import std.socket : Address, AddressInfo, Socket; 30 31 version (Windows) 32 private import c_socks = core.sys.windows.winsock2; 33 else version (Posix) 34 private import c_socks = core.sys.posix.sys.socket; 35 36 debug(ASOCKETS) import std.stdio : stderr; 37 debug(PRINTDATA) import std.stdio : stderr; 38 debug(PRINTDATA) import ae.utils.text : hexDump; 39 private import std.conv : to; 40 41 42 // https://issues.dlang.org/show_bug.cgi?id=7016 43 static import ae.utils.array; 44 45 version(LIBEV) 46 { 47 import deimos.ev; 48 pragma(lib, "ev"); 49 } 50 51 version(Windows) 52 { 53 import core.sys.windows.windows : Sleep; 54 private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 55 } 56 else 57 private enum USE_SLEEP = false; 58 59 /// Flags that determine socket wake-up events. 60 int eventCounter; 61 62 version(LIBEV) 63 { 64 // Watchers are a GenericSocket field (as declared in SocketMixin). 65 // Use one watcher per read and write event. 66 // Start/stop those as higher-level code declares interest in those events. 67 // Use the "data" ev_io field to store the parent GenericSocket address. 68 // Also use the "data" field as a flag to indicate whether the watcher is active 69 // (data is null when the watcher is stopped). 
70 71 /// `libev`-based event loop implementation. 72 struct SocketManager 73 { 74 private: 75 size_t count; 76 77 extern(C) 78 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 79 { 80 eventCounter++; 81 auto socket = cast(GenericSocket)w.data; 82 assert(socket, "libev event fired on stopped watcher"); 83 debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents); 84 85 if (revents & EV_READ) 86 socket.onReadable(); 87 else 88 if (revents & EV_WRITE) 89 socket.onWritable(); 90 else 91 assert(false, "Unknown event fired from libev"); 92 93 // TODO? Need to get proper SocketManager instance to call updateTimer on 94 socketManager.updateTimer(false); 95 } 96 97 ev_timer evTimer; 98 MonoTime lastNextEvent = MonoTime.max; 99 100 extern(C) 101 static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/) 102 { 103 eventCounter++; 104 debug (ASOCKETS) writefln("Timer callback called."); 105 mainTimer.prod(); 106 107 socketManager.updateTimer(true); 108 debug (ASOCKETS) writefln("Timer callback exiting."); 109 } 110 111 void updateTimer(bool force) 112 { 113 auto nextEvent = mainTimer.getNextEvent(); 114 if (force || lastNextEvent != nextEvent) 115 { 116 debug (ASOCKETS) writefln("Rescheduling timer. 
Was at %s, now at %s", lastNextEvent, nextEvent); 117 if (nextEvent == MonoTime.max) // Stopping 118 { 119 if (lastNextEvent != MonoTime.max) 120 ev_timer_stop(ev_default_loop(0), &evTimer); 121 } 122 else 123 { 124 auto remaining = mainTimer.getRemainingTime(); 125 while (remaining <= Duration.zero) 126 { 127 debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining); 128 mainTimer.prod(); 129 remaining = mainTimer.getRemainingTime(); 130 } 131 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 132 debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 133 if (lastNextEvent == MonoTime.max) // Starting 134 { 135 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 136 ev_timer_start(ev_default_loop(0), &evTimer); 137 } 138 else // Adjusting 139 { 140 evTimer.repeat = tstamp; 141 ev_timer_again(ev_default_loop(0), &evTimer); 142 } 143 } 144 lastNextEvent = nextEvent; 145 } 146 } 147 148 public: 149 /// Register a socket with the manager. 150 void register(GenericSocket socket) 151 { 152 debug (ASOCKETS) writefln("Registering %s", socket); 153 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 154 auto fd = socket.conn.handle; 155 assert(fd, "Must have fd before socket registration"); 156 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 157 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 158 count++; 159 } 160 161 /// Unregister a socket with the manager. 162 void unregister(GenericSocket socket) 163 { 164 debug (ASOCKETS) writefln("Unregistering %s", socket); 165 socket.notifyRead = false; 166 socket.notifyWrite = false; 167 count--; 168 } 169 170 /// Returns the number of registered sockets. 171 size_t size() 172 { 173 return count; 174 } 175 176 /// Loop continuously until no sockets are left. 
177 void loop() 178 { 179 auto evLoop = ev_default_loop(0); 180 enforce(evLoop, "libev initialization failure"); 181 182 updateTimer(true); 183 debug (ASOCKETS) writeln("ev_run"); 184 ev_run(ev_default_loop(0), 0); 185 } 186 } 187 188 private mixin template SocketMixin() 189 { 190 private ev_io evRead, evWrite; 191 192 private void setWatcherState(ref ev_io ev, bool newValue, int /*event*/) 193 { 194 if (!conn) 195 { 196 // Can happen when setting delegates before connecting. 197 return; 198 } 199 200 if (newValue && !ev.data) 201 { 202 // Start 203 ev.data = cast(void*)this; 204 ev_io_start(ev_default_loop(0), &ev); 205 } 206 else 207 if (!newValue && ev.data) 208 { 209 // Stop 210 assert(ev.data is cast(void*)this); 211 ev.data = null; 212 ev_io_stop(ev_default_loop(0), &ev); 213 } 214 } 215 216 /// Interested in read notifications (onReadable)? 217 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 218 /// Interested in write notifications (onWritable)? 219 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 220 221 debug ~this() @nogc 222 { 223 // The LIBEV SocketManager holds no references to registered sockets. 224 // TODO: Add a doubly-linked list? 225 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 226 } 227 } 228 } 229 else // Use select 230 { 231 /// `select`-based event loop implementation. 232 struct SocketManager 233 { 234 private: 235 enum FD_SETSIZE = 1024; 236 237 /// List of all sockets to poll. 238 GenericSocket[] sockets; 239 240 /// Debug AA to check for dangling socket references. 241 debug GenericSocket[socket_t] socketHandles; 242 243 void delegate()[] nextTickHandlers, idleHandlers; 244 245 public: 246 /// Register a socket with the manager. 
247 void register(GenericSocket conn) 248 { 249 debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1); 250 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 251 sockets ~= conn; 252 253 debug 254 { 255 auto handle = conn.socket.handle; 256 assert(handle != socket_t.init, "Can't register a closed socket"); 257 assert(handle !in socketHandles, "This socket handle is already registered"); 258 socketHandles[handle] = conn; 259 } 260 } 261 262 /// Unregister a socket with the manager. 263 void unregister(GenericSocket conn) 264 { 265 debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 266 267 debug 268 { 269 auto handle = conn.socket.handle; 270 assert(handle != socket_t.init, "Can't unregister a closed socket"); 271 auto pconn = handle in socketHandles; 272 assert(pconn, "This socket handle is not registered"); 273 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 274 socketHandles.remove(handle); 275 } 276 277 foreach (size_t i, GenericSocket j; sockets) 278 if (j is conn) 279 { 280 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 281 return; 282 } 283 assert(false, "Socket not registered"); 284 } 285 286 /// Returns the number of registered sockets. 287 size_t size() 288 { 289 return sockets.length; 290 } 291 292 /// Loop continuously until no sockets are left. 
293 void loop() 294 { 295 debug (ASOCKETS) stderr.writeln("Starting event loop."); 296 297 SocketSet readset, writeset; 298 size_t sockcount; 299 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 300 readset = new SocketSet(setSize); 301 writeset = new SocketSet(setSize); 302 while (true) 303 { 304 if (nextTickHandlers.length) 305 { 306 auto thisTickHandlers = nextTickHandlers; 307 nextTickHandlers = null; 308 309 foreach (handler; thisTickHandlers) 310 handler(); 311 312 continue; 313 } 314 315 uint minSize = 0; 316 version(Windows) 317 minSize = cast(uint)sockets.length; 318 else 319 { 320 foreach (s; sockets) 321 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 322 minSize = s.socket.handle; 323 } 324 minSize++; 325 326 if (setSize < minSize) 327 { 328 debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 329 setSize = minSize * 2; 330 readset = new SocketSet(setSize); 331 writeset = new SocketSet(setSize); 332 } 333 else 334 { 335 readset.reset(); 336 writeset.reset(); 337 } 338 339 sockcount = 0; 340 bool haveActive; 341 debug (ASOCKETS) stderr.writeln("Populating sets"); 342 foreach (GenericSocket conn; sockets) 343 { 344 if (!conn.socket) 345 continue; 346 sockcount++; 347 348 debug (ASOCKETS) stderr.writef("\t%s:", conn); 349 if (conn.notifyRead) 350 { 351 readset.add(conn.socket); 352 if (!conn.daemonRead) 353 haveActive = true; 354 debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : ""); 355 } 356 if (conn.notifyWrite) 357 { 358 writeset.add(conn.socket); 359 if (!conn.daemonWrite) 360 haveActive = true; 361 debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? 
"[daemon]" : ""); 362 } 363 debug (ASOCKETS) stderr.writeln(); 364 } 365 debug (ASOCKETS) 366 { 367 stderr.writefln("Sets populated as follows:"); 368 printSets(readset, writeset); 369 } 370 371 debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...", 372 haveActive ? "" : "not ", 373 sockcount, 374 mainTimer.isWaiting() ? "with" : "no", 375 idleHandlers.length, 376 ); 377 if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length) 378 { 379 debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop."); 380 break; 381 } 382 383 debug (ASOCKETS) stderr.flush(); 384 385 int events; 386 if (idleHandlers.length) 387 { 388 if (sockcount==0) 389 events = 0; 390 else 391 events = Socket.select(readset, writeset, null, 0.seconds); 392 } 393 else 394 if (USE_SLEEP && sockcount==0) 395 { 396 version(Windows) 397 { 398 auto duration = mainTimer.getRemainingTime().total!"msecs"(); 399 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 400 if (duration <= 0) 401 duration = 1; // Avoid busywait 402 else 403 if (duration > int.max) 404 duration = int.max; 405 Sleep(cast(int)duration); 406 events = 0; 407 } 408 else 409 assert(0); 410 } 411 else 412 if (mainTimer.isWaiting()) 413 events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime()); 414 else 415 events = Socket.select(readset, writeset, null); 416 417 debug (ASOCKETS) stderr.writefln("%d events fired.", events); 418 419 if (events > 0) 420 { 421 // Handle just one event at a time, as the first 422 // handler might invalidate select()'s results. 423 handleEvent(readset, writeset); 424 } 425 else 426 if (idleHandlers.length) 427 { 428 import ae.utils.array : shift; 429 auto handler = idleHandlers.shift(); 430 431 // Rotate the idle handler queue before running it, 432 // in case the handler unregisters itself. 
433 idleHandlers ~= handler; 434 435 handler(); 436 } 437 438 // Timers may invalidate our select results, so fire them after processing the latter 439 mainTimer.prod(); 440 441 eventCounter++; 442 } 443 } 444 445 debug (ASOCKETS) 446 private void printSets(SocketSet readset, SocketSet writeset) 447 { 448 foreach (GenericSocket conn; sockets) 449 { 450 if (!conn.socket) 451 stderr.writefln("\t\t%s is unset", conn); 452 else 453 { 454 if (readset.isSet(conn.socket)) 455 stderr.writefln("\t\t%s is readable", conn); 456 if (writeset.isSet(conn.socket)) 457 stderr.writefln("\t\t%s is writable", conn); 458 } 459 } 460 } 461 462 private void handleEvent(SocketSet readset, SocketSet writeset) 463 { 464 debug (ASOCKETS) 465 { 466 stderr.writefln("\tSelect results:"); 467 printSets(readset, writeset); 468 } 469 470 foreach (GenericSocket conn; sockets) 471 { 472 if (!conn.socket) 473 continue; 474 475 if (readset.isSet(conn.socket)) 476 { 477 debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn); 478 return conn.onReadable(); 479 } 480 else 481 if (writeset.isSet(conn.socket)) 482 { 483 debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn); 484 return conn.onWritable(); 485 } 486 } 487 488 assert(false, "select() reported events available, but no registered sockets are set"); 489 } 490 } 491 492 /// Schedule a function to run on the next event loop iteration. 493 /// Can be used to queue logic to run once all current execution frames exit. 494 /// Similar to e.g. process.nextTick in Node. 495 void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow 496 { 497 socketManager.nextTickHandlers ~= dg; 498 } 499 500 // Use UFCS to allow removeIdleHandler to have a predicate with context 501 /// Register a function to be called when the event loop is idle, 502 /// and would otherwise sleep. 
503 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 504 { 505 foreach (i, idleHandler; socketManager.idleHandlers) 506 assert(handler !is idleHandler); 507 508 socketManager.idleHandlers ~= handler; 509 } 510 511 /// Unregister a function previously registered with `addIdleHandler`. 512 void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args) 513 { 514 foreach (i, idleHandler; socketManager.idleHandlers) 515 if (pred(idleHandler, args)) 516 { 517 import std.algorithm : remove; 518 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 519 return; 520 } 521 assert(false, "No such idle handler"); 522 } 523 524 private mixin template SocketMixin() 525 { 526 /// Interested in read notifications (onReadable)? 527 bool notifyRead; 528 /// Interested in write notifications (onWritable)? 529 bool notifyWrite; 530 } 531 } 532 533 /// The default socket manager. 534 SocketManager socketManager; 535 536 // *************************************************************************** 537 538 /// General methods for an asynchronous socket. 539 abstract class GenericSocket 540 { 541 /// Declares notifyRead and notifyWrite. 542 mixin SocketMixin; 543 544 protected: 545 /// The socket this class wraps. 546 Socket conn; 547 548 // protected: 549 /// Retrieve the socket class this class wraps. 
550 @property final Socket socket() 551 { 552 return conn; 553 } 554 555 void onReadable() 556 { 557 } 558 559 void onWritable() 560 { 561 } 562 563 void onError(string /*reason*/) 564 { 565 } 566 567 public: 568 /// allow getting the address of connections that are already disconnected 569 private Address[2] cachedAddress; 570 571 /*private*/ final @property Address _address(bool local)() 572 { 573 if (cachedAddress[local] !is null) 574 return cachedAddress[local]; 575 else 576 if (conn is null) 577 return null; 578 else 579 { 580 Address a; 581 if (conn.addressFamily == AddressFamily.UNSPEC) 582 { 583 // Socket will attempt to construct an UnknownAddress, 584 // which will almost certainly not match the real address length. 585 static if (local) 586 alias getname = c_socks.getsockname; 587 else 588 alias getname = c_socks.getpeername; 589 590 c_socks.socklen_t nameLen = 0; 591 if (getname(conn.handle, null, &nameLen) < 0) 592 throw new SocketOSException("Unable to obtain socket address"); 593 594 auto buf = new ubyte[nameLen]; 595 auto sa = cast(c_socks.sockaddr*)buf.ptr; 596 if (getname(conn.handle, sa, &nameLen) < 0) 597 throw new SocketOSException("Unable to obtain socket address"); 598 a = new UnknownAddressReference(sa, nameLen); 599 } 600 else 601 a = local ? conn.localAddress() : conn.remoteAddress(); 602 return cachedAddress[local] = a; 603 } 604 } 605 606 alias localAddress = _address!true; /// Retrieve this socket's local address. 607 alias remoteAddress = _address!false; /// Retrieve this socket's remote address. 
608 609 /*private*/ final @property string _addressStr(bool local)() nothrow 610 { 611 try 612 { 613 auto a = _address!local; 614 if (a is null) 615 return "[null address]"; 616 string host = a.toAddrString(); 617 import std.string : indexOf; 618 if (host.indexOf(':') >= 0) 619 host = "[" ~ host ~ "]"; 620 try 621 { 622 string port = a.toPortString(); 623 return host ~ ":" ~ port; 624 } 625 catch (Exception e) 626 return host; 627 } 628 catch (Exception e) 629 return "[error: " ~ e.msg ~ "]"; 630 } 631 632 alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string. 633 alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string. 634 635 /// Don't block the process from exiting, even if the socket is ready to receive data. 636 /// TODO: Not implemented with libev 637 bool daemonRead; 638 639 /// Don't block the process from exiting, even if the socket is ready to send data. 640 /// TODO: Not implemented with libev 641 bool daemonWrite; 642 643 deprecated alias daemon = daemonRead; 644 645 /// Enable TCP keep-alive on the socket with the given settings. 646 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 647 { 648 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 649 if (enabled) 650 { 651 try 652 conn.setKeepAlive(time, interval); 653 catch (SocketException) 654 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 655 } 656 else 657 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 658 } 659 660 /// Returns a string containing the class name, address, and file descriptor. 661 override string toString() const 662 { 663 import std.string : format, split; 664 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? 
conn.handle : -1); 665 } 666 } 667 668 // *************************************************************************** 669 670 /// Classifies the cause of the disconnect. 671 /// Can be used to decide e.g. when it makes sense to reconnect. 672 enum DisconnectType 673 { 674 requested, /// Initiated by the application. 675 graceful, /// The peer gracefully closed the connection. 676 error /// Some abnormal network condition. 677 } 678 679 /// Used to indicate the state of a connection throughout its lifecycle. 680 enum ConnectionState 681 { 682 /// The initial state, or the state after a disconnect was fully processed. 683 disconnected, 684 685 /// Name resolution. Currently done synchronously. 686 resolving, 687 688 /// A connection attempt is in progress. 689 connecting, 690 691 /// A connection is established. 692 connected, 693 694 /// Disconnecting in progress. No data can be sent or received at this point. 695 /// We are waiting for queued data to be actually sent before disconnecting. 696 disconnecting, 697 } 698 699 /// Returns true if this is a connection state for which disconnecting is valid. 700 /// Generally, applications should be aware of the life cycle of their sockets, 701 /// so checking the state of a connection is unnecessary (and a code smell). 702 /// However, unconditionally disconnecting some connected sockets can be useful 703 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle), 704 /// such as in response to a signal. 705 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; } 706 707 /// Common interface for connections and adapters. 708 interface IConnection 709 { 710 /// `send` queues data for sending in one of five queues, indexed 711 /// by a numeric priority. 712 /// `MAX_PRIORITY` is the highest (least urgent) priority index. 
713 /// `DEFAULT_PRIORITY` is the default priority 714 enum MAX_PRIORITY = 4; 715 enum DEFAULT_PRIORITY = 2; /// ditto 716 717 /// This is the default value for the `disconnect` `reason` string parameter. 718 static const defaultDisconnectReason = "Software closed the connection"; 719 720 /// Get connection state. 721 @property ConnectionState state(); 722 723 /// Has a connection been established? 724 deprecated final @property bool connected() { return state == ConnectionState.connected; } 725 726 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 727 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 728 729 /// Queue Data for sending. 730 void send(Data[] data, int priority = DEFAULT_PRIORITY); 731 732 /// ditto 733 final void send(Data datum, int priority = DEFAULT_PRIORITY) 734 { 735 Data[1] data; 736 data[0] = datum; 737 this.send(data, priority); 738 data[] = Data.init; 739 } 740 741 /// Terminate the connection. 742 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 743 744 /// Callback setter for when a connection has been established 745 /// (if applicable). 746 alias ConnectHandler = void delegate(); 747 @property void handleConnect(ConnectHandler value); /// ditto 748 749 /// Callback setter for when new data is read. 750 alias ReadDataHandler = void delegate(Data data); 751 @property void handleReadData(ReadDataHandler value); /// ditto 752 753 /// Callback setter for when a connection was closed. 754 alias DisconnectHandler = void delegate(string reason, DisconnectType type); 755 @property void handleDisconnect(DisconnectHandler value); /// ditto 756 757 /// Callback setter for when all queued data has been sent. 
758 alias BufferFlushedHandler = void delegate(); 759 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 760 } 761 762 // *************************************************************************** 763 764 /// Implementation of `IConnection` using a socket. 765 /// Implements receiving data when readable and sending queued data 766 /// when writable. 767 class Connection : GenericSocket, IConnection 768 { 769 private: 770 /// Blocks of data larger than this value are passed as unmanaged memory 771 /// (in Data objects). Blocks smaller than this value will be reallocated 772 /// on the managed heap. The disadvantage of placing large objects on the 773 /// managed heap is false pointers; the disadvantage of using Data for 774 /// small objects is wasted slack space due to the page size alignment 775 /// requirement. 776 enum UNMANAGED_THRESHOLD = 256; 777 778 ConnectionState _state; 779 final @property ConnectionState state(ConnectionState value) { return _state = value; } 780 781 public: 782 /// Get connection state. 783 override @property ConnectionState state() { return _state; } 784 785 protected: 786 abstract sizediff_t doSend(in void[] buffer); 787 abstract sizediff_t doReceive(void[] buffer); 788 789 /// The send buffers. 790 Data[][MAX_PRIORITY+1] outQueue; 791 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 792 int partiallySent = -1; 793 794 /// Constructor used by a ServerSocket for new connections 795 this(Socket conn) 796 { 797 this(); 798 this.conn = conn; 799 state = conn is null ? 
ConnectionState.disconnected : ConnectionState.connected; 800 if (conn) 801 socketManager.register(this); 802 updateFlags(); 803 } 804 805 final void updateFlags() 806 { 807 if (state == ConnectionState.connecting) 808 notifyWrite = true; 809 else 810 notifyWrite = writePending; 811 812 notifyRead = state == ConnectionState.connected && readDataHandler; 813 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 814 } 815 816 /// Called when a socket is readable. 817 override void onReadable() 818 { 819 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 820 static ubyte[0x10000] inBuffer = void; 821 auto received = doReceive(inBuffer); 822 823 if (received == 0) 824 return disconnect("Connection closed", DisconnectType.graceful); 825 826 if (received == Socket.ERROR) 827 { 828 // if (wouldHaveBlocked) 829 // { 830 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 831 // return; 832 // } 833 // else 834 onError("recv() error: " ~ lastSocketError); 835 } 836 else 837 { 838 debug (PRINTDATA) 839 { 840 stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr); 841 stderr.write(hexDump(inBuffer[0 .. received])); 842 stderr.flush(); 843 } 844 845 if (state == ConnectionState.disconnecting) 846 { 847 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this); 848 } 849 else 850 if (!readDataHandler) 851 { 852 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this); 853 } 854 else 855 { 856 // Currently, unlike the D1 version of this module, 857 // we will always reallocate read network data. 858 // This disfavours code which doesn't need to store 859 // read data after processing it, but otherwise 860 // makes things simpler and safer all around. 861 862 Data data; 863 if (received < UNMANAGED_THRESHOLD) 864 { 865 // Copy to the managed heap 866 data = Data(inBuffer[0 .. 
received].dup); 867 } 868 else 869 { 870 // Copy to unmanaged memory 871 data = Data(inBuffer[0 .. received], true); 872 } 873 readDataHandler(data); 874 } 875 } 876 } 877 878 /// Called when an error occurs on the socket. 879 override void onError(string reason) 880 { 881 if (state == ConnectionState.disconnecting) 882 { 883 debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 884 return close(); 885 } 886 887 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 888 disconnect("Socket error: " ~ reason, DisconnectType.error); 889 } 890 891 this() 892 { 893 } 894 895 public: 896 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 897 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 898 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 899 { 900 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 901 assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state)); 902 903 if (writePending) 904 { 905 if (type==DisconnectType.requested) 906 { 907 assert(conn, "Attempting to disconnect on an uninitialized socket"); 908 // queue disconnect after all data is sent 909 debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 910 state = ConnectionState.disconnecting; 911 //setIdleTimeout(30.seconds); 912 if (disconnectHandler) 913 disconnectHandler(reason, type); 914 updateFlags(); 915 return; 916 } 917 else 918 discardQueues(); 919 } 920 921 debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 922 923 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 924 close(); 925 else 926 { 927 assert(conn is null, "Registered but 
%s socket".format(state)); 928 if (state == ConnectionState.resolving) 929 state = ConnectionState.disconnected; 930 } 931 932 if (disconnectHandler) 933 disconnectHandler(reason, type); 934 updateFlags(); 935 } 936 937 private final void close() 938 { 939 assert(conn, "Attempting to close an unregistered socket"); 940 socketManager.unregister(this); 941 conn.close(); 942 conn = null; 943 outQueue[] = null; 944 state = ConnectionState.disconnected; 945 } 946 947 /// Append data to the send buffer. 948 void send(Data[] data, int priority = DEFAULT_PRIORITY) 949 { 950 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 951 outQueue[priority] ~= data; 952 notifyWrite = true; // Fast updateFlags() 953 954 debug (PRINTDATA) 955 { 956 stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr); 957 foreach (datum; data) 958 if (datum.length) 959 stderr.write(hexDump(datum.contents)); 960 else 961 stderr.writeln("(empty Data)"); 962 stderr.flush(); 963 } 964 } 965 966 /// ditto 967 alias send = IConnection.send; 968 969 /// Cancel all queued `Data` packets with the given priority. 970 /// Does not cancel any partially-sent `Data`. 971 final void clearQueue(int priority) 972 { 973 if (priority == partiallySent) 974 { 975 assert(outQueue[priority].length > 0); 976 outQueue[priority] = outQueue[priority][0..1]; 977 } 978 else 979 outQueue[priority] = null; 980 updateFlags(); 981 } 982 983 /// Clears all queues, even partially sent content. 984 private final void discardQueues() 985 { 986 foreach (priority; 0..MAX_PRIORITY+1) 987 outQueue[priority] = null; 988 partiallySent = -1; 989 updateFlags(); 990 } 991 992 /// Returns true if any queues have pending data. 993 @property 994 final bool writePending() 995 { 996 foreach (queue; outQueue) 997 if (queue.length) 998 return true; 999 return false; 1000 } 1001 1002 /// Returns true if there are any queued `Data` which have not yet 1003 /// begun to be sent. 
1004 final bool queuePresent(int priority = DEFAULT_PRIORITY) 1005 { 1006 if (priority == partiallySent) 1007 { 1008 assert(outQueue[priority].length > 0); 1009 return outQueue[priority].length > 1; 1010 } 1011 else 1012 return outQueue[priority].length > 0; 1013 } 1014 1015 /// Returns the number of queued `Data` at the given priority. 1016 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 1017 { 1018 return outQueue[priority].length; 1019 } 1020 1021 /// Returns the number of queued bytes at the given priority. 1022 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 1023 { 1024 size_t bytes; 1025 foreach (datum; outQueue[priority]) 1026 bytes += datum.length; 1027 return bytes; 1028 } 1029 1030 // public: 1031 private ConnectHandler connectHandler; 1032 /// Callback for when a connection has been established. 1033 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 1034 1035 private ReadDataHandler readDataHandler; 1036 /// Callback for incoming data. 1037 /// Data will not be received unless this handler is set. 1038 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 1039 1040 private DisconnectHandler disconnectHandler; 1041 /// Callback for when a connection was closed. 1042 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 1043 1044 private BufferFlushedHandler bufferFlushedHandler; 1045 /// Callback setter for when all queued data has been sent. 1046 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 1047 } 1048 1049 /// Implements a stream connection. 1050 /// Queued `Data` is allowed to be fragmented. 1051 class StreamConnection : Connection 1052 { 1053 protected: 1054 this() 1055 { 1056 super(); 1057 } 1058 1059 /// Called when a socket is writable. 
override void onWritable()
	{
		// Run the actual work, then refresh the event flags afterwards.
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	//
	// Handles a "writable" notification in one of two ways:
	// - while connecting, it checks the socket's pending error and either
	//   disconnects or completes the connection and fires connectHandler;
	// - otherwise it drains outQueue through doSend until the queue is empty,
	//   the send would block, only part of an item was sent, or an error occurs.
	final private void onWritableImpl()
	{
		debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
		if (state == ConnectionState.connecting)
		{
			// Ask the socket for the outcome of the pending connect.
			int32_t error;
			conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
			if (error)
				return disconnect(formatSocketError(error), DisconnectType.error);

			state = ConnectionState.connected;

			//debug writefln("[%s] Connected", remoteAddressStr);
			// A failure to configure keep-alive is treated as a connection error.
			try
				setKeepAlive();
			catch (Exception e)
				return disconnect(e.msg, DisconnectType.error);
			if (connectHandler)
				connectHandler();
			return;
		}
		//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

		// Two passes over the queues: the first (sendPartial == true) only
		// touches the queue whose priority equals partiallySent, so an item
		// that was already partly written is finished before anything else;
		// the second pass walks every queue in outQueue order.
		foreach (sendPartial; [true, false])
			foreach (int priority, ref queue; outQueue)
				while (queue.length && (!sendPartial || priority == partiallySent))
				{
					assert(partiallySent == -1 || partiallySent == priority);

					auto pdata = queue.ptr; // pointer to first data

					// Empty Data objects are not passed to doSend; with sent == 0
					// they fall through to the "fully sent" branch below.
					ptrdiff_t sent = 0;
					if (pdata.length)
					{
						sent = doSend(pdata.contents);
						debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
					}
					else
					{
						debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
					}

					if (sent == Socket.ERROR)
					{
						// A would-block condition is not an error: stop and
						// wait for the next writable notification.
						if (wouldHaveBlocked())
							return;
						else
							return onError("send() error: " ~ lastSocketError);
					}
					else
					if (sent < pdata.length)
					{
						// Short write: keep only the unsent remainder at the head
						// of the queue and remember which priority it belongs to.
						if (sent > 0)
						{
							*pdata = (*pdata)[sent..pdata.length];
							partiallySent = priority;
						}
						return;
					}
					else
					{
						assert(sent == pdata.length);
						//debug writefln("[%s] Sent data:", remoteAddressStr);
						//debug writefln("%s", hexDump(pdata.contents[0..sent]));
						// Whole item sent: clear it and pop it from the queue.
						pdata.clear();
						queue = queue[1..$];
						partiallySent = -1;
						if (queue.length == 0)
							queue = null;
					}
				}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		// A disconnect that was waiting for the queue to drain can now complete.
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

public:
	// Construct around the given socket; forwards it to the base constructor.
	this(Socket conn)
	{
		super(conn);
	} ///
}

// ***************************************************************************

/// A POSIX file stream.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// Does not dup the given file descriptor, so "disconnecting" this connection
/// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	// Wraps the given file descriptor in a non-blocking `Socket` object
	// (address family UNSPEC) and hands it to the base constructor.
	this(int fileno)
	{
		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		conn.blocking = false;
		super(conn);
	} ///

protected:
	import core.sys.posix.unistd : read, write;

	// Sends through POSIX write() on the wrapped handle.
	override sizediff_t doSend(in void[] buffer)
	{
		return write(socket.handle, buffer.ptr, buffer.length);
	}

	// Receives through POSIX read() on the wrapped handle.
	override sizediff_t doReceive(void[] buffer)
	{
		return read(socket.handle, buffer.ptr, buffer.length);
	}
}

/// Separates reading and writing, e.g. for stdin/stdout.
class Duplex : IConnection
{
	///
	IConnection reader, writer;

	this(IConnection reader, IConnection writer)
	{
		// Route both halves' connect/disconnect events through this object.
		this.reader = reader;
		reader.handleConnect = &onConnect;
		reader.handleDisconnect = &onDisconnect;

		this.writer = writer;
		writer.handleConnect = &onConnect;
		writer.handleDisconnect = &onDisconnect;
	} ///

	@property ConnectionState state()
	{
		// "disconnecting" on either half wins; otherwise report the lower state.
		auto readSide = reader.state;
		if (readSide == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;

		auto writeSide = writer.state;
		if (writeSide == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;

		return readSide < writeSide ? readSide : writeSide;
	} ///

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		// A half is disconnected only while its state lies strictly between
		// "disconnected" and "disconnecting".
		static bool canDisconnect(IConnection half)
		{
			auto current = half.state;
			return current > ConnectionState.disconnected
				&& current < ConnectionState.disconnecting;
		}

		if (canDisconnect(reader))
			reader.disconnect(reason, type);
		if (canDisconnect(writer))
			writer.disconnect(reason, type);
		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	protected void onConnect()
	{
		// Report "connected" only once both halves are connected.
		if (!connectHandler)
			return;
		if (reader.state != ConnectionState.connected)
			return;
		if (writer.state != ConnectionState.connected)
			return;
		connectHandler();
	}

	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
		if (disconnectHandler)
		{
			disconnectHandler(reason, type);
			// The other half will report its own disconnect; notify only once.
			disconnectHandler = null;
		}
		// The remaining half is ours to shut down. DisconnectType.requested
		// is used so that any data already written still gets flushed.
		auto followUp = "Other side of Duplex connection closed (" ~ reason ~ ")";
		disconnect(followUp, DisconnectType.requested);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value)
	{
		reader.handleReadData = value;
	}

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		writer.handleBufferFlushed = value;
	}
}

// Compile-time check only: the constructor call is never executed.
unittest
{
	if (false)
		new Duplex(null, null);
}

// ***************************************************************************

/// An asynchronous socket-based connection.
class SocketConnection : StreamConnection
{
protected:
	/// Addresses which `connect` was given but which have not been tried yet.
	AddressInfo[] addressQueue;

	/// Wrap an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

	/// Send through the wrapped socket.
	override sizediff_t doSend(in void[] buffer)
	{
		return conn.send(buffer);
	}

	/// Receive from the wrapped socket.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

	/// Pop the next address off `addressQueue` and start a non-blocking
	/// connection attempt to it. Any `SocketException` is routed to `onError`,
	/// which moves on to the next address if there is one.
	final void tryNextAddress()
	{
		assert(state == ConnectionState.connecting);
		auto addressInfo = addressQueue[0];
		addressQueue = addressQueue[1..$];

		try
		{
			auto sock = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
			{
				// Don't leave a half-configured socket behind.
				scope(failure) sock.close();
				sock.blocking = false;
			}

			// Only publish the socket once it is fully set up, so that a
			// non-null `conn` always means "registered with socketManager".
			conn = sock;
			socketManager.register(this);
			updateFlags();
			debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
			conn.connect(addressInfo.address);
		}
		catch (SocketException e)
			return onError("Connect error: " ~ e.msg);
	}

	/// Called when an error occurs on the socket.
	/// While connecting with addresses left to try, the failed socket is
	/// discarded and the next address is attempted instead of reporting
	/// the error.
	override void onError(string reason)
	{
		if (state == ConnectionState.connecting && addressQueue.length)
		{
			// `conn` is null when socket creation itself failed (e.g. the
			// address family is not supported); there is nothing to
			// unregister or close in that case.
			if (conn)
			{
				socketManager.unregister(this);
				conn.close();
				conn = null;
			}

			return tryNextAddress();
		}

		super.onError(reason);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
	}

	/// Start establishing a connection.
	/// The addresses are tried in the given order until one attempt does
	/// not fail; `addresses` must not be empty and the connection must
	/// currently be disconnected.
	final void connect(AddressInfo[] addresses)
	{
		assert(addresses.length, "No addresses specified");

		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
		assert(!conn);

		addressQueue = addresses;
		state = ConnectionState.connecting;
		tryNextAddress();
	}
}

/// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
	/// Wrap an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
	}

	///
	alias connect = SocketConnection.connect; // raise overload

	/// Start establishing a connection.
	/// Resolves `host`, shuffles the resulting addresses, and tries them in
	/// turn. Lookup failures (including an empty result) are reported
	/// through `onError` rather than thrown.
	final void connect(string host, ushort port)
	{
		assert(host.length, "Empty host");
		assert(port, "No port specified");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
		assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

		state = ConnectionState.resolving;

		AddressInfo[] addressInfos;
		try
		{
			auto addresses = getAddress(host, port);
			// Thrown as a SocketException so that it is handled by the
			// catch block below, like every other lookup failure, instead
			// of escaping with the state left at "resolving".
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			// Randomize the order in which the addresses are attempted.
			if (addresses.length > 1)
			{
				import std.random : randomShuffle;
				randomShuffle(addresses);
			}

			foreach (address; addresses)
				addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
		}
		catch (SocketException e)
			return onError("Lookup error: " ~ e.msg);

		// The AddressInfo[] overload expects to start from "disconnected".
		state = ConnectionState.disconnected;
		connect(addressInfos);
	}
}

// ***************************************************************************

/// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Adopt the given socket and register with the socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket accepted = conn.accept();
			accepted.blocking = false;

			// Nobody is interested in the connection: drop it right away.
			if (handleAccept is null)
			{
				accepted.close();
				return;
			}

			auto incoming = createConnection(accepted);
			debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", incoming, incoming.remoteAddressStr);
			incoming.setKeepAlive();
			//assert(connection.connected);
			//connection.connected = true;
			acceptHandler(incoming);
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
			// Nothing to do here.
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregister from the socket manager and close the socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Wrap an accepted socket in a connection object.
	SocketConnection createConnection(Socket socket)
	{
		auto wrapped = new SocketConnection(socket);
		return wrapped;
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Listeners are watched for readability only while an accept handler is set.
	final void updateFlags()
	{
		foreach (i; 0 .. listeners.length)
			listeners[i].notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
final void listen(AddressInfo[] addressInfos)
	{
		// Best effort: an address which cannot be bound is skipped, and
		// failure is only reported if no listener exists at the end.
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				// Release the descriptor immediately if any of the setup
				// steps below throws, instead of leaving it to the GC.
				scope(failure) conn.close();

				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(totalCPUs * 2);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	this()
	{
	} ///

	/// Creates a Server with the given sockets.
	/// The sockets must have already had `bind` and `listen` called on them.
	this(Socket[] sockets...)
	{
		foreach (socket; sockets)
			listeners ~= new Listener(socket);

		// The sockets are required to be listening already, so make
		// `isListening` reflect that.
		listening = listeners.length > 0;
	}

	/// Returns all listening addresses.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Returns `true` if the server is listening for incoming connections.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes every listener, then calls `handleClose` if it is set.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

	/// Create a SocketServer using the handle passed on standard input,
	/// for which `listen` had already been called. Used by
	/// e.g. FastCGI and systemd sockets with "Listen = yes".
	static SocketServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0; // file descriptor 0

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new SocketServer(s);
	}

	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(SocketConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); }
}

/// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
	/// Accepted sockets are wrapped in `TcpConnection` objects.
	override SocketConnection createConnection(Socket socket)
	{
		return new TcpConnection(socket);
	}

public:
	this()
	{
	} ///

	this(Socket[] sockets...)
	{
		super(sockets);
	} /// Construct from the given sockets.

	///
	alias listen = SocketServer.listen; // raise overload

	/// Start listening on this socket.
final ushort listen(ushort port, string addr = null)
	{
		// Resolves `addr` (null selects the passive/wildcard addresses) and
		// `port` to TCP addresses and listens on them. Returns the port that
		// was actually bound on IPv4, which matters when `port` is 0; if no
		// IPv4 listener exists, `port` is returned unchanged.
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Single pass, order preserved.
			AddressInfo[] ipv4Only;
			foreach (ref addressInfo; addressInfos)
				if (addressInfo.family == AddressFamily.INET)
					ipv4Only ~= addressInfo;
			addressInfos = ipv4Only;
		}

		listen(addressInfos);

		// Report the port the IPv4 listener ended up on.
		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	// Builds a real TcpServer around the standard-input handle. Merely
	// reinterpreting a SocketServer instance as a TcpServer would keep
	// SocketServer's createConnection, so the typed accept handler below
	// would be handed null connections.
	deprecated("Use SocketServer.fromStdin")
	static TcpServer fromStdin()
	{
		socket_t socket;
		version (Windows)
		{
			import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
			socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
		}
		else
			socket = cast(socket_t)0; // file descriptor 0

		auto s = new Socket(socket, AddressFamily.UNSPEC);
		s.blocking = false;
		return new TcpServer(s);
	}

	/// Delegate to be called when a connection is accepted.
	/// Assigning `null` clears the handler, so connections are no longer accepted.
	@property final void handleAccept(void delegate(TcpConnection incoming) value)
	{
		// Only wrap a real handler: wrapping null would install a non-null
		// delegate which crashes when the first connection arrives.
		void delegate(SocketConnection incoming) wrapped = null;
		if (value !is null)
			wrapped = (SocketConnection c) => value(cast(TcpConnection)c);
		super.handleAccept(wrapped);
	}
}

// ***************************************************************************

/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	/// Wrap an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
override void onWritable()
	{
		// Do the work first, then refresh the event flags.
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	//
	// Sends every queued datagram to remoteAddress, one sendTo call per
	// queued item, stopping early if the send would block or fails.
	final private void onWritableImpl()
	{
		foreach (ref queue; outQueue)
		{
			while (queue.length)
			{
				auto head = queue.ptr; // first queued datagram

				auto sent = conn.sendTo(head.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					// Would-block just means "try again on the next event".
					if (wouldHaveBlocked())
						return;
					return onError("send() error: " ~ lastSocketError);
				}

				// A datagram cannot be resumed; a short send is an error.
				if (sent < head.length)
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, head.length));

				assert(sent == head.length);
				//debug writefln("[%s] Sent data:", remoteAddressStr);
				//debug writefln("%s", hexDump(pdata.contents[0..sent]));
				head.clear();
				queue = queue[1..$];
				if (!queue.length)
					queue = null;
			}
		}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();

		// Complete a disconnect that was waiting for the queue to drain.
		if (state != ConnectionState.disconnecting)
			return;
		debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}

	// Sending goes through sendTo in onWritableImpl instead.
	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	// Receive a datagram from the wrapped socket.
	override sizediff_t doReceive(void[] buffer)
	{
		auto received = conn.receive(buffer);
		return received;
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given `AddressFamily`, without binding to an address.
final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		// Creates the socket and then reports the connection as established
		// by calling connectHandler, if one is set.
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	// Creates and registers the non-blocking socket and marks the
	// connection as connected, without notifying connectHandler.
	private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// Returns the local port that was bound, or 0 if the host lookup
	/// failed (in which case the failure is reported through `onError`).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			// Thrown as a SocketException so that it is handled by the
			// catch block below, like every other lookup failure, instead
			// of escaping with the state left at "resolving".
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			// Pick one of the resolved addresses at random.
			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// The AddressInfo overload expects to start from "disconnected".
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl rather than initialize, so that connectHandler
		// fires exactly once, and only after the socket has been bound.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		// Ask the socket which port was actually bound (relevant for port 0).
		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

// public:
	/// Where to send packets to.
	Address remoteAddress;
}

///
unittest
{
	auto server = new UdpConnection();
	server.bind("localhost", 0);

	auto client = new UdpConnection();
	client.initialize(server.localAddress.addressFamily);

	string[] packets = ["Hello", "there"];
	client.remoteAddress = server.localAddress;
	client.send({
		Data[] data;
		foreach (packet; packets)
			data ~= Data(packet);
		return data;
	}());

	// Each datagram must arrive intact and in order; stop once all are seen.
	server.handleReadData = (Data data)
	{
		assert(data.contents == packets[0]);
		packets = packets[1..$];
		if (!packets.length)
		{
			server.close();
			client.close();
		}
	};
	socketManager.loop();
	assert(!packets.length);
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
	/// The next connection in the chain (towards the raw transport).
	IConnection next;

	this(IConnection next)
	{
		// Intercept the wrapped connection's events so that subclasses can
		// act on them before they reach this adapter's own handlers.
		this.next = next;
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	} ///

	@property ConnectionState state() { return next.state; } ///

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		// Simply forwarded to the wrapped connection.
		next.disconnect(reason, type);
	}

	// Forwards the wrapped connection's "connected" event, if anyone listens.
	protected void onConnect()
	{
		if (!connectHandler)
			return;
		connectHandler();
	}

	// Forwards received data to the registered handler.
	protected void onReadData(Data data)
	{
		// This adapter only subscribes to the wrapped connection's data
		// while a handler is set, so one must be present here.
		assert(readDataHandler, "onReadData caled with null readDataHandler");
		readDataHandler(data);
	}

	// Forwards the wrapped connection's "disconnected" event.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (!disconnectHandler)
			return;
		disconnectHandler(reason, type);
	}

	// Forwards the wrapped connection's "buffer flushed" event.
	protected void onBufferFlushed()
	{
		if (!bufferFlushedHandler)
			return;
		bufferFlushedHandler();
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value)
	{
		connectHandler = value;
	}
	protected ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		// Subscribe to the wrapped connection only while a handler is set.
		if (value)
			next.handleReadData = &onReadData;
		else
			next.handleReadData = null;
	}
	protected ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value)
	{
		disconnectHandler = value;
	}
	protected DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value)
	{
		bufferFlushedHandler = value;
	}
	protected BufferFlushedHandler bufferFlushedHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
string delimiter = "\r\n";

	/// Maximum line length (0 means unlimited).
	size_t maxLength = 0;

	this(IConnection next)
	{
		super(next);
	} ///

	/// Append a line to the send buffer.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		// Calling through a base-typed reference works around the issue above.
		ConnectionAdapter base = this;
		auto payload = Data(line ~ delimiter);
		base.send(payload);
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends the data to `inBuffer` and hands every complete line found
	/// there (without its delimiter) to `processLine`.
	final override void onReadData(Data data)
	{
		import std.string : indexOf;

		immutable previousLength = inBuffer.length;
		if (previousLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			import core.stdc.string : memchr;

			char needle = delimiter[0];
			// The first scan covers only the newly arrived bytes; after a
			// line has been cut off, scanning restarts at the buffer start.
			for (auto hit = memchr(inBuffer.ptr + previousLength, needle, data.length);
				hit;
				hit = memchr(inBuffer.ptr, needle, inBuffer.length))
			{
				sizediff_t index = hit - inBuffer.ptr;
				if (!processLine(index))
					break;
			}
		}
		else
		{
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			for (;;)
			{
				sizediff_t index = indexOf(cast(string)inBuffer.contents, delimiter);
				if (index < 0 || !processLine(index))
					break;
			}
		}

		// Whatever is left is an unterminated line; enforce the limit on it too.
		if (maxLength && inBuffer.length > maxLength)
			disconnect("Line too long", DisconnectType.error);
	}

	/// Cut the line ending at `index` off the front of `inBuffer` and pass
	/// it on. Returns `false` (after disconnecting) if the line exceeds
	/// `maxLength`, `true` otherwise.
	final bool processLine(size_t index)
	{
		immutable tooLong = maxLength && index > maxLength;
		if (tooLong)
		{
			disconnect("Line too long", DisconnectType.error);
			return false;
		}

		auto line = inBuffer[0..index];
		immutable restStart = index + delimiter.length;
		inBuffer = inBuffer[restStart..inBuffer.length];
		super.onReadData(line);
		return true;
	}

	/// Forward the event, then drop any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}
// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	} ///

	/// Set the `Duration` indicating the period of inactivity after which to take action.
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		// Configure idleTask
		if (idleTask !is null)
		{
			// Reuse the existing task: take it off the timer if it is
			// pending, then give it the new delay.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}

		mainTimer.add(idleTask);
	}

	/// Manually mark this connection as non-idle, restarting the idle timer.
	/// `handleNonIdle` will be called, if set.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();

		// Only a pending timer is restarted.
		immutable pending = idleTask && idleTask.isWaiting();
		if (pending)
			idleTask.restart();
	}

	/// Stop the idle timer.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Restart the idle timer.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Connecting counts as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Receiving data counts as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Stop a pending idle timer before forwarding the event.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		immutable pending = idleTask && idleTask.isWaiting();
		if (pending)
			idleTask.cancel();
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set

	// Timer callback: the connection has been idle for the configured time.
	final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);

		auto current = state;

		// Still flushing after a requested disconnect: give up waiting.
		if (current == ConnectionState.disconnecting)
			return disconnect("Delayed disconnect - time-out", DisconnectType.error);

		if (current == ConnectionState.disconnected)
			return;

		// Without a handler, idling simply terminates the connection.
		if (!handleIdleTimeout)
			return disconnect("Time-out", DisconnectType.error);

		resumeIdleTimeout(); // reschedule (by default)
		handleIdleTimeout();
	}
}

// ***************************************************************************

unittest
{
	// A timeout scheduled on the main timer must fire while the loop runs.
	void testTimer()
	{
		bool fired;
		setTimeout({fired = true;}, 10.msecs);
		socketManager.loop();
		assert(fired);
	}

	testTimer();
}