/**
 * Asynchronous socket abstraction.
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Stéphan Kochen <stephan@kochen.nl>
 *   Vladimir Panteleev <ae@cy.md>
 *   Vincent Povirk <madewokherd@gmail.com>
 *   Simon Arlott
 */

module ae.net.asockets;

import ae.sys.timing;
import ae.utils.math;
public import ae.sys.data;

import core.stdc.stdint : int32_t;

import std.exception;
import std.socket;
import std.string : format;
public import std.socket : Address, AddressInfo, Socket;

version (Windows)
	private import c_socks = core.sys.windows.winsock2;
else version (Posix)
	private import c_socks = core.sys.posix.sys.socket;

debug(ASOCKETS) import std.stdio : stderr;
debug(PRINTDATA) import std.stdio : stderr;
debug(PRINTDATA) import ae.utils.text : hexDump;
private import std.conv : to;


// http://d.puremagic.com/issues/show_bug.cgi?id=7016
static import ae.utils.array;

version(LIBEV)
{
	import deimos.ev;
	pragma(lib, "ev");
}

version(Windows)
{
	import core.sys.windows.windows : Sleep;
	enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions
}
else
	enum USE_SLEEP = false;

/// Counter bumped by the event loop implementations in this module
/// each time they process a wake-up (I/O callback, timer callback,
/// or one completed pass of the `select` loop).
int eventCounter;

version(LIBEV)
{
	// Watchers are a GenericSocket field (as declared in SocketMixin).
	// Use one watcher per read and write event.
	// Start/stop those as higher-level code declares interest in those events.
	// Use the "data" ev_io field to store the parent GenericSocket address.
	// Also use the "data" field as a flag to indicate whether the watcher is active
	// (data is null when the watcher is stopped).

	/// `libev`-based event loop implementation.
struct SocketManager
{
private:
	/// Number of sockets currently registered (see `register` / `unregister`).
	size_t count;

	/// Callback shared by every read and write `ev_io` watcher.
	/// The watcher's `data` field holds the owning `GenericSocket`
	/// (it is null while the watcher is stopped).
	extern(C)
	static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
	{
		eventCounter++;
		auto socket = cast(GenericSocket)w.data;
		assert(socket, "libev event fired on stopped watcher");
		// Use stderr like the rest of the module; only `stderr` is imported for debug(ASOCKETS).
		debug (ASOCKETS) stderr.writefln("ioCallback(%s, 0x%X)", socket, revents);

		if (revents & EV_READ)
			socket.onReadable();
		else
		if (revents & EV_WRITE)
			socket.onWritable();
		else
			assert(false, "Unknown event fired from libev");

		// TODO? Need to get proper SocketManager instance to call updateTimer on
		socketManager.updateTimer(false);
	}

	/// The single libev timer used to drive `mainTimer`.
	ev_timer evTimer;
	/// The `mainTimer` deadline the libev timer was last scheduled for
	/// (`MonoTime.max` while the libev timer is not running).
	MonoTime lastNextEvent = MonoTime.max;

	/// Callback of `evTimer`: fires due `mainTimer` tasks, then reschedules.
	extern(C)
	static void timerCallback(ev_loop_t* l, ev_timer* w, int /*revents*/)
	{
		eventCounter++;
		debug (ASOCKETS) stderr.writefln("Timer callback called.");
		mainTimer.prod();

		socketManager.updateTimer(true);
		debug (ASOCKETS) stderr.writefln("Timer callback exiting.");
	}

	/// Synchronize the libev timer with the next `mainTimer` event.
	/// Params:
	///   force = reschedule even if the next event time did not change
	void updateTimer(bool force)
	{
		auto nextEvent = mainTimer.getNextEvent();
		if (force || lastNextEvent != nextEvent)
		{
			debug (ASOCKETS) stderr.writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
			if (nextEvent == MonoTime.max) // Stopping
			{
				// Only stop the watcher if it was actually started.
				if (lastNextEvent != MonoTime.max)
					ev_timer_stop(ev_default_loop(0), &evTimer);
			}
			else
			{
				// Fire anything that is already due, so that the
				// delay handed to libev is strictly positive.
				auto remaining = mainTimer.getRemainingTime();
				while (remaining <= Duration.zero)
				{
					debug (ASOCKETS) stderr.writefln("remaining=%s, prodding timer.", remaining);
					mainTimer.prod();
					remaining = mainTimer.getRemainingTime();
				}
				// Convert the Duration to fractional seconds for libev.
				ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
				debug (ASOCKETS) stderr.writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
				if (lastNextEvent == MonoTime.max) // Starting
				{
					ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
					ev_timer_start(ev_default_loop(0), &evTimer);
				}
				else // Adjusting
				{
					evTimer.repeat = tstamp;
					ev_timer_again(ev_default_loop(0), &evTimer);
				}
			}
			lastNextEvent = nextEvent;
		}
	}

public:
	/// Register a socket with the manager.
	/// Initializes (but does not start) the socket's read and write watchers.
	void register(GenericSocket socket)
	{
		debug (ASOCKETS) stderr.writefln("Registering %s", socket);
		debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
		auto fd = socket.conn.handle;
		// Compare against the invalid-handle sentinel rather than zero:
		// descriptor 0 (e.g. stdin via FileConnection) is a valid handle.
		assert(fd != socket_t.init, "Must have fd before socket registration");
		ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
		ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
		count++;
	}

	/// Unregister a socket with the manager.
	/// Stops both watchers by clearing the socket's notification flags.
	void unregister(GenericSocket socket)
	{
		debug (ASOCKETS) stderr.writefln("Unregistering %s", socket);
		socket.notifyRead  = false;
		socket.notifyWrite = false;
		count--;
	}

	/// Returns the number of registered sockets.
	size_t size()
	{
		return count;
	}

	/// Loop continuously until no sockets are left.
void loop()
{
	auto evLoop = ev_default_loop(0);
	enforce(evLoop, "libev initialization failure");

	updateTimer(true);
	// Only `stderr` is imported for debug(ASOCKETS), so write through it.
	debug (ASOCKETS) stderr.writeln("ev_run");
	// Run the loop that was just checked above.
	ev_run(evLoop, 0);
}
} // end of libev-based SocketManager

private mixin template SocketMixin()
{
	/// libev watchers for read and write readiness.
	/// A watcher's `data` field is non-null exactly while it is started.
	private ev_io evRead, evWrite;

	/// Start or stop the given watcher so that it matches `newValue`.
	/// Does nothing if the watcher is already in the requested state.
	private void setWatcherState(ref ev_io ev, bool newValue, int /*event*/)
	{
		if (!conn)
		{
			// Can happen when setting delegates before connecting.
			return;
		}

		if (newValue && !ev.data)
		{
			// Start
			ev.data = cast(void*)this;
			ev_io_start(ev_default_loop(0), &ev);
		}
		else
		if (!newValue && ev.data)
		{
			// Stop
			assert(ev.data is cast(void*)this);
			ev.data = null;
			ev_io_stop(ev_default_loop(0), &ev);
		}
	}

	/// Interested in read notifications (onReadable)?
	@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
	/// Interested in write notifications (onWritable)?
	@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }

	debug ~this()
	{
		// The LIBEV SocketManager holds no references to registered sockets.
		// TODO: Add a doubly-linked list?
		assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
	}
}
} // end of version(LIBEV)
else // Use select
{
	/// `select`-based event loop implementation.
	struct SocketManager
	{
	private:
		/// Initial capacity used for the `SocketSet`s created in `loop`.
		enum FD_SETSIZE = 1024;

		/// List of all sockets to poll.
		GenericSocket[] sockets;

		/// Debug AA to check for dangling socket references.
		debug GenericSocket[socket_t] socketHandles;

		/// Handlers queued by `onNextTick` and `addIdleHandler`, respectively.
		void delegate()[] nextTickHandlers, idleHandlers;

	public:
		/// Register a socket with the manager.
void register(GenericSocket conn)
{
	debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
	assert(!conn.socket.blocking, "Trying to register a blocking socket");
	sockets ~= conn;

	debug
	{
		// Track the handle so that stale or duplicate registrations are caught early.
		auto handle = conn.socket.handle;
		assert(handle != socket_t.init, "Can't register a closed socket");
		assert(handle !in socketHandles, "This socket handle is already registered");
		socketHandles[handle] = conn;
	}
}

/// Unregister a socket with the manager.
/// The socket must currently be registered and still open.
void unregister(GenericSocket conn)
{
	debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);

	debug
	{
		auto handle = conn.socket.handle;
		assert(handle != socket_t.init, "Can't unregister a closed socket");
		auto pconn = handle in socketHandles;
		assert(pconn, "This socket handle is not registered");
		assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket");
		socketHandles.remove(handle);
	}

	foreach (size_t i, GenericSocket j; sockets)
		if (j is conn)
		{
			// Build a new array instead of removing in place, so that
			// an in-progress iteration over the old slice is unaffected.
			sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length];
			return;
		}
	assert(false, "Socket not registered");
}

