1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <ae@cy.md> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.array : toArray; 21 import ae.utils.math; 22 public import ae.sys.data; 23 24 import core.stdc.stdint : int32_t; 25 26 import std.exception; 27 import std.parallelism : totalCPUs; 28 import std.socket; 29 import std.string : format; 30 public import std.socket : Address, AddressInfo, Socket; 31 32 version (Windows) 33 private import c_socks = core.sys.windows.winsock2; 34 else version (Posix) 35 private import c_socks = core.sys.posix.sys.socket; 36 37 debug(ASOCKETS) import std.stdio : stderr; 38 debug(PRINTDATA) import std.stdio : stderr; 39 debug(PRINTDATA) import ae.utils.text : hexDump; 40 private import std.conv : to; 41 42 43 // https://issues.dlang.org/show_bug.cgi?id=7016 44 static import ae.utils.array; 45 46 version(LIBEV) 47 { 48 import deimos.ev; 49 pragma(lib, "ev"); 50 } 51 52 version(Windows) 53 { 54 import core.sys.windows.windows : Sleep; 55 private enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 56 } 57 else 58 private enum USE_SLEEP = false; 59 60 /// Flags that determine socket wake-up events. 61 int eventCounter; 62 63 version(LIBEV) 64 { 65 // Watchers are a GenericSocket field (as declared in SocketMixin). 66 // Use one watcher per read and write event. 67 // Start/stop those as higher-level code declares interest in those events. 68 // Use the "data" ev_io field to store the parent GenericSocket address. 69 // Also use the "data" field as a flag to indicate whether the watcher is active 70 // (data is null when the watcher is stopped). 71 72 /// `libev`-based event loop implementation. 73 struct SocketManager 74 { 75 private: 76 size_t count; 77 78 extern(C) 79 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 80 { 81 eventCounter++; 82 auto socket = cast(GenericSocket)w.data; 83 assert(socket, "libev event fired on stopped watcher"); 84 debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents); 85 86 if (revents & EV_READ) 87 socket.onReadable(); 88 else 89 if (revents & EV_WRITE) 90 socket.onWritable(); 91 else 92 assert(false, "Unknown event fired from libev"); 93 94 // TODO? Need to get proper SocketManager instance to call updateTimer on 95 socketManager.updateTimer(false); 96 } 97 98 ev_timer evTimer; 99 MonoTime lastNextEvent = MonoTime.max; 100 101 extern(C) 102 static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/) 103 { 104 eventCounter++; 105 debug (ASOCKETS) writefln("Timer callback called."); 106 mainTimer.prod(); 107 108 socketManager.updateTimer(true); 109 debug (ASOCKETS) writefln("Timer callback exiting."); 110 } 111 112 void updateTimer(bool force) 113 { 114 auto nextEvent = mainTimer.getNextEvent(); 115 if (force || lastNextEvent != nextEvent) 116 { 117 debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent); 118 if (nextEvent == MonoTime.max) // Stopping 119 { 120 if (lastNextEvent != MonoTime.max) 121 ev_timer_stop(ev_default_loop(0), &evTimer); 122 } 123 else 124 { 125 auto remaining = mainTimer.getRemainingTime(); 126 while (remaining <= Duration.zero) 127 { 128 debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining); 129 mainTimer.prod(); 130 remaining = mainTimer.getRemainingTime(); 131 } 132 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 133 debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 134 if (lastNextEvent == MonoTime.max) // Starting 135 { 136 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 137 ev_timer_start(ev_default_loop(0), &evTimer); 138 } 139 else // Adjusting 140 { 141 evTimer.repeat = tstamp; 142 ev_timer_again(ev_default_loop(0), &evTimer); 143 } 144 } 145 lastNextEvent = nextEvent; 146 } 147 } 148 149 public: 150 /// Register a socket with the manager. 151 void register(GenericSocket socket) 152 { 153 debug (ASOCKETS) writefln("Registering %s", socket); 154 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 155 auto fd = socket.conn.handle; 156 assert(fd, "Must have fd before socket registration"); 157 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 158 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 159 count++; 160 } 161 162 /// Unregister a socket with the manager. 163 void unregister(GenericSocket socket) 164 { 165 debug (ASOCKETS) writefln("Unregistering %s", socket); 166 socket.notifyRead = false; 167 socket.notifyWrite = false; 168 count--; 169 } 170 171 /// Returns the number of registered sockets. 172 size_t size() 173 { 174 return count; 175 } 176 177 /// Loop continuously until no sockets are left. 178 void loop() 179 { 180 auto evLoop = ev_default_loop(0); 181 enforce(evLoop, "libev initialization failure"); 182 183 updateTimer(true); 184 debug (ASOCKETS) writeln("ev_run"); 185 ev_run(ev_default_loop(0), 0); 186 } 187 } 188 189 private mixin template SocketMixin() 190 { 191 private ev_io evRead, evWrite; 192 193 private void setWatcherState(ref ev_io ev, bool newValue, int /*event*/) 194 { 195 if (!conn) 196 { 197 // Can happen when setting delegates before connecting. 198 return; 199 } 200 201 if (newValue && !ev.data) 202 { 203 // Start 204 ev.data = cast(void*)this; 205 ev_io_start(ev_default_loop(0), &ev); 206 } 207 else 208 if (!newValue && ev.data) 209 { 210 // Stop 211 assert(ev.data is cast(void*)this); 212 ev.data = null; 213 ev_io_stop(ev_default_loop(0), &ev); 214 } 215 } 216 217 /// Interested in read notifications (onReadable)? 218 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 219 /// Interested in write notifications (onWritable)? 220 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 221 222 debug ~this() @nogc 223 { 224 // The LIBEV SocketManager holds no references to registered sockets. 225 // TODO: Add a doubly-linked list? 226 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 227 } 228 } 229 } 230 else // Use select 231 { 232 /// `select`-based event loop implementation. 233 struct SocketManager 234 { 235 private: 236 enum FD_SETSIZE = 1024; 237 238 /// List of all sockets to poll. 239 GenericSocket[] sockets; 240 241 /// Debug AA to check for dangling socket references. 242 debug GenericSocket[socket_t] socketHandles; 243 244 void delegate()[] nextTickHandlers, idleHandlers; 245 246 public: 247 /// Register a socket with the manager. 248 void register(GenericSocket conn) 249 { 250 debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1); 251 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 252 sockets ~= conn; 253 254 debug 255 { 256 auto handle = conn.socket.handle; 257 assert(handle != socket_t.init, "Can't register a closed socket"); 258 assert(handle !in socketHandles, "This socket handle is already registered"); 259 socketHandles[handle] = conn; 260 } 261 } 262 263 /// Unregister a socket with the manager. 264 void unregister(GenericSocket conn) 265 { 266 debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 267 268 debug 269 { 270 auto handle = conn.socket.handle; 271 assert(handle != socket_t.init, "Can't unregister a closed socket"); 272 auto pconn = handle in socketHandles; 273 assert(pconn, "This socket handle is not registered"); 274 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 275 socketHandles.remove(handle); 276 } 277 278 foreach (size_t i, GenericSocket j; sockets) 279 if (j is conn) 280 { 281 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 282 return; 283 } 284 assert(false, "Socket not registered"); 285 } 286 287 /// Returns the number of registered sockets. 288 size_t size() 289 { 290 return sockets.length; 291 } 292 293 /// Loop continuously until no sockets are left. 294 void loop() 295 { 296 debug (ASOCKETS) stderr.writeln("Starting event loop."); 297 298 SocketSet readset, writeset; 299 size_t sockcount; 300 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 301 readset = new SocketSet(setSize); 302 writeset = new SocketSet(setSize); 303 while (true) 304 { 305 if (nextTickHandlers.length) 306 { 307 auto thisTickHandlers = nextTickHandlers; 308 nextTickHandlers = null; 309 310 foreach (handler; thisTickHandlers) 311 handler(); 312 313 continue; 314 } 315 316 uint minSize = 0; 317 version(Windows) 318 minSize = cast(uint)sockets.length; 319 else 320 { 321 foreach (s; sockets) 322 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 323 minSize = s.socket.handle; 324 } 325 minSize++; 326 327 if (setSize < minSize) 328 { 329 debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 330 setSize = minSize * 2; 331 readset = new SocketSet(setSize); 332 writeset = new SocketSet(setSize); 333 } 334 else 335 { 336 readset.reset(); 337 writeset.reset(); 338 } 339 340 sockcount = 0; 341 bool haveActive; 342 debug (ASOCKETS) stderr.writeln("Populating sets"); 343 foreach (GenericSocket conn; sockets) 344 { 345 if (!conn.socket) 346 continue; 347 sockcount++; 348 349 debug (ASOCKETS) stderr.writef("\t%s:", conn); 350 if (conn.notifyRead) 351 { 352 readset.add(conn.socket); 353 if (!conn.daemonRead) 354 haveActive = true; 355 debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : ""); 356 } 357 if (conn.notifyWrite) 358 { 359 writeset.add(conn.socket); 360 if (!conn.daemonWrite) 361 haveActive = true; 362 debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : ""); 363 } 364 debug (ASOCKETS) stderr.writeln(); 365 } 366 debug (ASOCKETS) 367 { 368 stderr.writefln("Sets populated as follows:"); 369 printSets(readset, writeset); 370 } 371 372 debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...", 373 haveActive ? "" : "not ", 374 sockcount, 375 mainTimer.isWaiting() ? "with" : "no", 376 idleHandlers.length, 377 ); 378 if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length) 379 { 380 debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop."); 381 break; 382 } 383 384 debug (ASOCKETS) stderr.flush(); 385 386 int events; 387 if (idleHandlers.length) 388 { 389 if (sockcount==0) 390 events = 0; 391 else 392 events = Socket.select(readset, writeset, null, 0.seconds); 393 } 394 else 395 if (USE_SLEEP && sockcount==0) 396 { 397 version(Windows) 398 { 399 auto duration = mainTimer.getRemainingTime().total!"msecs"(); 400 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 401 if (duration <= 0) 402 duration = 1; // Avoid busywait 403 else 404 if (duration > int.max) 405 duration = int.max; 406 Sleep(cast(int)duration); 407 events = 0; 408 } 409 else 410 assert(0); 411 } 412 else 413 if (mainTimer.isWaiting()) 414 events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime()); 415 else 416 events = Socket.select(readset, writeset, null); 417 418 debug (ASOCKETS) stderr.writefln("%d events fired.", events); 419 420 if (events > 0) 421 { 422 // Handle just one event at a time, as the first 423 // handler might invalidate select()'s results. 424 handleEvent(readset, writeset); 425 } 426 else 427 if (idleHandlers.length) 428 { 429 import ae.utils.array : shift; 430 auto handler = idleHandlers.shift(); 431 432 // Rotate the idle handler queue before running it, 433 // in case the handler unregisters itself. 434 idleHandlers ~= handler; 435 436 handler(); 437 } 438 439 // Timers may invalidate our select results, so fire them after processing the latter 440 mainTimer.prod(); 441 442 eventCounter++; 443 } 444 } 445 446 debug (ASOCKETS) 447 private void printSets(SocketSet readset, SocketSet writeset) 448 { 449 foreach (GenericSocket conn; sockets) 450 { 451 if (!conn.socket) 452 stderr.writefln("\t\t%s is unset", conn); 453 else 454 { 455 if (readset.isSet(conn.socket)) 456 stderr.writefln("\t\t%s is readable", conn); 457 if (writeset.isSet(conn.socket)) 458 stderr.writefln("\t\t%s is writable", conn); 459 } 460 } 461 } 462 463 private void handleEvent(SocketSet readset, SocketSet writeset) 464 { 465 debug (ASOCKETS) 466 { 467 stderr.writefln("\tSelect results:"); 468 printSets(readset, writeset); 469 } 470 471 foreach (GenericSocket conn; sockets) 472 { 473 if (!conn.socket) 474 continue; 475 476 if (readset.isSet(conn.socket)) 477 { 478 debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn); 479 return conn.onReadable(); 480 } 481 else 482 if (writeset.isSet(conn.socket)) 483 { 484 debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn); 485 return conn.onWritable(); 486 } 487 } 488 489 assert(false, "select() reported events available, but no registered sockets are set"); 490 } 491 } 492 493 /// Schedule a function to run on the next event loop iteration. 494 /// Can be used to queue logic to run once all current execution frames exit. 495 /// Similar to e.g. process.nextTick in Node. 496 void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow 497 { 498 socketManager.nextTickHandlers ~= dg; 499 } 500 501 // Use UFCS to allow removeIdleHandler to have a predicate with context 502 /// Register a function to be called when the event loop is idle, 503 /// and would otherwise sleep. 504 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 505 { 506 foreach (i, idleHandler; socketManager.idleHandlers) 507 assert(handler !is idleHandler); 508 509 socketManager.idleHandlers ~= handler; 510 } 511 512 /// Unregister a function previously registered with `addIdleHandler`. 513 void removeIdleHandler(alias pred=(a, b) => a is b, Args...)(ref SocketManager socketManager, Args args) 514 { 515 foreach (i, idleHandler; socketManager.idleHandlers) 516 if (pred(idleHandler, args)) 517 { 518 import std.algorithm : remove; 519 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 520 return; 521 } 522 assert(false, "No such idle handler"); 523 } 524 525 private mixin template SocketMixin() 526 { 527 /// Interested in read notifications (onReadable)? 528 bool notifyRead; 529 /// Interested in write notifications (onWritable)? 530 bool notifyWrite; 531 } 532 } 533 534 /// The default socket manager. 535 SocketManager socketManager; 536 537 // *************************************************************************** 538 539 /// General methods for an asynchronous socket. 540 abstract class GenericSocket 541 { 542 /// Declares notifyRead and notifyWrite. 543 mixin SocketMixin; 544 545 protected: 546 /// The socket this class wraps. 547 Socket conn; 548 549 // protected: 550 /// Retrieve the socket class this class wraps. 551 @property final Socket socket() 552 { 553 return conn; 554 } 555 556 void onReadable() 557 { 558 } 559 560 void onWritable() 561 { 562 } 563 564 void onError(string /*reason*/) 565 { 566 } 567 568 public: 569 /// allow getting the address of connections that are already disconnected 570 private Address[2] cachedAddress; 571 572 /*private*/ final @property Address _address(bool local)() 573 { 574 if (cachedAddress[local] !is null) 575 return cachedAddress[local]; 576 else 577 if (conn is null) 578 return null; 579 else 580 { 581 Address a; 582 if (conn.addressFamily == AddressFamily.UNSPEC) 583 { 584 // Socket will attempt to construct an UnknownAddress, 585 // which will almost certainly not match the real address length. 586 static if (local) 587 alias getname = c_socks.getsockname; 588 else 589 alias getname = c_socks.getpeername; 590 591 c_socks.socklen_t nameLen = 0; 592 if (getname(conn.handle, null, &nameLen) < 0) 593 throw new SocketOSException("Unable to obtain socket address"); 594 595 auto buf = new ubyte[nameLen]; 596 auto sa = cast(c_socks.sockaddr*)buf.ptr; 597 if (getname(conn.handle, sa, &nameLen) < 0) 598 throw new SocketOSException("Unable to obtain socket address"); 599 a = new UnknownAddressReference(sa, nameLen); 600 } 601 else 602 a = local ? conn.localAddress() : conn.remoteAddress(); 603 return cachedAddress[local] = a; 604 } 605 } 606 607 alias localAddress = _address!true; /// Retrieve this socket's local address. 608 alias remoteAddress = _address!false; /// Retrieve this socket's remote address. 609 610 /*private*/ final @property string _addressStr(bool local)() nothrow 611 { 612 try 613 { 614 auto a = _address!local; 615 if (a is null) 616 return "[null address]"; 617 string host = a.toAddrString(); 618 import std.string : indexOf; 619 if (host.indexOf(':') >= 0) 620 host = "[" ~ host ~ "]"; 621 try 622 { 623 string port = a.toPortString(); 624 return host ~ ":" ~ port; 625 } 626 catch (Exception e) 627 return host; 628 } 629 catch (Exception e) 630 return "[error: " ~ e.msg ~ "]"; 631 } 632 633 alias localAddressStr = _addressStr!true; /// Retrieve this socket's local address, as a string. 634 alias remoteAddressStr = _addressStr!false; /// Retrieve this socket's remote address, as a string. 635 636 /// Don't block the process from exiting, even if the socket is ready to receive data. 637 /// TODO: Not implemented with libev 638 bool daemonRead; 639 640 /// Don't block the process from exiting, even if the socket is ready to send data. 641 /// TODO: Not implemented with libev 642 bool daemonWrite; 643 644 deprecated alias daemon = daemonRead; 645 646 /// Enable TCP keep-alive on the socket with the given settings. 647 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 648 { 649 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 650 if (enabled) 651 { 652 try 653 conn.setKeepAlive(time, interval); 654 catch (SocketException) 655 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 656 } 657 else 658 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 659 } 660 661 /// Returns a string containing the class name, address, and file descriptor. 662 override string toString() const 663 { 664 import std.string : format, split; 665 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1); 666 } 667 } 668 669 // *************************************************************************** 670 671 /// Classifies the cause of the disconnect. 672 /// Can be used to decide e.g. when it makes sense to reconnect. 673 enum DisconnectType 674 { 675 requested, /// Initiated by the application. 676 graceful, /// The peer gracefully closed the connection. 677 error /// Some abnormal network condition. 678 } 679 680 /// Used to indicate the state of a connection throughout its lifecycle. 681 enum ConnectionState 682 { 683 /// The initial state, or the state after a disconnect was fully processed. 684 disconnected, 685 686 /// Name resolution. Currently done synchronously. 687 resolving, 688 689 /// A connection attempt is in progress. 690 connecting, 691 692 /// A connection is established. 693 connected, 694 695 /// Disconnecting in progress. No data can be sent or received at this point. 696 /// We are waiting for queued data to be actually sent before disconnecting. 697 disconnecting, 698 } 699 700 /// Returns true if this is a connection state for which disconnecting is valid. 701 /// Generally, applications should be aware of the life cycle of their sockets, 702 /// so checking the state of a connection is unnecessary (and a code smell). 703 /// However, unconditionally disconnecting some connected sockets can be useful 704 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle), 705 /// such as in response to a signal. 706 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; } 707 708 /// Common interface for connections and adapters. 709 interface IConnection 710 { 711 /// `send` queues data for sending in one of five queues, indexed 712 /// by a numeric priority. 713 /// `MAX_PRIORITY` is the highest (least urgent) priority index. 714 /// `DEFAULT_PRIORITY` is the default priority 715 enum MAX_PRIORITY = 4; 716 enum DEFAULT_PRIORITY = 2; /// ditto 717 718 /// This is the default value for the `disconnect` `reason` string parameter. 719 static const defaultDisconnectReason = "Software closed the connection"; 720 721 /// Get connection state. 722 @property ConnectionState state(); 723 724 /// Has a connection been established? 725 deprecated final @property bool connected() { return state == ConnectionState.connected; } 726 727 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 728 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 729 730 /// Queue Data for sending. 731 void send(scope Data[] data, int priority = DEFAULT_PRIORITY); 732 733 /// ditto 734 final void send(Data datum, int priority = DEFAULT_PRIORITY) 735 { 736 this.send(datum.toArray, priority); 737 } 738 739 /// Terminate the connection. 740 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 741 742 /// Callback setter for when a connection has been established 743 /// (if applicable). 744 alias ConnectHandler = void delegate(); 745 @property void handleConnect(ConnectHandler value); /// ditto 746 747 /// Callback setter for when new data is read. 748 alias ReadDataHandler = void delegate(Data data); 749 @property void handleReadData(ReadDataHandler value); /// ditto 750 751 /// Callback setter for when a connection was closed. 752 alias DisconnectHandler = void delegate(string reason, DisconnectType type); 753 @property void handleDisconnect(DisconnectHandler value); /// ditto 754 755 /// Callback setter for when all queued data has been sent. 756 alias BufferFlushedHandler = void delegate(); 757 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 758 } 759 760 // *************************************************************************** 761 762 /// Implementation of `IConnection` using a socket. 763 /// Implements receiving data when readable and sending queued data 764 /// when writable. 765 class Connection : GenericSocket, IConnection 766 { 767 private: 768 /// Blocks of data larger than this value are passed as unmanaged memory 769 /// (in Data objects). Blocks smaller than this value will be reallocated 770 /// on the managed heap. The disadvantage of placing large objects on the 771 /// managed heap is false pointers; the disadvantage of using Data for 772 /// small objects is wasted slack space due to the page size alignment 773 /// requirement. 774 enum UNMANAGED_THRESHOLD = 256; 775 776 ConnectionState _state; 777 final @property ConnectionState state(ConnectionState value) { return _state = value; } 778 779 public: 780 /// Get connection state. 781 override @property ConnectionState state() { return _state; } 782 783 protected: 784 abstract sizediff_t doSend(in void[] buffer); 785 abstract sizediff_t doReceive(void[] buffer); 786 787 /// The send buffers. 788 DataVec[MAX_PRIORITY+1] outQueue; 789 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 790 int partiallySent = -1; 791 792 /// Constructor used by a ServerSocket for new connections 793 this(Socket conn) 794 { 795 this(); 796 this.conn = conn; 797 state = conn is null ? ConnectionState.disconnected : ConnectionState.connected; 798 if (conn) 799 socketManager.register(this); 800 updateFlags(); 801 } 802 803 final void updateFlags() 804 { 805 if (state == ConnectionState.connecting) 806 notifyWrite = true; 807 else 808 notifyWrite = writePending; 809 810 notifyRead = state == ConnectionState.connected && readDataHandler; 811 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 812 } 813 814 /// Called when a socket is readable. 815 override void onReadable() 816 { 817 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 818 static ubyte[0x10000] inBuffer = void; 819 auto received = doReceive(inBuffer); 820 821 if (received == 0) 822 return disconnect("Connection closed", DisconnectType.graceful); 823 824 if (received == Socket.ERROR) 825 { 826 // if (wouldHaveBlocked) 827 // { 828 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 829 // return; 830 // } 831 // else 832 onError("recv() error: " ~ lastSocketError); 833 } 834 else 835 { 836 debug (PRINTDATA) 837 { 838 stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr); 839 stderr.write(hexDump(inBuffer[0 .. received])); 840 stderr.flush(); 841 } 842 843 if (state == ConnectionState.disconnecting) 844 { 845 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this); 846 } 847 else 848 if (!readDataHandler) 849 { 850 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this); 851 } 852 else 853 { 854 // Currently, unlike the D1 version of this module, 855 // we will always reallocate read network data. 856 // This disfavours code which doesn't need to store 857 // read data after processing it, but otherwise 858 // makes things simpler and safer all around. 859 860 Data data; 861 if (received < UNMANAGED_THRESHOLD) 862 { 863 // Copy to the managed heap 864 data = Data(inBuffer[0 .. received].dup); 865 } 866 else 867 { 868 // Copy to unmanaged memory 869 data = Data(inBuffer[0 .. received], true); 870 } 871 readDataHandler(data); 872 } 873 } 874 } 875 876 /// Called when an error occurs on the socket. 877 override void onError(string reason) 878 { 879 if (state == ConnectionState.disconnecting) 880 { 881 debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 882 return close(); 883 } 884 885 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 886 disconnect("Socket error: " ~ reason, DisconnectType.error); 887 } 888 889 this() 890 { 891 } 892 893 public: 894 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 895 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 896 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 897 { 898 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 899 assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state)); 900 901 if (writePending) 902 { 903 if (type==DisconnectType.requested) 904 { 905 assert(conn, "Attempting to disconnect on an uninitialized socket"); 906 // queue disconnect after all data is sent 907 debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 908 state = ConnectionState.disconnecting; 909 //setIdleTimeout(30.seconds); 910 if (disconnectHandler) 911 disconnectHandler(reason, type); 912 updateFlags(); 913 return; 914 } 915 else 916 discardQueues(); 917 } 918 919 debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 920 921 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 922 close(); 923 else 924 { 925 assert(conn is null, "Registered but %s socket".format(state)); 926 if (state == ConnectionState.resolving) 927 state = ConnectionState.disconnected; 928 } 929 930 if (disconnectHandler) 931 disconnectHandler(reason, type); 932 updateFlags(); 933 } 934 935 private final void close() 936 { 937 assert(conn, "Attempting to close an unregistered socket"); 938 socketManager.unregister(this); 939 conn.close(); 940 conn = null; 941 outQueue[] = DataVec.init; 942 state = ConnectionState.disconnected; 943 } 944 945 /// Append data to the send buffer. 946 void send(scope Data[] data, int priority = DEFAULT_PRIORITY) 947 { 948 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 949 outQueue[priority] ~= data; 950 notifyWrite = true; // Fast updateFlags() 951 952 debug (PRINTDATA) 953 { 954 stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr); 955 foreach (datum; data) 956 if (datum.length) 957 stderr.write(hexDump(datum.contents)); 958 else 959 stderr.writeln("(empty Data)"); 960 stderr.flush(); 961 } 962 } 963 964 /// ditto 965 alias send = IConnection.send; 966 967 /// Cancel all queued `Data` packets with the given priority. 968 /// Does not cancel any partially-sent `Data`. 969 final void clearQueue(int priority) 970 { 971 if (priority == partiallySent) 972 { 973 assert(outQueue[priority].length > 0); 974 outQueue[priority].length = 1; 975 } 976 else 977 outQueue[priority] = null; 978 updateFlags(); 979 } 980 981 /// Clears all queues, even partially sent content. 982 private final void discardQueues() 983 { 984 foreach (priority; 0..MAX_PRIORITY+1) 985 outQueue[priority] = null; 986 partiallySent = -1; 987 updateFlags(); 988 } 989 990 /// Returns true if any queues have pending data. 991 @property 992 final bool writePending() 993 { 994 foreach (ref queue; outQueue) 995 if (queue.length) 996 return true; 997 return false; 998 } 999 1000 /// Returns true if there are any queued `Data` which have not yet 1001 /// begun to be sent. 1002 final bool queuePresent(int priority = DEFAULT_PRIORITY) 1003 { 1004 if (priority == partiallySent) 1005 { 1006 assert(outQueue[priority].length > 0); 1007 return outQueue[priority].length > 1; 1008 } 1009 else 1010 return outQueue[priority].length > 0; 1011 } 1012 1013 /// Returns the number of queued `Data` at the given priority. 1014 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 1015 { 1016 return outQueue[priority].length; 1017 } 1018 1019 /// Returns the number of queued bytes at the given priority. 1020 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 1021 { 1022 size_t bytes; 1023 foreach (datum; outQueue[priority]) 1024 bytes += datum.length; 1025 return bytes; 1026 } 1027 1028 // public: 1029 private ConnectHandler connectHandler; 1030 /// Callback for when a connection has been established. 1031 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 1032 1033 private ReadDataHandler readDataHandler; 1034 /// Callback for incoming data. 1035 /// Data will not be received unless this handler is set. 1036 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 1037 1038 private DisconnectHandler disconnectHandler; 1039 /// Callback for when a connection was closed. 1040 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 1041 1042 private BufferFlushedHandler bufferFlushedHandler; 1043 /// Callback setter for when all queued data has been sent. 1044 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 1045 } 1046 1047 /// Implements a stream connection. 1048 /// Queued `Data` is allowed to be fragmented. 1049 class StreamConnection : Connection 1050 { 1051 protected: 1052 this() 1053 { 1054 super(); 1055 } 1056 1057 /// Called when a socket is writable. 1058 override void onWritable() 1059 { 1060 //scope(success) updateFlags(); 1061 onWritableImpl(); 1062 updateFlags(); 1063 } 1064 1065 // Work around scope(success) breaking debugger stack traces 1066 final private void onWritableImpl() 1067 { 1068 debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state); 1069 if (state == ConnectionState.connecting) 1070 { 1071 int32_t error; 1072 conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error); 1073 if (error) 1074 return disconnect(formatSocketError(error), DisconnectType.error); 1075 1076 state = ConnectionState.connected; 1077 1078 //debug writefln("[%s] Connected", remoteAddressStr); 1079 try 1080 setKeepAlive(); 1081 catch (Exception e) 1082 return disconnect(e.msg, DisconnectType.error); 1083 if (connectHandler) 1084 connectHandler(); 1085 return; 1086 } 1087 //debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length); 1088 1089 foreach (sendPartial; [true, false]) 1090 foreach (int priority, ref queue; outQueue) 1091 while (queue.length && (!sendPartial || priority == partiallySent)) 1092 { 1093 assert(partiallySent == -1 || partiallySent == priority); 1094 1095 ptrdiff_t sent = 0; 1096 if (!queue.front.empty) 1097 { 1098 sent = doSend(queue.front.contents); 1099 debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, queue.front.length); 1100 } 1101 else 1102 { 1103 debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this); 1104 } 1105 1106 if (sent == Socket.ERROR) 1107 { 1108 if (wouldHaveBlocked()) 1109 return; 1110 else 1111 return onError("send() error: " ~ lastSocketError); 1112 } 1113 else 1114 if (sent < queue.front.length) 1115 { 1116 if (sent > 0) 1117 { 1118 queue.front = queue.front[sent..queue.front.length]; 1119 partiallySent = priority; 1120 } 1121 return; 1122 } 1123 else 1124 { 1125 assert(sent == queue.front.length); 1126 //debug writefln("[%s] Sent data:", remoteAddressStr); 1127 //debug writefln("%s", hexDump(queue.front.contents[0..sent])); 1128 queue.front.clear(); 1129 queue.popFront(); 1130 partiallySent = -1; 1131 if (queue.length == 0) 1132 queue = null; 1133 } 1134 } 1135 1136 // outQueue is now empty 1137 if (bufferFlushedHandler) 1138 bufferFlushedHandler(); 1139 if (state == ConnectionState.disconnecting) 1140 { 1141 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1142 close(); 1143 } 1144 } 1145 1146 public: 1147 this(Socket conn) 1148 { 1149 super(conn); 1150 } /// 1151 } 1152 1153 // *************************************************************************** 1154 1155 /// A POSIX file stream. 1156 /// Allows adding a file (e.g. stdin/stdout) to the socket manager. 1157 /// Does not dup the given file descriptor, so "disconnecting" this connection 1158 /// will close it. 1159 version (Posix) 1160 class FileConnection : StreamConnection 1161 { 1162 this(int fileno) 1163 { 1164 auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC); 1165 conn.blocking = false; 1166 super(conn); 1167 } /// 1168 1169 protected: 1170 import core.sys.posix.unistd : read, write; 1171 1172 override sizediff_t doSend(in void[] buffer) 1173 { 1174 return write(socket.handle, buffer.ptr, buffer.length); 1175 } 1176 1177 override sizediff_t doReceive(void[] buffer) 1178 { 1179 return read(socket.handle, buffer.ptr, buffer.length); 1180 } 1181 } 1182 1183 /// Separates reading and writing, e.g. for stdin/stdout. 1184 class Duplex : IConnection 1185 { 1186 /// 1187 IConnection reader, writer; 1188 1189 this(IConnection reader, IConnection writer) 1190 { 1191 this.reader = reader; 1192 this.writer = writer; 1193 reader.handleConnect = &onConnect; 1194 writer.handleConnect = &onConnect; 1195 reader.handleDisconnect = &onDisconnect; 1196 writer.handleDisconnect = &onDisconnect; 1197 } /// 1198 1199 @property ConnectionState state() 1200 { 1201 if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting) 1202 return ConnectionState.disconnecting; 1203 else 1204 return reader.state < writer.state ? reader.state : writer.state; 1205 } /// 1206 1207 /// Queue Data for sending. 1208 void send(scope Data[] data, int priority) 1209 { 1210 writer.send(data, priority); 1211 } 1212 1213 alias send = IConnection.send; /// ditto 1214 1215 /// Terminate the connection. 1216 /// Note: this isn't quite fleshed out - applications may want to 1217 /// wait and send some more data even after stdin is closed, but 1218 /// such an interface can't be fitted into an IConnection 1219 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1220 { 1221 if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting) 1222 reader.disconnect(reason, type); 1223 if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting) 1224 writer.disconnect(reason, type); 1225 debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1226 } 1227 1228 protected void onConnect() 1229 { 1230 if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected) 1231 connectHandler(); 1232 } 1233 1234 protected void onDisconnect(string reason, DisconnectType type) 1235 { 1236 debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1237 if (disconnectHandler) 1238 { 1239 disconnectHandler(reason, type); 1240 disconnectHandler = null; // don't call it twice for the other connection 1241 } 1242 // It is our responsibility to disconnect the other connection 1243 // Use DisconnectType.requested to ensure that any written data is flushed 1244 disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested); 1245 } 1246 1247 /// Callback for when a connection has been established. 1248 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1249 private ConnectHandler connectHandler; 1250 1251 /// Callback setter for when new data is read. 1252 @property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; } 1253 1254 /// Callback setter for when a connection was closed. 1255 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1256 private DisconnectHandler disconnectHandler; 1257 1258 /// Callback setter for when all queued data has been written. 1259 @property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; } 1260 } 1261 1262 unittest { if (false) new Duplex(null, null); } 1263 1264 // *************************************************************************** 1265 1266 /// An asynchronous socket-based connection. 1267 class SocketConnection : StreamConnection 1268 { 1269 protected: 1270 AddressInfo[] addressQueue; 1271 1272 this(Socket conn) 1273 { 1274 super(conn); 1275 } 1276 1277 override sizediff_t doSend(in void[] buffer) 1278 { 1279 return conn.send(buffer); 1280 } 1281 1282 override sizediff_t doReceive(void[] buffer) 1283 { 1284 return conn.receive(buffer); 1285 } 1286 1287 final void tryNextAddress() 1288 { 1289 assert(state == ConnectionState.connecting); 1290 auto addressInfo = addressQueue[0]; 1291 addressQueue = addressQueue[1..$]; 1292 1293 try 1294 { 1295 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1296 conn.blocking = false; 1297 1298 socketManager.register(this); 1299 updateFlags(); 1300 debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString()); 1301 conn.connect(addressInfo.address); 1302 } 1303 catch (SocketException e) 1304 return onError("Connect error: " ~ e.msg); 1305 } 1306 1307 /// Called when an error occurs on the socket. 1308 override void onError(string reason) 1309 { 1310 if (state == ConnectionState.connecting && addressQueue.length) 1311 { 1312 socketManager.unregister(this); 1313 conn.close(); 1314 conn = null; 1315 1316 return tryNextAddress(); 1317 } 1318 1319 super.onError(reason); 1320 } 1321 1322 public: 1323 /// Default constructor 1324 this() 1325 { 1326 debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this); 1327 } 1328 1329 /// Start establishing a connection. 1330 final void connect(AddressInfo[] addresses) 1331 { 1332 assert(addresses.length, "No addresses specified"); 1333 1334 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1335 assert(!conn); 1336 1337 addressQueue = addresses; 1338 state = ConnectionState.connecting; 1339 tryNextAddress(); 1340 } 1341 } 1342 1343 /// An asynchronous TCP connection. 1344 class TcpConnection : SocketConnection 1345 { 1346 protected: 1347 this(Socket conn) 1348 { 1349 super(conn); 1350 } 1351 1352 public: 1353 /// Default constructor 1354 this() 1355 { 1356 debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this); 1357 } 1358 1359 /// 1360 alias connect = SocketConnection.connect; // raise overload 1361 1362 /// Start establishing a connection. 1363 final void connect(string host, ushort port) 1364 { 1365 assert(host.length, "Empty host"); 1366 assert(port, "No port specified"); 1367 1368 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1369 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1370 1371 state = ConnectionState.resolving; 1372 1373 AddressInfo[] addressInfos; 1374 try 1375 { 1376 auto addresses = getAddress(host, port); 1377 enforce(addresses.length, "No addresses found"); 1378 debug (ASOCKETS) 1379 { 1380 stderr.writefln("Resolved to %s addresses:", addresses.length); 1381 foreach (address; addresses) 1382 stderr.writefln("- %s", address.toString()); 1383 } 1384 1385 if (addresses.length > 1) 1386 { 1387 import std.random : randomShuffle; 1388 randomShuffle(addresses); 1389 } 1390 1391 foreach (address; addresses) 1392 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1393 } 1394 catch (SocketException e) 1395 return onError("Lookup error: " ~ e.msg); 1396 1397 state = ConnectionState.disconnected; 1398 connect(addressInfos); 1399 } 1400 } 1401 1402 // *************************************************************************** 1403 1404 /// An asynchronous connection server for socket-based connections. 1405 class SocketServer 1406 { 1407 protected: 1408 /// Class that actually performs listening on a certain address family 1409 final class Listener : GenericSocket 1410 { 1411 this(Socket conn) 1412 { 1413 debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this); 1414 this.conn = conn; 1415 socketManager.register(this); 1416 } 1417 1418 /// Called when a socket is readable. 1419 override void onReadable() 1420 { 1421 debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this); 1422 Socket acceptSocket = conn.accept(); 1423 acceptSocket.blocking = false; 1424 if (handleAccept) 1425 { 1426 auto connection = createConnection(acceptSocket); 1427 debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr); 1428 connection.setKeepAlive(); 1429 //assert(connection.connected); 1430 //connection.connected = true; 1431 acceptHandler(connection); 1432 } 1433 else 1434 acceptSocket.close(); 1435 } 1436 1437 /// Called when a socket is writable. 1438 override void onWritable() 1439 { 1440 } 1441 1442 /// Called when an error occurs on the socket. 1443 override void onError(string reason) 1444 { 1445 close(); // call parent 1446 } 1447 1448 void closeListener() 1449 { 1450 assert(conn); 1451 socketManager.unregister(this); 1452 conn.close(); 1453 conn = null; 1454 } 1455 } 1456 1457 SocketConnection createConnection(Socket socket) 1458 { 1459 return new SocketConnection(socket); 1460 } 1461 1462 /// Whether the socket is listening. 1463 bool listening; 1464 /// Listener instances 1465 Listener[] listeners; 1466 1467 final void updateFlags() 1468 { 1469 foreach (listener; listeners) 1470 listener.notifyRead = handleAccept !is null; 1471 } 1472 1473 public: 1474 /// Start listening on this socket. 1475 final void listen(AddressInfo[] addressInfos) 1476 { 1477 foreach (ref addressInfo; addressInfos) 1478 { 1479 try 1480 { 1481 Socket conn = new Socket(addressInfo); 1482 conn.blocking = false; 1483 if (addressInfo.family == AddressFamily.INET6) 1484 conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); 1485 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 1486 1487 conn.bind(addressInfo.address); 1488 conn.listen(totalCPUs * 2); 1489 1490 listeners ~= new Listener(conn); 1491 } 1492 catch (SocketException e) 1493 { 1494 debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString()); 1495 debug(ASOCKETS) stderr.writeln(e.msg); 1496 } 1497 } 1498 1499 if (listeners.length==0) 1500 throw new Exception("Unable to bind service"); 1501 1502 listening = true; 1503 1504 updateFlags(); 1505 } 1506 1507 this() 1508 { 1509 } /// 1510 1511 /// Creates a Server with the given sockets. 1512 /// The sockets must have already had `bind` and `listen` called on them. 1513 this(Socket[] sockets...) 1514 { 1515 foreach (socket; sockets) 1516 listeners ~= new Listener(socket); 1517 } 1518 1519 /// Returns all listening addresses. 1520 final @property Address[] localAddresses() 1521 { 1522 Address[] result; 1523 foreach (listener; listeners) 1524 result ~= listener.localAddress; 1525 return result; 1526 } 1527 1528 /// Returns `true` if the server is listening for incoming connections. 1529 final @property bool isListening() 1530 { 1531 return listening; 1532 } 1533 1534 /// Stop listening on this socket. 1535 final void close() 1536 { 1537 foreach (listener;listeners) 1538 listener.closeListener(); 1539 listeners = null; 1540 listening = false; 1541 if (handleClose) 1542 handleClose(); 1543 } 1544 1545 /// Create a SocketServer using the handle passed on standard input, 1546 /// for which `listen` had already been called. Used by 1547 /// e.g. FastCGI and systemd sockets with "Listen = yes". 1548 static SocketServer fromStdin() 1549 { 1550 socket_t socket; 1551 version (Windows) 1552 { 1553 import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE; 1554 socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE); 1555 } 1556 else 1557 socket = cast(socket_t)0; 1558 1559 auto s = new Socket(socket, AddressFamily.UNSPEC); 1560 s.blocking = false; 1561 return new SocketServer(s); 1562 } 1563 1564 /// Callback for when the socket was closed. 1565 void delegate() handleClose; 1566 1567 private void delegate(SocketConnection incoming) acceptHandler; 1568 /// Callback for an incoming connection. 1569 /// Connections will not be accepted unless this handler is set. 1570 @property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; } 1571 /// ditto 1572 @property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); } 1573 } 1574 1575 /// An asynchronous TCP connection server. 1576 class TcpServer : SocketServer 1577 { 1578 protected: 1579 override SocketConnection createConnection(Socket socket) 1580 { 1581 return new TcpConnection(socket); 1582 } 1583 1584 public: 1585 this() 1586 { 1587 } /// 1588 1589 this(Socket[] sockets...) 1590 { 1591 super(sockets); 1592 } /// Construct from the given sockets. 1593 1594 /// 1595 alias listen = SocketServer.listen; // raise overload 1596 1597 /// Start listening on this socket. 1598 final ushort listen(ushort port, string addr = null) 1599 { 1600 debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port); 1601 //assert(!listening, "Attempting to listen on a listening socket"); 1602 1603 auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); 1604 1605 debug (ASOCKETS) 1606 { 1607 stderr.writefln("Resolved to %s addresses:", addressInfos.length); 1608 foreach (ref addressInfo; addressInfos) 1609 stderr.writefln("- %s", addressInfo); 1610 } 1611 1612 // listen on random ports only on IPv4 for now 1613 if (port == 0) 1614 { 1615 foreach_reverse (i, ref addressInfo; addressInfos) 1616 if (addressInfo.family != AddressFamily.INET) 1617 addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$]; 1618 } 1619 1620 listen(addressInfos); 1621 1622 foreach (listener; listeners) 1623 { 1624 auto address = listener.conn.localAddress(); 1625 if (address.addressFamily == AddressFamily.INET) 1626 port = to!ushort(address.toPortString()); 1627 } 1628 1629 return port; 1630 } 1631 1632 deprecated("Use SocketServer.fromStdin") 1633 static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; } 1634 1635 /// Delegate to be called when a connection is accepted. 1636 @property final void handleAccept(void delegate(TcpConnection incoming) value) { super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c)); } 1637 } 1638 1639 // *************************************************************************** 1640 1641 /// An asynchronous UDP stream. 1642 /// UDP does not have connections, so this class encapsulates a socket 1643 /// with a fixed destination (sendto) address, and optionally bound to 1644 /// a local address. 1645 /// Currently received packets' address is not exposed. 1646 class UdpConnection : Connection 1647 { 1648 protected: 1649 this(Socket conn) 1650 { 1651 super(conn); 1652 } 1653 1654 /// Called when a socket is writable. 1655 override void onWritable() 1656 { 1657 //scope(success) updateFlags(); 1658 onWritableImpl(); 1659 updateFlags(); 1660 } 1661 1662 // Work around scope(success) breaking debugger stack traces 1663 final private void onWritableImpl() 1664 { 1665 foreach (priority, ref queue; outQueue) 1666 while (queue.length) 1667 { 1668 auto sent = conn.sendTo(queue.front.contents, remoteAddress); 1669 1670 if (sent == Socket.ERROR) 1671 { 1672 if (wouldHaveBlocked()) 1673 return; 1674 else 1675 return onError("send() error: " ~ lastSocketError); 1676 } 1677 else 1678 if (sent < queue.front.length) 1679 { 1680 return onError("Sent only %d/%d bytes of the datagram!".format(sent, queue.front.length)); 1681 } 1682 else 1683 { 1684 assert(sent == queue.front.length); 1685 //debug writefln("[%s] Sent data:", remoteAddressStr); 1686 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1687 queue.front.clear(); 1688 queue.popFront(); 1689 if (queue.length == 0) 1690 queue = null; 1691 } 1692 } 1693 1694 // outQueue is now empty 1695 if (bufferFlushedHandler) 1696 bufferFlushedHandler(); 1697 if (state == ConnectionState.disconnecting) 1698 { 1699 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1700 close(); 1701 } 1702 } 1703 1704 override sizediff_t doSend(in void[] buffer) 1705 { 1706 assert(false); // never called (called only from overridden methods) 1707 } 1708 1709 override sizediff_t doReceive(void[] buffer) 1710 { 1711 return conn.receive(buffer); 1712 } 1713 1714 public: 1715 /// Default constructor 1716 this() 1717 { 1718 debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this); 1719 } 1720 1721 /// Initialize with the given `AddressFamily`, without binding to an address. 1722 final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP) 1723 { 1724 initializeImpl(family, type, protocol); 1725 if (connectHandler) 1726 connectHandler(); 1727 } 1728 1729 private final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol) 1730 { 1731 assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state)); 1732 assert(!conn); 1733 1734 conn = new Socket(family, type, protocol); 1735 conn.blocking = false; 1736 socketManager.register(this); 1737 state = ConnectionState.connected; 1738 updateFlags(); 1739 } 1740 1741 /// Bind to a local address in order to receive packets sent there. 1742 final ushort bind(string host, ushort port) 1743 { 1744 assert(host.length, "Empty host"); 1745 1746 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1747 1748 state = ConnectionState.resolving; 1749 1750 AddressInfo addressInfo; 1751 try 1752 { 1753 auto addresses = getAddress(host, port); 1754 enforce(addresses.length, "No addresses found"); 1755 debug (ASOCKETS) 1756 { 1757 stderr.writefln("Resolved to %s addresses:", addresses.length); 1758 foreach (address; addresses) 1759 stderr.writefln("- %s", address.toString()); 1760 } 1761 1762 Address address; 1763 if (addresses.length > 1) 1764 { 1765 import std.random : uniform; 1766 address = addresses[uniform(0, $)]; 1767 } 1768 else 1769 address = addresses[0]; 1770 addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host); 1771 } 1772 catch (SocketException e) 1773 { 1774 onError("Lookup error: " ~ e.msg); 1775 return 0; 1776 } 1777 1778 state = ConnectionState.disconnected; 1779 return bind(addressInfo); 1780 } 1781 1782 /// ditto 1783 final ushort bind(AddressInfo addressInfo) 1784 { 1785 initialize(addressInfo.family, addressInfo.type, addressInfo.protocol); 1786 conn.bind(addressInfo.address); 1787 1788 auto address = conn.localAddress(); 1789 auto port = to!ushort(address.toPortString()); 1790 1791 if (connectHandler) 1792 connectHandler(); 1793 1794 return port; 1795 } 1796 1797 // public: 1798 /// Where to send packets to. 1799 Address remoteAddress; 1800 } 1801 1802 /// 1803 unittest 1804 { 1805 auto server = new UdpConnection(); 1806 server.bind("localhost", 0); 1807 1808 auto client = new UdpConnection(); 1809 client.initialize(server.localAddress.addressFamily); 1810 1811 string[] packets = ["Hello", "there"]; 1812 client.remoteAddress = server.localAddress; 1813 client.send({ 1814 DataVec data; 1815 foreach (packet; packets) 1816 data ~= Data(packet); 1817 return data; 1818 }()[]); 1819 1820 server.handleReadData = (Data data) 1821 { 1822 assert(data.contents == packets[0]); 1823 packets = packets[1..$]; 1824 if (!packets.length) 1825 { 1826 server.close(); 1827 client.close(); 1828 } 1829 }; 1830 socketManager.loop(); 1831 assert(!packets.length); 1832 } 1833 1834 // *************************************************************************** 1835 1836 /// Base class for a connection adapter. 1837 /// By itself, does nothing. 1838 class ConnectionAdapter : IConnection 1839 { 1840 /// The next connection in the chain (towards the raw transport). 1841 IConnection next; 1842 1843 this(IConnection next) 1844 { 1845 this.next = next; 1846 next.handleConnect = &onConnect; 1847 next.handleDisconnect = &onDisconnect; 1848 next.handleBufferFlushed = &onBufferFlushed; 1849 } /// 1850 1851 @property ConnectionState state() { return next.state; } /// 1852 1853 /// Queue Data for sending. 1854 void send(scope Data[] data, int priority) 1855 { 1856 next.send(data, priority); 1857 } 1858 1859 alias send = IConnection.send; /// ditto 1860 1861 /// Terminate the connection. 1862 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1863 { 1864 next.disconnect(reason, type); 1865 } 1866 1867 protected void onConnect() 1868 { 1869 if (connectHandler) 1870 connectHandler(); 1871 } 1872 1873 protected void onReadData(Data data) 1874 { 1875 // onReadData should be fired only if readDataHandler is set 1876 assert(readDataHandler, "onReadData caled with null readDataHandler"); 1877 readDataHandler(data); 1878 } 1879 1880 protected void onDisconnect(string reason, DisconnectType type) 1881 { 1882 if (disconnectHandler) 1883 disconnectHandler(reason, type); 1884 } 1885 1886 protected void onBufferFlushed() 1887 { 1888 if (bufferFlushedHandler) 1889 bufferFlushedHandler(); 1890 } 1891 1892 /// Callback for when a connection has been established. 1893 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1894 protected ConnectHandler connectHandler; 1895 1896 /// Callback setter for when new data is read. 1897 @property void handleReadData(ReadDataHandler value) 1898 { 1899 readDataHandler = value; 1900 next.handleReadData = value ? &onReadData : null ; 1901 } 1902 protected ReadDataHandler readDataHandler; 1903 1904 /// Callback setter for when a connection was closed. 1905 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1906 protected DisconnectHandler disconnectHandler; 1907 1908 /// Callback setter for when all queued data has been written. 1909 @property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; } 1910 protected BufferFlushedHandler bufferFlushedHandler; 1911 } 1912 1913 // *************************************************************************** 1914 1915 /// Adapter for connections with a line-based protocol. 1916 /// Splits data stream into delimiter-separated lines. 1917 class LineBufferedAdapter : ConnectionAdapter 1918 { 1919 /// The protocol's line delimiter. 1920 string delimiter = "\r\n"; 1921 1922 /// Maximum line length (0 means unlimited). 1923 size_t maxLength = 0; 1924 1925 this(IConnection next) 1926 { 1927 super(next); 1928 } /// 1929 1930 /// Append a line to the send buffer. 1931 void send(string line) 1932 { 1933 //super.send(Data(line ~ delimiter)); 1934 // https://issues.dlang.org/show_bug.cgi?id=13985 1935 ConnectionAdapter ca = this; 1936 ca.send(Data(line ~ delimiter)); 1937 } 1938 1939 protected: 1940 /// The receive buffer. 1941 Data inBuffer; 1942 1943 /// Called when data has been received. 1944 final override void onReadData(Data data) 1945 { 1946 import std.string : indexOf; 1947 auto oldBufferLength = inBuffer.length; 1948 if (oldBufferLength) 1949 inBuffer ~= data; 1950 else 1951 inBuffer = data; 1952 1953 if (delimiter.length == 1) 1954 { 1955 import core.stdc.string : memchr; 1956 1957 char c = delimiter[0]; 1958 auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length); 1959 while (p) 1960 { 1961 sizediff_t index = p - inBuffer.ptr; 1962 if (!processLine(index)) 1963 break; 1964 1965 p = memchr(inBuffer.ptr, c, inBuffer.length); 1966 } 1967 } 1968 else 1969 { 1970 sizediff_t index; 1971 // TODO: we can start the search at oldBufferLength-delimiter.length+1 1972 while ((index = indexOf(cast(string)inBuffer.contents, delimiter)) >= 0 1973 && processLine(index)) 1974 {} 1975 } 1976 1977 if (maxLength && inBuffer.length > maxLength) 1978 disconnect("Line too long", DisconnectType.error); 1979 } 1980 1981 final bool processLine(size_t index) 1982 { 1983 if (maxLength && index > maxLength) 1984 { 1985 disconnect("Line too long", DisconnectType.error); 1986 return false; 1987 } 1988 auto line = inBuffer[0..index]; 1989 inBuffer = inBuffer[index+delimiter.length..inBuffer.length]; 1990 super.onReadData(line); 1991 return true; 1992 } 1993 1994 override void onDisconnect(string reason, DisconnectType type) 1995 { 1996 super.onDisconnect(reason, type); 1997 inBuffer.clear(); 1998 } 1999 } 2000 2001 // *************************************************************************** 2002 2003 /// Fires an event handler or disconnects connections 2004 /// after a period of inactivity. 2005 class TimeoutAdapter : ConnectionAdapter 2006 { 2007 this(IConnection next) 2008 { 2009 debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this); 2010 super(next); 2011 } /// 2012 2013 /// Set the `Duration` indicating the period of inactivity after which to take action. 2014 final void setIdleTimeout(Duration duration) 2015 { 2016 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this); 2017 assert(duration > Duration.zero); 2018 2019 // Configure idleTask 2020 if (idleTask is null) 2021 { 2022 idleTask = new TimerTask(duration); 2023 idleTask.handleTask = &onTask_Idle; 2024 } 2025 else 2026 { 2027 if (idleTask.isWaiting()) 2028 idleTask.cancel(); 2029 idleTask.delay = duration; 2030 } 2031 2032 mainTimer.add(idleTask); 2033 } 2034 2035 /// Manually mark this connection as non-idle, restarting the idle timer. 2036 /// `handleNonIdle` will be called, if set. 2037 void markNonIdle() 2038 { 2039 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this); 2040 if (handleNonIdle) 2041 handleNonIdle(); 2042 if (idleTask && idleTask.isWaiting()) 2043 idleTask.restart(); 2044 } 2045 2046 /// Stop the idle timer. 2047 void cancelIdleTimeout() 2048 { 2049 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this); 2050 assert(idleTask !is null); 2051 assert(idleTask.isWaiting()); 2052 idleTask.cancel(); 2053 } 2054 2055 /// Restart the idle timer. 2056 void resumeIdleTimeout() 2057 { 2058 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this); 2059 assert(idleTask !is null); 2060 assert(!idleTask.isWaiting()); 2061 mainTimer.add(idleTask); 2062 } 2063 2064 /// Callback for when a connection has stopped responding. 2065 /// If unset, the connection will be disconnected. 2066 void delegate() handleIdleTimeout; 2067 2068 /// Callback for when a connection is marked as non-idle 2069 /// (when data is received). 2070 void delegate() handleNonIdle; 2071 2072 protected: 2073 override void onConnect() 2074 { 2075 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this); 2076 markNonIdle(); 2077 super.onConnect(); 2078 } 2079 2080 override void onReadData(Data data) 2081 { 2082 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this); 2083 markNonIdle(); 2084 super.onReadData(data); 2085 } 2086 2087 override void onDisconnect(string reason, DisconnectType type) 2088 { 2089 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this); 2090 if (idleTask && idleTask.isWaiting()) 2091 idleTask.cancel(); 2092 super.onDisconnect(reason, type); 2093 } 2094 2095 private: 2096 TimerTask idleTask; // non-null if an idle timeout has been set 2097 2098 final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/) 2099 { 2100 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this); 2101 if (state == ConnectionState.disconnecting) 2102 return disconnect("Delayed disconnect - time-out", DisconnectType.error); 2103 2104 if (state == ConnectionState.disconnected) 2105 return; 2106 2107 if (handleIdleTimeout) 2108 { 2109 resumeIdleTimeout(); // reschedule (by default) 2110 handleIdleTimeout(); 2111 } 2112 else 2113 disconnect("Time-out", DisconnectType.error); 2114 } 2115 } 2116 2117 // *************************************************************************** 2118 2119 unittest 2120 { 2121 void testTimer() 2122 { 2123 bool fired; 2124 setTimeout({fired = true;}, 10.msecs); 2125 socketManager.loop(); 2126 assert(fired); 2127 } 2128 2129 testTimer(); 2130 }