/// Returns the number of registered sockets.
size_t size()
{
	return sockets.length;
}

/// Loop continuously until no sockets are left.
void loop()
{
	debug (ASOCKETS) stderr.writeln("Starting event loop.");

	SocketSet readset, writeset;
	size_t sockcount;
	uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
	readset  = new SocketSet(setSize);
	writeset = new SocketSet(setSize);
	while (true)
	{
		// Run queued next-tick handlers first. Handlers queued while
		// these run are picked up on the following iteration.
		if (nextTickHandlers.length)
		{
			auto thisTickHandlers = nextTickHandlers;
			nextTickHandlers = null;

			foreach (handler; thisTickHandlers)
				handler();

			continue;
		}

		// Work out how large the socket sets need to be.
		uint minSize = 0;
		version(Windows)
			minSize = cast(uint)sockets.length;
		else
		{
			foreach (s; sockets)
				if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
					minSize = s.socket.handle;
		}
		minSize++;

		if (setSize < minSize)
		{
			debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
			setSize = minSize * 2;
			readset  = new SocketSet(setSize);
			writeset = new SocketSet(setSize);
		}
		else
		{
			readset.reset();
			writeset.reset();
		}

		// Populate the sets from the sockets' notification flags.
		// `haveActive` becomes true if any non-daemon interest exists.
		sockcount = 0;
		bool haveActive;
		debug (ASOCKETS) stderr.writeln("Populating sets");
		foreach (GenericSocket conn; sockets)
		{
			if (!conn.socket)
				continue;
			sockcount++;

			debug (ASOCKETS) stderr.writef("\t%s:", conn);
			if (conn.notifyRead)
			{
				readset.add(conn.socket);
				if (!conn.daemonRead)
					haveActive = true;
				debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : "");
			}
			if (conn.notifyWrite)
			{
				writeset.add(conn.socket);
				if (!conn.daemonWrite)
					haveActive = true;
				debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : "");
			}
			debug (ASOCKETS) stderr.writeln();
		}
		debug (ASOCKETS)
		{
			stderr.writefln("Sets populated as follows:");
			printSets(readset, writeset);
		}

		debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...",
			haveActive ? "" : "not ",
			sockcount,
			mainTimer.isWaiting() ? "with" : "no",
			idleHandlers.length,
		);
		if (!haveActive && !mainTimer.isWaiting() && !nextTickHandlers.length)
		{
			debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
			break;
		}

		debug (ASOCKETS) stderr.flush();

		// Wait for events. With idle handlers pending we only poll;
		// otherwise we block until a socket event or the next timer deadline.
		int events;
		if (idleHandlers.length)
		{
			if (sockcount==0)
				events = 0;
			else
				events = Socket.select(readset, writeset, null, 0.seconds);
		}
		else
		if (USE_SLEEP && sockcount==0)
		{
			version(Windows)
			{
				auto duration = mainTimer.getRemainingTime().total!"msecs"();
				// Only `stderr` is imported for debug(ASOCKETS), so write through it.
				debug (ASOCKETS) stderr.writeln("Wait duration: ", duration, " msecs");
				if (duration <= 0)
					duration = 1; // Avoid busywait
				else
				if (duration > int.max)
					duration = int.max;
				Sleep(cast(int)duration);
				events = 0;
			}
			else
				assert(0);
		}
		else
		if (mainTimer.isWaiting())
			events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime());
		else
			events = Socket.select(readset, writeset, null);

		debug (ASOCKETS) stderr.writefln("%d events fired.", events);

		if (events > 0)
		{
			// Handle just one event at a time, as the first
			// handler might invalidate select()'s results.
			handleEvent(readset, writeset);
		}
		else
		if (idleHandlers.length)
		{
			import ae.utils.array : shift;
			auto handler = idleHandlers.shift();

			// Rotate the idle handler queue before running it,
			// in case the handler unregisters itself.
			idleHandlers ~= handler;

			handler();
		}

		// Timers may invalidate our select results, so fire them after processing the latter
		mainTimer.prod();

		eventCounter++;
	}
}

/// Debug helper: print which registered sockets are set in the given sets.
debug (ASOCKETS)
void printSets(SocketSet readset, SocketSet writeset)
{
	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			stderr.writefln("\t\t%s is unset", conn);
		else
		{
			if (readset.isSet(conn.socket))
				stderr.writefln("\t\t%s is readable", conn);
			if (writeset.isSet(conn.socket))
				stderr.writefln("\t\t%s is writable", conn);
		}
	}
}

/// Dispatch exactly one event: the first registered socket found in
/// `readset` (or else `writeset`) gets its handler called.
void handleEvent(SocketSet readset, SocketSet writeset)
{
	debug (ASOCKETS)
	{
		stderr.writefln("\tSelect results:");
		printSets(readset, writeset);
	}

	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			continue;

		if (readset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
			return conn.onReadable();
		}
		else
		if (writeset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
			return conn.onWritable();
		}
	}

	assert(false, "select() reported events available, but no registered sockets are set");
}
} // end of select-based SocketManager

/// Schedule a function to run on the next event loop iteration.
/// Can be used to queue logic to run once all current execution frames exit.
/// Similar to e.g. process.nextTick in Node.
void onNextTick(ref SocketManager socketManager, void delegate() dg) pure @safe nothrow
{
	socketManager.nextTickHandlers ~= dg;
}

// Use UFCS to allow removeIdleHandler to have a predicate with context
/// Register a function to be called when the event loop is idle,
/// and would otherwise sleep.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
	// Registering the same delegate twice is a programming error.
	foreach (idleHandler; socketManager.idleHandlers)
		assert(handler !is idleHandler);

	socketManager.idleHandlers ~= handler;
}

/// Default predicate for `removeIdleHandler`: identity comparison.
static bool isFun(T)(T a, T b) { return a is b; }

/// Unregister a function previously registered with `addIdleHandler`.
/// Removes the first registered handler for which `pred(handler, args)`
/// is true; it is an error if there is none.
void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
	foreach (i, idleHandler; socketManager.idleHandlers)
		if (pred(idleHandler, args))
		{
			import std.algorithm : remove;
			socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
			return;
		}
	assert(false, "No such idle handler");
}

private mixin template SocketMixin()
{
	/// Interested in read notifications (onReadable)?
	bool notifyRead;
	/// Interested in write notifications (onWritable)?
	bool notifyWrite;
}
} // end of `else` (select) branch of version(LIBEV)

/// The default socket manager.
SocketManager socketManager;

// ***************************************************************************

/// General methods for an asynchronous socket.
abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

// protected:
	/// Retrieve the socket class this class wraps.
@property final Socket socket()
{
	return conn;
}

/// Called by the socket manager when the socket is readable.
/// Does nothing by default.
void onReadable()
{
}

/// Called by the socket manager when the socket is writable.
/// Does nothing by default.
void onWritable()
{
}

/// Error hook; does nothing by default.
void onError(string /*reason*/)
{
}

public:
/// allow getting the address of connections that are already disconnected
private Address[2] cachedAddress;

/// Returns the local (`local == true`) or remote address of the socket.
/// The result is cached, so it stays available after `conn` is cleared.
/// Returns null if no address was cached and there is no socket.
/*private*/ final @property Address address(bool local)()
{
	if (cachedAddress[local] !is null)
		return cachedAddress[local];
	else
	if (conn is null)
		return null;
	else
	{
		Address a;
		if (conn.addressFamily == AddressFamily.UNSPEC)
		{
			// Socket will attempt to construct an UnknownAddress,
			// which will almost certainly not match the real address length.
			static if (local)
				alias getname = c_socks.getsockname;
			else
				alias getname = c_socks.getpeername;

			// First call: query the required address length only.
			c_socks.socklen_t nameLen = 0;
			if (getname(conn.handle, null, &nameLen) < 0)
				throw new SocketOSException("Unable to obtain socket address");

			// Second call: fetch the address into a buffer of that length.
			auto buf = new ubyte[nameLen];
			auto sa = cast(c_socks.sockaddr*)buf.ptr;
			if (getname(conn.handle, sa, &nameLen) < 0)
				throw new SocketOSException("Unable to obtain socket address");
			a = new UnknownAddressReference(sa, nameLen);
		}
		else
			a = local ? conn.localAddress() : conn.remoteAddress();
		return cachedAddress[local] = a;
	}
}

alias localAddress = address!true;
alias remoteAddress = address!false;

/// Returns the local or remote address formatted as `host:port`
/// (hosts containing a colon are wrapped in square brackets).
/// Never throws: failures are reported within the returned string.
/*private*/ final @property string addressStr(bool local)() nothrow
{
	try
	{
		auto a = address!local;
		if (a is null)
			return "[null address]";
		string host = a.toAddrString();
		import std.string : indexOf;
		if (host.indexOf(':') >= 0)
			host = "[" ~ host ~ "]";
		try
		{
			string port = a.toPortString();
			return host ~ ":" ~ port;
		}
		catch (Exception e)
			return host; // no usable port - return the host alone
	}
	catch (Exception e)
		return "[error: " ~ e.msg ~ "]";
}

alias localAddressStr = addressStr!true;
alias remoteAddressStr = addressStr!false;

/// Don't block the process from exiting, even if the socket is ready to receive data.
/// TODO: Not implemented with libev
bool daemonRead;

/// Don't block the process from exiting, even if the socket is ready to send data.
/// TODO: Not implemented with libev
bool daemonWrite;

deprecated alias daemon = daemonRead;

/// Enable TCP keep-alive on the socket with the given settings.
/// If setting `time` / `interval` fails with a `SocketException`,
/// falls back to just enabling the KEEPALIVE option.
final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
{
	assert(conn, "Attempting to set keep-alive on an uninitialized socket");
	if (enabled)
	{
		try
			conn.setKeepAlive(time, interval);
		catch (SocketException)
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
	}
	else
		conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
}

/// Returns a string containing the class name, address, and file descriptor.
override string toString() const
{
	import std.string : format, split;
	return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
}
} // end of GenericSocket

// ***************************************************************************

/// Classifies the cause of the disconnect.
/// Can be used to decide e.g. when it makes sense to reconnect.
enum DisconnectType
{
	requested, /// Initiated by the application.
	graceful,  /// The peer gracefully closed the connection.
	error      /// Some abnormal network condition.
}

/// Used to indicate the state of a connection throughout its lifecycle.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}

/// Returns true if this is a connection state for which disconnecting is valid.
/// Generally, applications should be aware of the life cycle of their sockets,
/// so checking the state of a connection is unnecessary (and a code smell).
/// However, unconditionally disconnecting some connected sockets can be useful
/// when it needs to occur "out-of-bound" (not tied to the application normal life cycle),
/// such as in response to a signal.
bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; }

/// Common interface for connections and adapters.
interface IConnection
{
	/// `send` queues data for sending in one of five queues, indexed
	/// by a numeric priority.
	/// `MAX_PRIORITY` is the highest (least urgent) priority index.
/// `DEFAULT_PRIORITY` is the default priority
enum MAX_PRIORITY = 4;
enum DEFAULT_PRIORITY = 2; /// ditto

/// This is the default value for the `disconnect` `reason` string parameter.
static const defaultDisconnectReason = "Software closed the connection";

/// Get connection state.
@property ConnectionState state();

/// Has a connection been established?
deprecated final @property bool connected() { return state == ConnectionState.connected; }

/// Are we in the process of disconnecting? (Waiting for data to be flushed)
deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

/// Queue Data for sending.
void send(Data[] data, int priority = DEFAULT_PRIORITY);

/// ditto
final void send(Data datum, int priority = DEFAULT_PRIORITY)
{
	// Wrap the single datum in a stack array to reuse the array overload.
	Data[1] data;
	data[0] = datum;
	this.send(data, priority);
	// Reset the temporary slot once the call has returned.
	data[] = Data.init;
}

/// Terminate the connection.
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

/// Callback setter for when a connection has been established
/// (if applicable).
alias ConnectHandler = void delegate();
@property void handleConnect(ConnectHandler value); /// ditto

/// Callback setter for when new data is read.
alias ReadDataHandler = void delegate(Data data);
@property void handleReadData(ReadDataHandler value); /// ditto

/// Callback setter for when a connection was closed.
alias DisconnectHandler = void delegate(string reason, DisconnectType type);
@property void handleDisconnect(DisconnectHandler value); /// ditto

/// Callback setter for when all queued data has been sent.
alias BufferFlushedHandler = void delegate();
@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
} // end of IConnection

// ***************************************************************************

/// Implementation of `IConnection` using a socket.
/// Implements receiving data when readable and sending queued data
/// when writable.
class Connection : GenericSocket, IConnection
{
private:
	/// Blocks of data larger than this value are passed as unmanaged memory
	/// (in Data objects). Blocks smaller than this value will be reallocated
	/// on the managed heap. The disadvantage of placing large objects on the
	/// managed heap is false pointers; the disadvantage of using Data for
	/// small objects is wasted slack space due to the page size alignment
	/// requirement.
	enum UNMANAGED_THRESHOLD = 256;

	ConnectionState _state;
	/// Private state setter; returns the new state.
	final @property ConnectionState state(ConnectionState value) { return _state = value; }

public:
	/// Get connection state.
	override @property ConnectionState state() { return _state; }

protected:
	/// Low-level send primitive, implemented by subclasses.
	/// The result is compared against `Socket.ERROR` by callers in this module.
	abstract sizediff_t doSend(in void[] buffer);
	/// Low-level receive primitive, implemented by subclasses.
	/// `onReadable` treats 0 as a closed connection and `Socket.ERROR` as failure.
	abstract sizediff_t doReceive(void[] buffer);

	/// The send buffers.
	Data[][MAX_PRIORITY+1] outQueue;
	/// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled).
	int partiallySent = -1;

	/// Constructor used by a ServerSocket for new connections
	this(Socket conn)
	{
		this();
		this.conn = conn;
		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
		if (conn)
			socketManager.register(this);
		updateFlags();
	}

	/// Recompute `notifyRead` / `notifyWrite` from the current state,
	/// the pending output and whether a read handler is set.
	final void updateFlags()
	{
		if (state == ConnectionState.connecting)
			notifyWrite = true;
		else
			notifyWrite = writePending;

		notifyRead = state == ConnectionState.connected && readDataHandler;
		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
	}

	/// Called when a socket is readable.
	override void onReadable()
	{
		// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
		static ubyte[0x10000] inBuffer = void;
		auto received = doReceive(inBuffer);

		if (received == 0)
			return disconnect("Connection closed", DisconnectType.graceful);

		if (received == Socket.ERROR)
		{
		//	if (wouldHaveBlocked)
		//	{
		//		debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this);
		//		return;
		//	}
		//	else
				onError("recv() error: " ~ lastSocketError);
		}
		else
		{
			debug (PRINTDATA)
			{
				stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr);
				stderr.write(hexDump(inBuffer[0 .. received]));
				stderr.flush();
			}

			if (state == ConnectionState.disconnecting)
			{
				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
			}
			else
			if (!readDataHandler)
			{
				debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
			}
			else
			{
				// Currently, unlike the D1 version of this module,
				// we will always reallocate read network data.
				// This disfavours code which doesn't need to store
				// read data after processing it, but otherwise
				// makes things simpler and safer all around.

				if (received < UNMANAGED_THRESHOLD)
				{
					// Copy to the managed heap
					readDataHandler(Data(inBuffer[0 .. received].dup));
				}
				else
				{
					// Copy to unmanaged memory
					readDataHandler(Data(inBuffer[0 .. received], true));
				}
			}
		}
	}

	/// Called when an error occurs on the socket.
	override void onError(string reason)
	{
		if (state == ConnectionState.disconnecting)
		{
			// Pass the arguments to writefln directly: pre-formatting the
			// message and then using it as a format string would misinterpret
			// any '%' contained in `reason`.
			debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s", cast(void*)this, reason);
			return close();
		}

		assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
		disconnect("Socket error: " ~ reason, DisconnectType.error);
	}

	this()
	{
	}

public:
	/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
	/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
		assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));

		if (writePending)
		{
			if (type==DisconnectType.requested)
			{
				assert(conn, "Attempting to disconnect on an uninitialized socket");
				// queue disconnect after all data is sent
				debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
				state = ConnectionState.disconnecting;
				//setIdleTimeout(30.seconds);
				if (disconnectHandler)
					disconnectHandler(reason, type);
				updateFlags();
				return;
			}
			else
				discardQueues();
		}

		debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

		if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
			close();
		else
		{
			assert(conn is null, "Registered but %s socket".format(state));
			if (state == ConnectionState.resolving)
				state = ConnectionState.disconnected;
		}

		if (disconnectHandler)
			disconnectHandler(reason, type);
		updateFlags();
	}

	/// Unregister and close the underlying socket, drop all queued data
	/// and mark the connection as disconnected.
	private final void close()
	{
		assert(conn, "Attempting to close an unregistered socket");
		socketManager.unregister(this);
		conn.close();
		conn = null;
		outQueue[] = null;
		// The queues are empty now, so no queue can have a partially-sent
		// head; keep the invariant relied upon by clearQueue/queuePresent.
		partiallySent = -1;
		state = ConnectionState.disconnected;
	}

	/// Append data to the send buffer.
	void send(Data[] data, int priority = DEFAULT_PRIORITY)
	{
		assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
		outQueue[priority] ~= data;
		notifyWrite = true; // Fast updateFlags()

		debug (PRINTDATA)
		{
			stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr);
			foreach (datum; data)
				if (datum.length)
					stderr.write(hexDump(datum.contents));
				else
					stderr.writeln("(empty Data)");
			stderr.flush();
		}
	}

	/// ditto
	alias send = IConnection.send;

	/// Cancel all queued `Data` packets with the given priority.
	/// Does not cancel any partially-sent `Data`.
	final void clearQueue(int priority)
	{
		if (priority == partiallySent)
		{
			// Keep only the partially-sent head of the queue.
			assert(outQueue[priority].length > 0);
			outQueue[priority] = outQueue[priority][0..1];
		}
		else
			outQueue[priority] = null;
		updateFlags();
	}

	/// Clears all queues, even partially sent content.
	private final void discardQueues()
	{
		foreach (priority; 0..MAX_PRIORITY+1)
			outQueue[priority] = null;
		partiallySent = -1;
		updateFlags();
	}

	/// Returns true if any queues have pending data.
	@property
	final bool writePending()
	{
		foreach (queue; outQueue)
			if (queue.length)
				return true;
		return false;
	}

	/// Returns true if there are any queued `Data` which have not yet
	/// begun to be sent.
final bool queuePresent(int priority = DEFAULT_PRIORITY)
{
	if (priority == partiallySent)
	{
		// The head of this queue is already on its way out; don't count it.
		assert(outQueue[priority].length > 0);
		return outQueue[priority].length > 1;
	}
	else
		return outQueue[priority].length > 0;
}

/// Returns the number of queued `Data` at the given priority.
final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
{
	return outQueue[priority].length;
}

/// Returns the number of queued bytes at the given priority.
final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
{
	size_t bytes;
	foreach (datum; outQueue[priority])
		bytes += datum.length;
	return bytes;
}

// public:
private ConnectHandler connectHandler;
/// Callback for when a connection has been established.
@property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); }

private ReadDataHandler readDataHandler;
/// Callback for incoming data.
/// Data will not be received unless this handler is set.
@property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); }

private DisconnectHandler disconnectHandler;
/// Callback for when a connection was closed.
@property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); }

private BufferFlushedHandler bufferFlushedHandler;
/// Callback setter for when all queued data has been sent.
@property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); }
} // end of Connection

/// Implements a stream connection.
/// Queued `Data` is allowed to be fragmented.
class StreamConnection : Connection
{
protected:
	this()
	{
		super();
	}

	/// Called when a socket is writable.
override void onWritable()
{
	//scope(success) updateFlags();
	onWritableImpl();
	updateFlags();
}

// Work around scope(success) breaking debugger stack traces
/// Completes a pending connection attempt, or else sends as much queued
/// data as the socket accepts (partially-sent data first, then by priority).
final private void onWritableImpl()
{
	debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
	if (state == ConnectionState.connecting)
	{
		// Writability while connecting means the attempt has finished;
		// SO_ERROR tells whether it succeeded.
		int32_t error;
		conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
		if (error)
			return disconnect(formatSocketError(error), DisconnectType.error);

		state = ConnectionState.connected;

		//debug writefln("[%s] Connected", remoteAddressStr);
		try
			setKeepAlive();
		catch (Exception e)
			return disconnect(e.msg, DisconnectType.error);
		if (connectHandler)
			connectHandler();
		return;
	}
	//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

	// Two passes: first finish the partially-sent queue (if any),
	// then drain all queues in priority order.
	// A static table avoids building an array literal on every call.
	static immutable bool[2] passes = [true, false];
	foreach (sendPartial; passes)
		foreach (int priority, ref queue; outQueue)
			while (queue.length && (!sendPartial || priority == partiallySent))
			{
				assert(partiallySent == -1 || partiallySent == priority);

				auto pdata = queue.ptr; // pointer to first data

				ptrdiff_t sent = 0;
				if (pdata.length)
				{
					sent = doSend(pdata.contents);
					debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
				}
				else
				{
					debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
				}

				if (sent == Socket.ERROR)
				{
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					if (sent > 0)
					{
						// Keep only the unsent tail, and remember that this
						// queue's head can no longer be canceled.
						*pdata = (*pdata)[sent..pdata.length];
						partiallySent = priority;
					}
					return;
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddressStr);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					partiallySent = -1;
					if (queue.length == 0)
						queue = null;
				}
			}

	// outQueue is now empty
	if (bufferFlushedHandler)
		bufferFlushedHandler();
	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}
}

public:
this(Socket conn)
{
	super(conn);
} ///
} // end of StreamConnection

// ***************************************************************************

/// A POSIX file stream.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// Does not dup the given file descriptor, so "disconnecting" this connection
/// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	this(int fileno)
	{
		// Wrap the descriptor in a Socket so that it can be registered
		// with the socket manager; it must be non-blocking for that.
		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		conn.blocking = false;
		super(conn);
	} ///

protected:
	import core.sys.posix.unistd : read, write;

	/// Writes with `write(2)` instead of a socket send.
	override sizediff_t doSend(in void[] buffer)
	{
		return write(socket.handle, buffer.ptr, buffer.length);
	}

	/// Reads with `read(2)` instead of a socket receive.
	override sizediff_t doReceive(void[] buffer)
	{
		return read(socket.handle, buffer.ptr, buffer.length);
	}
}

/// Separates reading and writing, e.g. for stdin/stdout.
class Duplex : IConnection
{
    ///
    IConnection reader, writer;

    this(IConnection reader, IConnection writer)
    {
        this.reader = reader;
        this.writer = writer;
        // Both halves report their connect/disconnect events to this object.
        reader.handleConnect = &onConnect;
        writer.handleConnect = &onConnect;
        reader.handleDisconnect = &onDisconnect;
        writer.handleDisconnect = &onDisconnect;
    } ///

    @property ConnectionState state()
    {
        const readerState = reader.state;
        const writerState = writer.state;

        // "disconnecting" on either half wins; otherwise report the lower of the two states.
        if (readerState == ConnectionState.disconnecting || writerState == ConnectionState.disconnecting)
            return ConnectionState.disconnecting;
        return readerState < writerState ? readerState : writerState;
    } ///

    /// Queue Data for sending.
    void send(Data[] data, int priority)
    {
        writer.send(data, priority);
    }

    alias send = IConnection.send; /// ditto

    /// Terminate the connection.
    /// Note: this isn't quite fleshed out - applications may want to
    /// wait and send some more data even after stdin is closed, but
    /// such an interface can't be fitted into an IConnection
    void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
    {
        // Only disconnect a half whose state lies strictly between
        // `disconnected` and `disconnecting`.
        void closeIfActive(IConnection side)
        {
            if (side.state > ConnectionState.disconnected && side.state < ConnectionState.disconnecting)
                side.disconnect(reason, type);
        }

        closeIfActive(reader);
        closeIfActive(writer);
        debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
    }

    protected void onConnect()
    {
        // Forward the event only once both halves report `connected`.
        if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
            connectHandler();
    }

    protected void onDisconnect(string reason, DisconnectType type)
    {
        debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
        if (disconnectHandler)
        {
            disconnectHandler(reason, type);
            disconnectHandler = null; // don't call it twice for the other connection
        }
        // It is our responsibility to disconnect the other connection
        // Use DisconnectType.requested to ensure that any written data is flushed
        disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
    }

    /// Callback for when a connection has been established.
    @property void handleConnect(ConnectHandler value)
    {
        connectHandler = value;
    }
    private ConnectHandler connectHandler;

    /// Callback setter for when new data is read.
    @property void handleReadData(ReadDataHandler value)
    {
        reader.handleReadData = value;
    }

    /// Callback setter for when a connection was closed.
    @property void handleDisconnect(DisconnectHandler value)
    {
        disconnectHandler = value;
    }
    private DisconnectHandler disconnectHandler;

    /// Callback setter for when all queued data has been written.
    @property void handleBufferFlushed(BufferFlushedHandler value)
    {
        writer.handleBufferFlushed = value;
    }
}

unittest
{
    // Never executed; only ensures that Duplex can be instantiated (i.e. is not abstract).
    if (false)
        new Duplex(null, null);
}

// ***************************************************************************

/// An asynchronous socket-based connection.
class SocketConnection : StreamConnection
{
protected:
    /// Addresses still to be tried by the current connection attempt.
    AddressInfo[] addressQueue;

    this(Socket conn)
    {
        super(conn);
    }

    override sizediff_t doSend(in void[] buffer)
    {
        return conn.send(buffer);
    }

    override sizediff_t doReceive(void[] buffer)
    {
        return conn.receive(buffer);
    }

    /// Pops the first queued address and starts a non-blocking connect to it.
    final void tryNextAddress()
    {
        assert(state == ConnectionState.connecting);
        auto target = addressQueue[0];
        addressQueue = addressQueue[1..$];

        try
        {
            conn = new Socket(target.family, target.type, target.protocol);
            conn.blocking = false;

            socketManager.register(this);
            updateFlags();
            debug (ASOCKETS) stderr.writefln("Attempting connection to %s", target.address.toString());
            conn.connect(target.address);
        }
        catch (SocketException e)
            return onError("Connect error: " ~ e.msg);
    }

    /// Called when an error occurs on the socket.
    override void onError(string reason)
    {
        // Outside of a connection attempt, or with no addresses left to try,
        // the error is handled by the base class.
        if (state != ConnectionState.connecting || !addressQueue.length)
            return super.onError(reason);

        // Discard the failed socket and move on to the next candidate address.
        socketManager.unregister(this);
        conn.close();
        conn = null;

        return tryNextAddress();
    }

public:
    /// Default constructor
    this()
    {
        debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
    }

    /// Start establishing a connection.
    /// The addresses are tried in the given order.
    final void connect(AddressInfo[] addresses)
    {
        assert(addresses.length, "No addresses specified");

        assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
        assert(!conn);

        addressQueue = addresses;
        state = ConnectionState.connecting;
        tryNextAddress();
    }
}

/// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
    this(Socket conn)
    {
        super(conn);
    }

public:
    /// Default constructor
    this()
    {
        debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
    }

    ///
    alias connect = SocketConnection.connect; // raise overload

    /// Start establishing a connection.
    /// Resolves `host`, then tries the resulting addresses in random order.
    final void connect(string host, ushort port)
    {
        assert(host.length, "Empty host");
        assert(port, "No port specified");

        debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
        assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

        state = ConnectionState.resolving;

        AddressInfo[] candidates;
        try
        {
            auto resolved = getAddress(host, port);
            enforce(resolved.length, "No addresses found");
            debug (ASOCKETS)
            {
                stderr.writefln("Resolved to %s addresses:", resolved.length);
                foreach (address; resolved)
                    stderr.writefln("- %s", address.toString());
            }

            if (resolved.length > 1)
            {
                import std.random : randomShuffle;
                randomShuffle(resolved);
            }

            foreach (address; resolved)
                candidates ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
        }
        catch (SocketException e)
            return onError("Lookup error: " ~ e.msg);

        // The AddressInfo[] overload asserts that the state is `disconnected`.
        state = ConnectionState.disconnected;
        connect(candidates);
    }
}

// ***************************************************************************

/// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
    /// Class that actually performs listening on a certain address family
    final class Listener : GenericSocket
    {
        this(Socket conn)
        {
            debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
            this.conn = conn;
            socketManager.register(this);
        }

        /// Called when a socket is readable.
        override void onReadable()
        {
            debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
            Socket accepted = conn.accept();
            accepted.blocking = false;

            // Without an accept handler the new socket is closed straight away.
            if (!handleAccept)
            {
                accepted.close();
                return;
            }

            auto connection = createConnection(accepted);
            debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr);
            connection.setKeepAlive();
            //assert(connection.connected);
            //connection.connected = true;
            acceptHandler(connection);
        }

        /// Called when a socket is writable.
        override void onWritable()
        {
        }

        /// Called when an error occurs on the socket.
        override void onError(string reason)
        {
            close(); // call parent
        }

        /// Unregisters this listener from the socket manager and closes its socket.
        void closeListener()
        {
            assert(conn);
            socketManager.unregister(this);
            conn.close();
            conn = null;
        }
    }

    /// Creates the connection object for an accepted socket.
    /// Can be overridden to produce a more specific connection type.
    SocketConnection createConnection(Socket socket)
    {
        return new SocketConnection(socket);
    }

    /// Whether the socket is listening.
    bool listening;
    /// Listener instances
    Listener[] listeners;

    /// Listeners are polled for readability only while an accept handler is set.
    final void updateFlags()
    {
        foreach (listener; listeners)
            listener.notifyRead = handleAccept !is null;
    }

public:
    /// Start listening on this socket.
final void listen(AddressInfo[] addressInfos)
{
    foreach (ref addressInfo; addressInfos)
    {
        try
        {
            Socket sock = new Socket(addressInfo);
            sock.blocking = false;
            if (addressInfo.family == AddressFamily.INET6)
                sock.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
            sock.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

            sock.bind(addressInfo.address);
            sock.listen(8);

            listeners ~= new Listener(sock);
        }
        catch (SocketException e)
        {
            // A failure on one address is not fatal; carry on with the rest.
            debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
            debug(ASOCKETS) stderr.writeln(e.msg);
        }
    }

    // Give up only if there is not a single listener.
    if (!listeners.length)
        throw new Exception("Unable to bind service");

    listening = true;

    updateFlags();
}

this()
{
} ///

/// Creates a Server with the given sockets.
/// The sockets must have already had `bind` and `listen` called on them.
this(Socket[] sockets...)
{
    foreach (socket; sockets)
        listeners ~= new Listener(socket);
}

/// Returns all listening addresses.
final @property Address[] localAddresses()
{
    Address[] addresses;
    foreach (listener; listeners)
        addresses ~= listener.localAddress;
    return addresses;
}

/// Returns `true` if the server is listening for incoming connections.
final @property bool isListening()
{
    return listening;
}

/// Stop listening on this socket.
/// Closes every listener, then calls `handleClose` if it is set.
final void close()
{
    foreach (listener; listeners)
        listener.closeListener();
    listeners = null;
    listening = false;
    if (handleClose)
        handleClose();
}

/// Create a SocketServer using the handle passed on standard input,
/// for which `listen` had already been called. Used by
/// e.g. FastCGI and systemd sockets with "Listen = yes".
static SocketServer fromStdin()
{
    socket_t handle;
    version (Windows)
    {
        import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
        handle = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
    }
    else
        handle = cast(socket_t)0;

    auto stdinSocket = new Socket(handle, AddressFamily.UNSPEC);
    stdinSocket.blocking = false;
    return new SocketServer(stdinSocket);
}

/// Callback for when the socket was closed.
void delegate() handleClose;

private void delegate(SocketConnection incoming) acceptHandler;
/// Callback for an incoming connection.
/// Connections will not be accepted unless this handler is set.
@property final void delegate(SocketConnection incoming) handleAccept()
{
    return acceptHandler;
}
/// ditto
@property final void handleAccept(void delegate(SocketConnection incoming) value)
{
    acceptHandler = value;
    updateFlags();
}
}

/// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
    /// Accepted sockets are wrapped in a `TcpConnection`.
    override SocketConnection createConnection(Socket socket)
    {
        return new TcpConnection(socket);
    }

public:
    this()
    {
    } ///

    this(Socket[] sockets...)
    {
        super(sockets);
    } /// Construct from the given sockets.

    ///
    alias listen = SocketServer.listen; // raise overload

    /// Start listening on this socket.
/// Resolves `addr`/`port` to passive TCP addresses and listens on them.
/// Returns: the port of an IPv4 listener if there is one (this is how the
/// actual port is reported when `port` is 0), otherwise `port` as given.
final ushort listen(ushort port, string addr = null)
{
    debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
    //assert(!listening, "Attempting to listen on a listening socket");

    auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

    debug (ASOCKETS)
    {
        stderr.writefln("Resolved to %s addresses:", addressInfos.length);
        foreach (ref addressInfo; addressInfos)
            stderr.writefln("- %s", addressInfo);
    }

    // listen on random ports only on IPv4 for now
    if (port == 0)
    {
        // Iterating in reverse keeps the remaining indices valid while
        // entries are being removed.
        foreach_reverse (i, ref addressInfo; addressInfos)
            if (addressInfo.family != AddressFamily.INET)
                addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
    }

    listen(addressInfos);

    // Report the port actually bound by an IPv4 listener.
    foreach (listener; listeners)
    {
        auto address = listener.conn.localAddress();
        if (address.addressFamily == AddressFamily.INET)
            port = to!ushort(address.toPortString());
    }

    return port;
}

deprecated("Use SocketServer.fromStdin")
static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; }

/// Delegate to be called when a connection is accepted.
/// Passing `null` unsets the handler, so connections are no longer accepted.
@property final void handleAccept(void delegate(TcpConnection incoming) value)
{
    if (value is null)
    {
        // Wrapping a null delegate in a lambda would leave a non-null accept
        // handler installed that calls a null delegate on the next accept.
        void delegate(SocketConnection incoming) none = null;
        super.handleAccept(none);
    }
    else
        super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c));
}
}

// ***************************************************************************

/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
    this(Socket conn)
    {
        super(conn);
    }

    /// Called when a socket is writable.
override void onWritable()
{
    // updateFlags() runs only when onWritableImpl() returns normally.
    //scope(success) updateFlags();
    onWritableImpl();
    updateFlags();
}

// Work around scope(success) breaking debugger stack traces
final private void onWritableImpl()
{
    foreach (priority, ref queue; outQueue)
        while (queue.length)
        {
            auto head = queue.ptr; // pointer to first data

            auto sent = conn.sendTo(head.contents, remoteAddress);

            if (sent == Socket.ERROR)
            {
                // Would-block is not an error: stop and wait for the next writable event.
                if (wouldHaveBlocked())
                    return;
                return onError("send() error: " ~ lastSocketError);
            }

            // Unlike a stream, a short write of a datagram is reported as an error.
            if (sent < head.length)
                return onError("Sent only %d/%d bytes of the datagram!".format(sent, head.length));

            assert(sent == head.length);
            //debug writefln("[%s] Sent data:", remoteAddressStr);
            //debug writefln("%s", hexDump(pdata.contents[0..sent]));
            head.clear();
            queue = queue[1..$];
            if (queue.length == 0)
                queue = null;
        }

    // outQueue is now empty
    if (bufferFlushedHandler)
        bufferFlushedHandler();
    if (state == ConnectionState.disconnecting)
    {
        debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
        close();
    }
}

override sizediff_t doSend(in void[] buffer)
{
    assert(false); // never called (called only from overridden methods)
}

override sizediff_t doReceive(void[] buffer)
{
    return conn.receive(buffer);
}

public:
/// Default constructor
this()
{
    debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
}

/// Initialize with the given `AddressFamily`, without binding to an address.
final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
{
    initializeImpl(family, type, protocol);
    if (connectHandler)
        connectHandler();
}

/// Creates and registers the non-blocking socket and marks the connection
/// as connected, without invoking `connectHandler`.
final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
{
    assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
    assert(!conn);

    conn = new Socket(family, type, protocol);
    conn.blocking = false;
    socketManager.register(this);
    state = ConnectionState.connected;
    updateFlags();
}

/// Bind to a local address in order to receive packets sent there.
/// Returns: the bound local port, or 0 if resolving `host` failed
/// (in which case `onError` has been called).
final ushort bind(string host, ushort port)
{
    assert(host.length, "Empty host");

    debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

    state = ConnectionState.resolving;

    AddressInfo addressInfo;
    try
    {
        auto addresses = getAddress(host, port);
        enforce(addresses.length, "No addresses found");
        debug (ASOCKETS)
        {
            stderr.writefln("Resolved to %s addresses:", addresses.length);
            foreach (address; addresses)
                stderr.writefln("- %s", address.toString());
        }

        // With several candidates, pick one at random.
        Address address;
        if (addresses.length > 1)
        {
            import std.random : uniform;
            address = addresses[uniform(0, $)];
        }
        else
            address = addresses[0];
        addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
    }
    catch (SocketException e)
    {
        onError("Lookup error: " ~ e.msg);
        return 0;
    }

    // initializeImpl asserts that the state is `disconnected`.
    state = ConnectionState.disconnected;
    return bind(addressInfo);
}

/// ditto
final ushort bind(AddressInfo addressInfo)
{
    // Use initializeImpl rather than initialize: initialize would invoke
    // connectHandler before the socket is bound, and the handler is invoked
    // below once binding has succeeded, so it would fire twice.
    initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
    conn.bind(addressInfo.address);

    auto address = conn.localAddress();
    auto port = to!ushort(address.toPortString());

    if (connectHandler)
        connectHandler();

    return port;
}

// public:
/// Where to send packets to.
Address remoteAddress;
}

///
unittest
{
    auto server = new UdpConnection();
    server.bind("localhost", 0);

    auto client = new UdpConnection();
    client.initialize(server.localAddress.addressFamily);

    string[] packets = ["Hello", "there"];
    client.remoteAddress = server.localAddress;
    client.send({
        Data[] data;
        foreach (packet; packets)
            data ~= Data(packet);
        return data;
    }());

    server.handleReadData = (Data data)
    {
        assert(data.contents == packets[0]);
        packets = packets[1..$];
        if (!packets.length)
        {
            server.close();
            client.close();
        }
    };
    socketManager.loop();
    assert(!packets.length);
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
    /// The next connection in the chain (towards the raw transport).
    IConnection next;

    this(IConnection next)
    {
        this.next = next;
        next.handleConnect = &onConnect;
        next.handleDisconnect = &onDisconnect;
        next.handleBufferFlushed = &onBufferFlushed;
    } ///

    @property ConnectionState state() { return next.state; } ///

    /// Queue Data for sending.
    void send(Data[] data, int priority)
    {
        next.send(data, priority);
    }

    alias send = IConnection.send; /// ditto

    /// Terminate the connection.
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
{
    next.disconnect(reason, type);
}

/// Forwards the connect event to the user's handler, if one is set.
protected void onConnect()
{
    if (connectHandler is null)
        return;
    connectHandler();
}

protected void onReadData(Data data)
{
    // onReadData should be fired only if readDataHandler is set
    readDataHandler(data);
}

/// Forwards the disconnect event to the user's handler, if one is set.
protected void onDisconnect(string reason, DisconnectType type)
{
    if (disconnectHandler is null)
        return;
    disconnectHandler(reason, type);
}

/// Forwards the buffer-flushed event to the user's handler, if one is set.
protected void onBufferFlushed()
{
    if (bufferFlushedHandler is null)
        return;
    bufferFlushedHandler();
}

/// Callback for when a connection has been established.
@property void handleConnect(ConnectHandler value)
{
    connectHandler = value;
}
private ConnectHandler connectHandler;

/// Callback setter for when new data is read.
@property void handleReadData(ReadDataHandler value)
{
    readDataHandler = value;
    // Subscribe to the next connection's data only while a handler is set.
    next.handleReadData = value ? &onReadData : null ;
}
private ReadDataHandler readDataHandler;

/// Callback setter for when a connection was closed.
@property void handleDisconnect(DisconnectHandler value)
{
    disconnectHandler = value;
}
private DisconnectHandler disconnectHandler;

/// Callback setter for when all queued data has been written.
@property void handleBufferFlushed(BufferFlushedHandler value)
{
    bufferFlushedHandler = value;
}
private BufferFlushedHandler bufferFlushedHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
    /// The protocol's line delimiter.
    string delimiter = "\r\n";

    /// Maximum line length (0 means unlimited).
size_t maxLength = 0;

this(IConnection next)
{
    super(next);
} ///

/// Append a line to the send buffer.
/// The delimiter is appended to `line` before it is queued.
void send(string line)
{
    //super.send(Data(line ~ delimiter));
    // https://issues.dlang.org/show_bug.cgi?id=13985
    ConnectionAdapter ca = this;
    ca.send(Data(line ~ delimiter));
}

protected:
/// The receive buffer.
Data inBuffer;

/// Called when data has been received.
/// Appends `data` to the receive buffer and passes on every complete line;
/// disconnects if what remains in the buffer exceeds `maxLength`.
final override void onReadData(Data data)
{
    import std.string : indexOf;
    auto oldBufferLength = inBuffer.length;
    if (oldBufferLength)
        inBuffer ~= data;
    else
        inBuffer = data;

    if (delimiter.length == 1)
    {
        import core.stdc.string : memchr;

        char c = delimiter[0];
        // The previously buffered bytes contain no delimiter (any complete
        // line would have been extracted already), so the first search can
        // start at the newly appended data.
        auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
        while (p)
        {
            sizediff_t index = p - inBuffer.ptr;
            if (!processLine(index))
                break;

            // processLine has consumed the line; search the remainder from its start.
            p = memchr(inBuffer.ptr, c, inBuffer.length);
        }
    }
    else
    {
        sizediff_t index;
        // TODO: we can start the search at oldBufferLength-delimiter.length+1
        while ((index = indexOf(cast(string)inBuffer.contents, delimiter)) >= 0
            && processLine(index))
        {}
    }

    if (maxLength && inBuffer.length > maxLength)
        disconnect("Line too long", DisconnectType.error);
}

/// Extracts the line ending at `index` from the receive buffer and passes it on.
/// Returns: `false` (after disconnecting) if the line is longer than
/// `maxLength`, `true` otherwise.
final bool processLine(size_t index)
{
    if (maxLength && index > maxLength)
    {
        disconnect("Line too long", DisconnectType.error);
        return false;
    }
    auto line = inBuffer[0..index];
    // Drop the line and its delimiter from the buffer before notifying.
    inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
    super.onReadData(line);
    return true;
}

/// Discards any buffered partial line once the disconnect has been propagated.
override void onDisconnect(string reason, DisconnectType type)
{
    super.onDisconnect(reason, type);
    inBuffer.clear();
}
}

// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
    this(IConnection next)
    {
        debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
        super(next);
    } ///

    /// Set the `Duration` indicating the period of inactivity after which to take action.
    final void setIdleTimeout(Duration duration)
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
        assert(duration > Duration.zero);

        // Configure idleTask
        if (idleTask !is null)
        {
            // Reuse the existing task: cancel it if it is scheduled, then change its delay.
            if (idleTask.isWaiting())
                idleTask.cancel();
            idleTask.delay = duration;
        }
        else
        {
            idleTask = new TimerTask(duration);
            idleTask.handleTask = &onTask_Idle;
        }

        mainTimer.add(idleTask);
    }

    /// Manually mark this connection as non-idle, restarting the idle timer.
    /// `handleNonIdle` will be called, if set.
    void markNonIdle()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
        if (handleNonIdle)
            handleNonIdle();

        // Only a task that is currently scheduled is restarted.
        immutable scheduled = idleTask && idleTask.isWaiting();
        if (scheduled)
            idleTask.restart();
    }

    /// Stop the idle timer.
    void cancelIdleTimeout()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
        assert(idleTask !is null);
        assert(idleTask.isWaiting());
        idleTask.cancel();
    }

    /// Restart the idle timer.
    void resumeIdleTimeout()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
        assert(idleTask !is null);
        assert(!idleTask.isWaiting());
        mainTimer.add(idleTask);
    }

    /// Callback for when a connection has stopped responding.
    /// If unset, the connection will be disconnected.
void delegate() handleIdleTimeout;

/// Callback for when a connection is marked as non-idle
/// (when data is received).
void delegate() handleNonIdle;

protected:
override void onConnect()
{
    debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
    markNonIdle();
    super.onConnect();
}

override void onReadData(Data data)
{
    debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
    markNonIdle();
    super.onReadData(data);
}

override void onDisconnect(string reason, DisconnectType type)
{
    debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
    // Cancel a still-scheduled idle task before passing the event on.
    immutable scheduled = idleTask && idleTask.isWaiting();
    if (scheduled)
        idleTask.cancel();
    super.onDisconnect(reason, type);
}

private:
TimerTask idleTask; // non-null if an idle timeout has been set

/// Timer callback, invoked when the idle period has elapsed.
final void onTask_Idle(Timer /*timer*/, TimerTask /*task*/)
{
    debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);
    if (state == ConnectionState.disconnecting)
        return disconnect("Delayed disconnect - time-out", DisconnectType.error);

    if (state == ConnectionState.disconnected)
        return;

    // Without a user handler, an idle connection is simply dropped.
    if (!handleIdleTimeout)
        return disconnect("Time-out", DisconnectType.error);

    resumeIdleTimeout(); // reschedule (by default)
    handleIdleTimeout();
}
}

// ***************************************************************************

unittest
{
    // A timeout scheduled on the main timer fires while the socket manager loop runs.
    void testTimer()
    {
        bool fired;
        setTimeout({fired = true;}, 10.msecs);
        socketManager.loop();
        assert(fired);
    }

    testTimer();
}