1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import core.stdc.stdint : int32_t; 24 25 import std.exception; 26 import std.socket; 27 import std.string : format; 28 public import std.socket : Address, AddressInfo, Socket; 29 30 version (Windows) 31 private import c_socks = core.sys.windows.winsock2; 32 else version (Posix) 33 private import c_socks = core.sys.posix.sys.socket; 34 35 debug(ASOCKETS) import std.stdio : stderr; 36 debug(PRINTDATA) import std.stdio : stderr; 37 debug(PRINTDATA) import ae.utils.text : hexDump; 38 private import std.conv : to; 39 40 41 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 42 static import ae.utils.array; 43 44 version(LIBEV) 45 { 46 import deimos.ev; 47 pragma(lib, "ev"); 48 } 49 50 version(Windows) 51 { 52 import core.sys.windows.windows : Sleep; 53 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 54 } 55 else 56 enum USE_SLEEP = false; 57 58 /// Flags that determine socket wake-up events. 59 int eventCounter; 60 61 version(LIBEV) 62 { 63 // Watchers are a GenericSocket field (as declared in SocketMixin). 64 // Use one watcher per read and write event. 65 // Start/stop those as higher-level code declares interest in those events. 66 // Use the "data" ev_io field to store the parent GenericSocket address. 67 // Also use the "data" field as a flag to indicate whether the watcher is active 68 // (data is null when the watcher is stopped). 69 70 struct SocketManager 71 { 72 private: 73 size_t count; 74 75 extern(C) 76 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 77 { 78 eventCounter++; 79 auto socket = cast(GenericSocket)w.data; 80 assert(socket, "libev event fired on stopped watcher"); 81 debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents); 82 83 if (revents & EV_READ) 84 socket.onReadable(); 85 else 86 if (revents & EV_WRITE) 87 socket.onWritable(); 88 else 89 assert(false, "Unknown event fired from libev"); 90 91 // TODO? Need to get proper SocketManager instance to call updateTimer on 92 socketManager.updateTimer(false); 93 } 94 95 ev_timer evTimer; 96 MonoTime lastNextEvent = MonoTime.max; 97 98 extern(C) 99 static void timerCallback(ev_loop_t* l, ev_timer* w, int revents) 100 { 101 eventCounter++; 102 debug (ASOCKETS) writefln("Timer callback called."); 103 mainTimer.prod(); 104 105 socketManager.updateTimer(true); 106 debug (ASOCKETS) writefln("Timer callback exiting."); 107 } 108 109 void updateTimer(bool force) 110 { 111 auto nextEvent = mainTimer.getNextEvent(); 112 if (force || lastNextEvent != nextEvent) 113 { 114 debug (ASOCKETS) writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent); 115 if (nextEvent == MonoTime.max) // Stopping 116 { 117 if (lastNextEvent != MonoTime.max) 118 ev_timer_stop(ev_default_loop(0), &evTimer); 119 } 120 else 121 { 122 auto remaining = mainTimer.getRemainingTime(); 123 while (remaining <= Duration.zero) 124 { 125 debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining); 126 mainTimer.prod(); 127 remaining = mainTimer.getRemainingTime(); 128 } 129 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 130 debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 131 if (lastNextEvent == MonoTime.max) // Starting 132 { 133 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 134 ev_timer_start(ev_default_loop(0), &evTimer); 135 } 136 else // Adjusting 137 { 138 evTimer.repeat = tstamp; 139 ev_timer_again(ev_default_loop(0), &evTimer); 140 } 141 } 142 lastNextEvent = nextEvent; 143 } 144 } 145 146 public: 147 /// Register a socket with the manager. 148 void register(GenericSocket socket) 149 { 150 debug (ASOCKETS) writefln("Registering %s", socket); 151 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 152 auto fd = socket.conn.handle; 153 assert(fd, "Must have fd before socket registration"); 154 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 155 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 156 count++; 157 } 158 159 /// Unregister a socket with the manager. 160 void unregister(GenericSocket socket) 161 { 162 debug (ASOCKETS) writefln("Unregistering %s", socket); 163 socket.notifyRead = false; 164 socket.notifyWrite = false; 165 count--; 166 } 167 168 size_t size() 169 { 170 return count; 171 } 172 173 /// Loop continuously until no sockets are left. 174 void loop() 175 { 176 auto evLoop = ev_default_loop(0); 177 enforce(evLoop, "libev initialization failure"); 178 179 updateTimer(true); 180 debug (ASOCKETS) writeln("ev_run"); 181 ev_run(ev_default_loop(0), 0); 182 } 183 } 184 185 private mixin template SocketMixin() 186 { 187 private ev_io evRead, evWrite; 188 189 private void setWatcherState(ref ev_io ev, bool newValue, int event) 190 { 191 if (!conn) 192 { 193 // Can happen when setting delegates before connecting. 194 return; 195 } 196 197 if (newValue && !ev.data) 198 { 199 // Start 200 ev.data = cast(void*)this; 201 ev_io_start(ev_default_loop(0), &ev); 202 } 203 else 204 if (!newValue && ev.data) 205 { 206 // Stop 207 assert(ev.data is cast(void*)this); 208 ev.data = null; 209 ev_io_stop(ev_default_loop(0), &ev); 210 } 211 } 212 213 /// Interested in read notifications (onReadable)? 214 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 215 /// Interested in write notifications (onWritable)? 216 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 217 218 debug ~this() 219 { 220 // The LIBEV SocketManager holds no references to registered sockets. 221 // TODO: Add a doubly-linked list? 222 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 223 } 224 } 225 } 226 else // Use select 227 { 228 struct SocketManager 229 { 230 private: 231 enum FD_SETSIZE = 1024; 232 233 /// List of all sockets to poll. 234 GenericSocket[] sockets; 235 236 /// Debug AA to check for dangling socket references. 237 debug GenericSocket[socket_t] socketHandles; 238 239 void delegate()[] idleHandlers; 240 241 public: 242 /// Register a socket with the manager. 243 void register(GenericSocket conn) 244 { 245 debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1); 246 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 247 sockets ~= conn; 248 249 debug 250 { 251 auto handle = conn.socket.handle; 252 assert(handle != socket_t.init, "Can't register a closed socket"); 253 assert(handle !in socketHandles, "This socket handle is already registered"); 254 socketHandles[handle] = conn; 255 } 256 } 257 258 /// Unregister a socket with the manager. 259 void unregister(GenericSocket conn) 260 { 261 debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 262 263 debug 264 { 265 auto handle = conn.socket.handle; 266 assert(handle != socket_t.init, "Can't unregister a closed socket"); 267 auto pconn = handle in socketHandles; 268 assert(pconn, "This socket handle is not registered"); 269 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 270 socketHandles.remove(handle); 271 } 272 273 foreach (size_t i, GenericSocket j; sockets) 274 if (j is conn) 275 { 276 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 277 return; 278 } 279 assert(false, "Socket not registered"); 280 } 281 282 size_t size() 283 { 284 return sockets.length; 285 } 286 287 /// Loop continuously until no sockets are left. 288 void loop() 289 { 290 debug (ASOCKETS) stderr.writeln("Starting event loop."); 291 292 SocketSet readset, writeset; 293 size_t sockcount; 294 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 295 readset = new SocketSet(setSize); 296 writeset = new SocketSet(setSize); 297 while (true) 298 { 299 uint minSize = 0; 300 version(Windows) 301 minSize = cast(uint)sockets.length; 302 else 303 { 304 foreach (s; sockets) 305 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 306 minSize = s.socket.handle; 307 } 308 minSize++; 309 310 if (setSize < minSize) 311 { 312 debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 313 setSize = minSize * 2; 314 readset = new SocketSet(setSize); 315 writeset = new SocketSet(setSize); 316 } 317 else 318 { 319 readset.reset(); 320 writeset.reset(); 321 } 322 323 sockcount = 0; 324 bool haveActive; 325 debug (ASOCKETS) stderr.writeln("Populating sets"); 326 foreach (GenericSocket conn; sockets) 327 { 328 if (!conn.socket) 329 continue; 330 sockcount++; 331 332 debug (ASOCKETS) stderr.writef("\t%s:", conn); 333 if (conn.notifyRead) 334 { 335 readset.add(conn.socket); 336 if (!conn.daemonRead) 337 haveActive = true; 338 debug (ASOCKETS) stderr.write(" READ", conn.daemonRead ? "[daemon]" : ""); 339 } 340 if (conn.notifyWrite) 341 { 342 writeset.add(conn.socket); 343 if (!conn.daemonWrite) 344 haveActive = true; 345 debug (ASOCKETS) stderr.write(" WRITE", conn.daemonWrite ? "[daemon]" : ""); 346 } 347 debug (ASOCKETS) stderr.writeln(); 348 } 349 debug (ASOCKETS) 350 { 351 stderr.writefln("Sets populated as follows:"); 352 printSets(readset, writeset); 353 } 354 355 debug (ASOCKETS) stderr.writefln("Waiting (%sactive with %d sockets, %s timer events, %d idle handlers)...", 356 haveActive ? "" : "not ", 357 sockcount, 358 mainTimer.isWaiting() ? "with" : "no", 359 idleHandlers.length, 360 ); 361 if (!haveActive && !mainTimer.isWaiting()) 362 { 363 debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop."); 364 break; 365 } 366 367 debug (ASOCKETS) stderr.flush(); 368 369 int events; 370 if (idleHandlers.length) 371 { 372 if (sockcount==0) 373 events = 0; 374 else 375 events = Socket.select(readset, writeset, null, 0.seconds); 376 } 377 else 378 if (USE_SLEEP && sockcount==0) 379 { 380 version(Windows) 381 { 382 auto duration = mainTimer.getRemainingTime().total!"msecs"(); 383 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 384 if (duration <= 0) 385 duration = 1; // Avoid busywait 386 else 387 if (duration > int.max) 388 duration = int.max; 389 Sleep(cast(int)duration); 390 events = 0; 391 } 392 else 393 assert(0); 394 } 395 else 396 if (mainTimer.isWaiting()) 397 events = Socket.select(readset, writeset, null, mainTimer.getRemainingTime()); 398 else 399 events = Socket.select(readset, writeset, null); 400 401 debug (ASOCKETS) stderr.writefln("%d events fired.", events); 402 403 if (events > 0) 404 { 405 // Handle just one event at a time, as the first 406 // handler might invalidate select()'s results. 407 handleEvent(readset, writeset); 408 } 409 else 410 if (idleHandlers.length) 411 { 412 import ae.utils.array; 413 auto handler = idleHandlers.shift(); 414 415 // Rotate the idle handler queue before running it, 416 // in case the handler unregisters itself. 417 idleHandlers ~= handler; 418 419 handler(); 420 } 421 422 // Timers may invalidate our select results, so fire them after processing the latter 423 mainTimer.prod(); 424 425 eventCounter++; 426 } 427 } 428 429 debug (ASOCKETS) 430 void printSets(SocketSet readset, SocketSet writeset) 431 { 432 foreach (GenericSocket conn; sockets) 433 { 434 if (!conn.socket) 435 stderr.writefln("\t\t%s is unset", conn); 436 else 437 { 438 if (readset.isSet(conn.socket)) 439 stderr.writefln("\t\t%s is readable", conn); 440 if (writeset.isSet(conn.socket)) 441 stderr.writefln("\t\t%s is writable", conn); 442 } 443 } 444 } 445 446 void handleEvent(SocketSet readset, SocketSet writeset) 447 { 448 debug (ASOCKETS) 449 { 450 stderr.writefln("\tSelect results:"); 451 printSets(readset, writeset); 452 } 453 454 foreach (GenericSocket conn; sockets) 455 { 456 if (!conn.socket) 457 continue; 458 459 if (readset.isSet(conn.socket)) 460 { 461 debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn); 462 return conn.onReadable(); 463 } 464 else 465 if (writeset.isSet(conn.socket)) 466 { 467 debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn); 468 return conn.onWritable(); 469 } 470 } 471 472 assert(false, "select() reported events available, but no registered sockets are set"); 473 } 474 } 475 476 // Use UFCS to allow removeIdleHandler to have a predicate with context 477 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 478 { 479 foreach (i, idleHandler; socketManager.idleHandlers) 480 assert(handler !is idleHandler); 481 482 socketManager.idleHandlers ~= handler; 483 } 484 485 static bool isFun(T)(T a, T b) { return a is b; } 486 void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args) 487 { 488 foreach (i, idleHandler; socketManager.idleHandlers) 489 if (pred(idleHandler, args)) 490 { 491 import std.algorithm; 492 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 493 return; 494 } 495 assert(false, "No such idle handler"); 496 } 497 498 private mixin template SocketMixin() 499 { 500 /// Interested in read notifications (onReadable)? 501 bool notifyRead; 502 /// Interested in write notifications (onWritable)? 503 bool notifyWrite; 504 } 505 } 506 507 /// The default socket manager. 508 SocketManager socketManager; 509 510 // *************************************************************************** 511 512 /// General methods for an asynchronous socket. 513 abstract class GenericSocket 514 { 515 /// Declares notifyRead and notifyWrite. 516 mixin SocketMixin; 517 518 protected: 519 /// The socket this class wraps. 520 Socket conn; 521 522 protected: 523 /// Retrieve the socket class this class wraps. 524 @property final Socket socket() 525 { 526 return conn; 527 } 528 529 void onReadable() 530 { 531 } 532 533 void onWritable() 534 { 535 } 536 537 void onError(string reason) 538 { 539 } 540 541 public: 542 /// allow getting the address of connections that are already disconnected 543 private Address[2] cachedAddress; 544 545 /*private*/ final @property Address address(bool local)() 546 { 547 if (cachedAddress[local] !is null) 548 return cachedAddress[local]; 549 else 550 if (conn is null) 551 return null; 552 else 553 { 554 Address a; 555 if (conn.addressFamily == AddressFamily.UNSPEC) 556 { 557 // Socket will attempt to construct an UnknownAddress, 558 // which will almost certainly not match the real address length. 559 static if (local) 560 alias getname = c_socks.getsockname; 561 else 562 alias getname = c_socks.getpeername; 563 564 c_socks.socklen_t nameLen = 0; 565 if (getname(conn.handle, null, &nameLen) < 0) 566 throw new SocketOSException("Unable to obtain socket address"); 567 568 auto buf = new ubyte[nameLen]; 569 auto sa = cast(c_socks.sockaddr*)buf.ptr; 570 if (getname(conn.handle, sa, &nameLen) < 0) 571 throw new SocketOSException("Unable to obtain socket address"); 572 a = new UnknownAddressReference(sa, nameLen); 573 } 574 else 575 a = local ? conn.localAddress() : conn.remoteAddress(); 576 return cachedAddress[local] = a; 577 } 578 } 579 580 alias localAddress = address!true; 581 alias remoteAddress = address!false; 582 583 /*private*/ final @property string addressStr(bool local)() nothrow 584 { 585 try 586 { 587 auto a = address!local; 588 if (a is null) 589 return "[null address]"; 590 string host = a.toAddrString(); 591 import std.string : indexOf; 592 if (host.indexOf(':') >= 0) 593 host = "[" ~ host ~ "]"; 594 try 595 { 596 string port = a.toPortString(); 597 return host ~ ":" ~ port; 598 } 599 catch (Exception e) 600 return host; 601 } 602 catch (Exception e) 603 return "[error: " ~ e.msg ~ "]"; 604 } 605 606 alias localAddressStr = addressStr!true; 607 alias remoteAddressStr = addressStr!false; 608 609 /// Don't block the process from exiting, even if the socket is ready to receive data. 610 /// TODO: Not implemented with libev 611 bool daemonRead; 612 613 /// Don't block the process from exiting, even if the socket is ready to send data. 614 /// TODO: Not implemented with libev 615 bool daemonWrite; 616 617 deprecated alias daemon = daemonRead; 618 619 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 620 { 621 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 622 if (enabled) 623 { 624 try 625 conn.setKeepAlive(time, interval); 626 catch (SocketException) 627 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 628 } 629 else 630 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 631 } 632 633 override string toString() const 634 { 635 import std.string; 636 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1); 637 } 638 } 639 640 // *************************************************************************** 641 642 enum DisconnectType 643 { 644 requested, // initiated by the application 645 graceful, // peer gracefully closed the connection 646 error // abnormal network condition 647 } 648 649 enum ConnectionState 650 { 651 /// The initial state, or the state after a disconnect was fully processed. 652 disconnected, 653 654 /// Name resolution. Currently done synchronously. 655 resolving, 656 657 /// A connection attempt is in progress. 658 connecting, 659 660 /// A connection is established. 661 connected, 662 663 /// Disconnecting in progress. No data can be sent or received at this point. 664 /// We are waiting for queued data to be actually sent before disconnecting. 665 disconnecting, 666 } 667 668 /// Returns true if this is a connection state for which disconnecting is valid. 669 /// Generally, applications should be aware of the life cycle of their sockets, 670 /// so checking the state of a connection is unnecessary (and a code smell). 671 /// However, unconditionally disconnecting some connected sockets can be useful 672 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle), 673 /// such as in response to a signal. 674 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; } 675 676 /// Common interface for connections and adapters. 677 interface IConnection 678 { 679 enum MAX_PRIORITY = 4; 680 enum DEFAULT_PRIORITY = 2; 681 682 static const defaultDisconnectReason = "Software closed the connection"; 683 684 /// Get connection state. 685 @property ConnectionState state(); 686 687 /// Has a connection been established? 688 deprecated final @property bool connected() { return state == ConnectionState.connected; } 689 690 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 691 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 692 693 /// Queue Data for sending. 694 void send(Data[] data, int priority = DEFAULT_PRIORITY); 695 696 /// ditto 697 final void send(Data datum, int priority = DEFAULT_PRIORITY) 698 { 699 Data[1] data; 700 data[0] = datum; 701 this.send(data, priority); 702 data[] = Data.init; 703 } 704 705 /// Terminate the connection. 706 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 707 708 /// Callback setter for when a connection has been established 709 /// (if applicable). 710 alias void delegate() ConnectHandler; 711 @property void handleConnect(ConnectHandler value); /// ditto 712 713 /// Callback setter for when new data is read. 714 alias void delegate(Data data) ReadDataHandler; 715 @property void handleReadData(ReadDataHandler value); /// ditto 716 717 /// Callback setter for when a connection was closed. 718 alias void delegate(string reason, DisconnectType type) DisconnectHandler; 719 @property void handleDisconnect(DisconnectHandler value); /// ditto 720 721 /// Callback setter for when all queued data has been sent. 722 alias void delegate() BufferFlushedHandler; 723 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 724 } 725 726 // *************************************************************************** 727 728 class Connection : GenericSocket, IConnection 729 { 730 private: 731 /// Blocks of data larger than this value are passed as unmanaged memory 732 /// (in Data objects). Blocks smaller than this value will be reallocated 733 /// on the managed heap. The disadvantage of placing large objects on the 734 /// managed heap is false pointers; the disadvantage of using Data for 735 /// small objects is wasted slack space due to the page size alignment 736 /// requirement. 737 enum UNMANAGED_THRESHOLD = 256; 738 739 ConnectionState _state; 740 final @property ConnectionState state(ConnectionState value) { return _state = value; } 741 742 public: 743 /// Get connection state. 744 override @property ConnectionState state() { return _state; } 745 746 protected: 747 abstract sizediff_t doSend(in void[] buffer); 748 abstract sizediff_t doReceive(void[] buffer); 749 750 /// The send buffers. 751 Data[][MAX_PRIORITY+1] outQueue; 752 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 753 int partiallySent = -1; 754 755 /// Constructor used by a ServerSocket for new connections 756 this(Socket conn) 757 { 758 this(); 759 this.conn = conn; 760 state = conn is null ? ConnectionState.disconnected : ConnectionState.connected; 761 if (conn) 762 socketManager.register(this); 763 updateFlags(); 764 } 765 766 final void updateFlags() 767 { 768 if (state == ConnectionState.connecting) 769 notifyWrite = true; 770 else 771 notifyWrite = writePending; 772 773 notifyRead = state == ConnectionState.connected && readDataHandler; 774 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 775 } 776 777 /// Called when a socket is readable. 778 override void onReadable() 779 { 780 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 781 static ubyte[0x10000] inBuffer = void; 782 auto received = doReceive(inBuffer); 783 784 if (received == 0) 785 return disconnect("Connection closed", DisconnectType.graceful); 786 787 if (received == Socket.ERROR) 788 { 789 // if (wouldHaveBlocked) 790 // { 791 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 792 // return; 793 // } 794 // else 795 onError("recv() error: " ~ lastSocketError); 796 } 797 else 798 { 799 debug (PRINTDATA) 800 { 801 stderr.writefln("== %s <- %s ==", localAddressStr, remoteAddressStr); 802 stderr.write(hexDump(inBuffer[0 .. received])); 803 stderr.flush(); 804 } 805 806 if (state == ConnectionState.disconnecting) 807 { 808 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this); 809 } 810 else 811 if (!readDataHandler) 812 { 813 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this); 814 } 815 else 816 { 817 // Currently, unlike the D1 version of this module, 818 // we will always reallocate read network data. 819 // This disfavours code which doesn't need to store 820 // read data after processing it, but otherwise 821 // makes things simpler and safer all around. 822 823 if (received < UNMANAGED_THRESHOLD) 824 { 825 // Copy to the managed heap 826 readDataHandler(Data(inBuffer[0 .. received].dup)); 827 } 828 else 829 { 830 // Copy to unmanaged memory 831 readDataHandler(Data(inBuffer[0 .. received], true)); 832 } 833 } 834 } 835 } 836 837 /// Called when an error occurs on the socket. 838 override void onError(string reason) 839 { 840 if (state == ConnectionState.disconnecting) 841 { 842 debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 843 return close(); 844 } 845 846 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 847 disconnect("Socket error: " ~ reason, DisconnectType.error); 848 } 849 850 this() 851 { 852 } 853 854 public: 855 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 856 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 857 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 858 { 859 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 860 assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state)); 861 862 if (writePending) 863 { 864 if (type==DisconnectType.requested) 865 { 866 assert(conn, "Attempting to disconnect on an uninitialized socket"); 867 // queue disconnect after all data is sent 868 debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 869 state = ConnectionState.disconnecting; 870 //setIdleTimeout(30.seconds); 871 if (disconnectHandler) 872 disconnectHandler(reason, type); 873 updateFlags(); 874 return; 875 } 876 else 877 discardQueues(); 878 } 879 880 debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 881 882 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 883 close(); 884 else 885 { 886 assert(conn is null, "Registered but %s socket".format(state)); 887 if (state == ConnectionState.resolving) 888 state = ConnectionState.disconnected; 889 } 890 891 if (disconnectHandler) 892 disconnectHandler(reason, type); 893 updateFlags(); 894 } 895 896 private final void close() 897 { 898 assert(conn, "Attempting to close an unregistered socket"); 899 socketManager.unregister(this); 900 conn.close(); 901 conn = null; 902 outQueue[] = null; 903 state = ConnectionState.disconnected; 904 } 905 906 /// Append data to the send buffer. 907 void send(Data[] data, int priority = DEFAULT_PRIORITY) 908 { 909 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 910 outQueue[priority] ~= data; 911 notifyWrite = true; // Fast updateFlags() 912 913 debug (PRINTDATA) 914 { 915 stderr.writefln("== %s -> %s ==", localAddressStr, remoteAddressStr); 916 foreach (datum; data) 917 if (datum.length) 918 stderr.write(hexDump(datum.contents)); 919 else 920 stderr.writeln("(empty Data)"); 921 stderr.flush(); 922 } 923 } 924 925 /// ditto 926 alias send = IConnection.send; 927 928 final void clearQueue(int priority) 929 { 930 if (priority == partiallySent) 931 { 932 assert(outQueue[priority].length > 0); 933 outQueue[priority] = outQueue[priority][0..1]; 934 } 935 else 936 outQueue[priority] = null; 937 updateFlags(); 938 } 939 940 /// Clears all queues, even partially sent content. 941 private final void discardQueues() 942 { 943 foreach (priority; 0..MAX_PRIORITY+1) 944 outQueue[priority] = null; 945 partiallySent = -1; 946 updateFlags(); 947 } 948 949 @property 950 final bool writePending() 951 { 952 foreach (queue; outQueue) 953 if (queue.length) 954 return true; 955 return false; 956 } 957 958 final bool queuePresent(int priority = DEFAULT_PRIORITY) 959 { 960 if (priority == partiallySent) 961 { 962 assert(outQueue[priority].length > 0); 963 return outQueue[priority].length > 1; 964 } 965 else 966 return outQueue[priority].length > 0; 967 } 968 969 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 970 { 971 return outQueue[priority].length; 972 } 973 974 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 975 { 976 size_t bytes; 977 foreach (datum; outQueue[priority]) 978 bytes += datum.length; 979 return bytes; 980 } 981 982 public: 983 private ConnectHandler connectHandler; 984 /// Callback for when a connection has been established. 985 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 986 987 private ReadDataHandler readDataHandler; 988 /// Callback for incoming data. 989 /// Data will not be received unless this handler is set. 990 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 991 992 private DisconnectHandler disconnectHandler; 993 /// Callback for when a connection was closed. 994 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 995 996 /// Callback setter for when all queued data has been sent. 997 private BufferFlushedHandler bufferFlushedHandler; 998 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 999 } 1000 1001 class StreamConnection : Connection 1002 { 1003 protected: 1004 this() 1005 { 1006 super(); 1007 } 1008 1009 /// Called when a socket is writable. 1010 override void onWritable() 1011 { 1012 //scope(success) updateFlags(); 1013 onWritableImpl(); 1014 updateFlags(); 1015 } 1016 1017 // Work around scope(success) breaking debugger stack traces 1018 final private void onWritableImpl() 1019 { 1020 debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state); 1021 if (state == ConnectionState.connecting) 1022 { 1023 int32_t error; 1024 conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error); 1025 if (error) 1026 return disconnect(formatSocketError(error), DisconnectType.error); 1027 1028 state = ConnectionState.connected; 1029 1030 //debug writefln("[%s] Connected", remoteAddressStr); 1031 try 1032 setKeepAlive(); 1033 catch (Exception e) 1034 return disconnect(e.msg, DisconnectType.error); 1035 if (connectHandler) 1036 connectHandler(); 1037 return; 1038 } 1039 //debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length); 1040 1041 foreach (sendPartial; [true, false]) 1042 foreach (int priority, ref queue; outQueue) 1043 while (queue.length && (!sendPartial || priority == partiallySent)) 1044 { 1045 assert(partiallySent == -1 || partiallySent == priority); 1046 1047 auto pdata = queue.ptr; // pointer to first data 1048 1049 ptrdiff_t sent = 0; 1050 if (pdata.length) 1051 { 1052 sent = doSend(pdata.contents); 1053 debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length); 1054 } 1055 else 1056 { 1057 debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this); 1058 } 1059 1060 if (sent == Socket.ERROR) 1061 { 1062 if (wouldHaveBlocked()) 1063 return; 1064 else 1065 return onError("send() error: " ~ lastSocketError); 1066 } 1067 else 1068 if (sent < pdata.length) 1069 { 1070 if (sent > 0) 1071 { 1072 *pdata = (*pdata)[sent..pdata.length]; 1073 partiallySent = priority; 1074 } 1075 return; 1076 } 1077 else 1078 { 1079 assert(sent == pdata.length); 1080 //debug writefln("[%s] Sent data:", remoteAddressStr); 1081 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1082 pdata.clear(); 1083 queue = queue[1..$]; 1084 partiallySent = -1; 1085 if (queue.length == 0) 1086 queue = null; 1087 } 1088 } 1089 1090 // outQueue is now empty 1091 if (bufferFlushedHandler) 1092 bufferFlushedHandler(); 1093 if (state == ConnectionState.disconnecting) 1094 { 1095 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1096 close(); 1097 } 1098 } 1099 1100 public: 1101 this(Socket conn) 1102 { 1103 super(conn); 1104 } 1105 } 1106 1107 // *************************************************************************** 1108 1109 /// A POSIX file stream. 1110 /// Allows adding a file (e.g. stdin/stdout) to the socket manager. 1111 /// Does not dup the given file descriptor, so "disconnecting" this connection 1112 /// will close it. 1113 version (Posix) 1114 class FileConnection : StreamConnection 1115 { 1116 this(int fileno) 1117 { 1118 auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC); 1119 conn.blocking = false; 1120 super(conn); 1121 } 1122 1123 protected: 1124 import core.sys.posix.unistd : read, write; 1125 1126 override sizediff_t doSend(in void[] buffer) 1127 { 1128 return write(socket.handle, buffer.ptr, buffer.length); 1129 } 1130 1131 override sizediff_t doReceive(void[] buffer) 1132 { 1133 return read(socket.handle, buffer.ptr, buffer.length); 1134 } 1135 } 1136 1137 /// Separates reading and writing, e.g. for stdin/stdout. 1138 class Duplex : IConnection 1139 { 1140 IConnection reader, writer; 1141 1142 this(IConnection reader, IConnection writer) 1143 { 1144 this.reader = reader; 1145 this.writer = writer; 1146 reader.handleConnect = &onConnect; 1147 writer.handleConnect = &onConnect; 1148 reader.handleDisconnect = &onDisconnect; 1149 writer.handleDisconnect = &onDisconnect; 1150 } 1151 1152 @property ConnectionState state() 1153 { 1154 if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting) 1155 return ConnectionState.disconnecting; 1156 else 1157 return reader.state < writer.state ? reader.state : writer.state; 1158 } 1159 1160 /// Queue Data for sending. 1161 void send(Data[] data, int priority) 1162 { 1163 writer.send(data, priority); 1164 } 1165 1166 alias send = IConnection.send; /// ditto 1167 1168 /// Terminate the connection. 1169 /// Note: this isn't quite fleshed out - applications may want to 1170 /// wait and send some more data even after stdin is closed, but 1171 /// such an interface can't be fitted into an IConnection 1172 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1173 { 1174 if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting) 1175 reader.disconnect(reason, type); 1176 if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting) 1177 writer.disconnect(reason, type); 1178 debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1179 } 1180 1181 protected void onConnect() 1182 { 1183 if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected) 1184 connectHandler(); 1185 } 1186 1187 protected void onDisconnect(string reason, DisconnectType type) 1188 { 1189 debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state); 1190 if (disconnectHandler) 1191 { 1192 disconnectHandler(reason, type); 1193 disconnectHandler = null; // don't call it twice for the other connection 1194 } 1195 // It is our responsibility to disconnect the other connection 1196 // Use DisconnectType.requested to ensure that any written data is flushed 1197 disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested); 1198 } 1199 1200 /// Callback for when a connection has been established. 1201 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1202 private ConnectHandler connectHandler; 1203 1204 /// Callback setter for when new data is read. 1205 @property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; } 1206 1207 /// Callback setter for when a connection was closed. 1208 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1209 private DisconnectHandler disconnectHandler; 1210 1211 /// Callback setter for when all queued data has been written. 1212 @property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; } 1213 } 1214 1215 unittest { if (false) new Duplex(null, null); } 1216 1217 // *************************************************************************** 1218 1219 /// An asynchronous socket-based connection. 1220 class SocketConnection : StreamConnection 1221 { 1222 protected: 1223 AddressInfo[] addressQueue; 1224 1225 this(Socket conn) 1226 { 1227 super(conn); 1228 } 1229 1230 override sizediff_t doSend(in void[] buffer) 1231 { 1232 return conn.send(buffer); 1233 } 1234 1235 override sizediff_t doReceive(void[] buffer) 1236 { 1237 return conn.receive(buffer); 1238 } 1239 1240 final void tryNextAddress() 1241 { 1242 assert(state == ConnectionState.connecting); 1243 auto addressInfo = addressQueue[0]; 1244 addressQueue = addressQueue[1..$]; 1245 1246 try 1247 { 1248 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1249 conn.blocking = false; 1250 1251 socketManager.register(this); 1252 updateFlags(); 1253 debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString()); 1254 conn.connect(addressInfo.address); 1255 } 1256 catch (SocketException e) 1257 return onError("Connect error: " ~ e.msg); 1258 } 1259 1260 /// Called when an error occurs on the socket. 1261 override void onError(string reason) 1262 { 1263 if (state == ConnectionState.connecting && addressQueue.length) 1264 { 1265 socketManager.unregister(this); 1266 conn.close(); 1267 conn = null; 1268 1269 return tryNextAddress(); 1270 } 1271 1272 super.onError(reason); 1273 } 1274 1275 public: 1276 /// Default constructor 1277 this() 1278 { 1279 debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this); 1280 } 1281 1282 /// Start establishing a connection. 1283 final void connect(AddressInfo[] addresses) 1284 { 1285 assert(addresses.length, "No addresses specified"); 1286 1287 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1288 assert(!conn); 1289 1290 addressQueue = addresses; 1291 state = ConnectionState.connecting; 1292 tryNextAddress(); 1293 } 1294 } 1295 1296 /// An asynchronous TCP connection. 1297 class TcpConnection : SocketConnection 1298 { 1299 protected: 1300 this(Socket conn) 1301 { 1302 super(conn); 1303 } 1304 1305 public: 1306 /// Default constructor 1307 this() 1308 { 1309 debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this); 1310 } 1311 1312 alias connect = SocketConnection.connect; // raise overload 1313 1314 /// Start establishing a connection. 1315 final void connect(string host, ushort port) 1316 { 1317 assert(host.length, "Empty host"); 1318 assert(port, "No port specified"); 1319 1320 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1321 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1322 1323 state = ConnectionState.resolving; 1324 1325 AddressInfo[] addressInfos; 1326 try 1327 { 1328 auto addresses = getAddress(host, port); 1329 enforce(addresses.length, "No addresses found"); 1330 debug (ASOCKETS) 1331 { 1332 stderr.writefln("Resolved to %s addresses:", addresses.length); 1333 foreach (address; addresses) 1334 stderr.writefln("- %s", address.toString()); 1335 } 1336 1337 if (addresses.length > 1) 1338 { 1339 import std.random : randomShuffle; 1340 randomShuffle(addresses); 1341 } 1342 1343 foreach (address; addresses) 1344 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1345 } 1346 catch (SocketException e) 1347 return onError("Lookup error: " ~ e.msg); 1348 1349 state = ConnectionState.disconnected; 1350 connect(addressInfos); 1351 } 1352 } 1353 1354 // *************************************************************************** 1355 1356 /// An asynchronous connection server for socket-based connections. 1357 class SocketServer 1358 { 1359 protected: 1360 /// Class that actually performs listening on a certain address family 1361 final class Listener : GenericSocket 1362 { 1363 this(Socket conn) 1364 { 1365 debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this); 1366 this.conn = conn; 1367 socketManager.register(this); 1368 } 1369 1370 /// Called when a socket is readable. 1371 override void onReadable() 1372 { 1373 debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this); 1374 Socket acceptSocket = conn.accept(); 1375 acceptSocket.blocking = false; 1376 if (handleAccept) 1377 { 1378 auto connection = createConnection(acceptSocket); 1379 debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddressStr); 1380 connection.setKeepAlive(); 1381 //assert(connection.connected); 1382 //connection.connected = true; 1383 acceptHandler(connection); 1384 } 1385 else 1386 acceptSocket.close(); 1387 } 1388 1389 /// Called when a socket is writable. 1390 override void onWritable() 1391 { 1392 } 1393 1394 /// Called when an error occurs on the socket. 1395 override void onError(string reason) 1396 { 1397 close(); // call parent 1398 } 1399 1400 void closeListener() 1401 { 1402 assert(conn); 1403 socketManager.unregister(this); 1404 conn.close(); 1405 conn = null; 1406 } 1407 } 1408 1409 SocketConnection createConnection(Socket socket) 1410 { 1411 return new SocketConnection(socket); 1412 } 1413 1414 /// Whether the socket is listening. 1415 bool listening; 1416 /// Listener instances 1417 Listener[] listeners; 1418 1419 final void updateFlags() 1420 { 1421 foreach (listener; listeners) 1422 listener.notifyRead = handleAccept !is null; 1423 } 1424 1425 public: 1426 /// Start listening on this socket. 1427 final void listen(AddressInfo[] addressInfos) 1428 { 1429 foreach (ref addressInfo; addressInfos) 1430 { 1431 try 1432 { 1433 Socket conn = new Socket(addressInfo); 1434 conn.blocking = false; 1435 if (addressInfo.family == AddressFamily.INET6) 1436 conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); 1437 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); 1438 1439 conn.bind(addressInfo.address); 1440 conn.listen(8); 1441 1442 listeners ~= new Listener(conn); 1443 } 1444 catch (SocketException e) 1445 { 1446 debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString()); 1447 debug(ASOCKETS) stderr.writeln(e.msg); 1448 } 1449 } 1450 1451 if (listeners.length==0) 1452 throw new Exception("Unable to bind service"); 1453 1454 listening = true; 1455 1456 updateFlags(); 1457 } 1458 1459 this() 1460 { 1461 } 1462 1463 /// Creates a Server with the given sockets. 1464 /// The sockets must have already had `bind` and `listen` called on them. 1465 this(Socket[] sockets...) 1466 { 1467 foreach (socket; sockets) 1468 listeners ~= new Listener(socket); 1469 } 1470 1471 final @property Address[] localAddresses() 1472 { 1473 Address[] result; 1474 foreach (listener; listeners) 1475 result ~= listener.localAddress; 1476 return result; 1477 } 1478 1479 final @property bool isListening() 1480 { 1481 return listening; 1482 } 1483 1484 /// Stop listening on this socket. 1485 final void close() 1486 { 1487 foreach (listener;listeners) 1488 listener.closeListener(); 1489 listeners = null; 1490 listening = false; 1491 if (handleClose) 1492 handleClose(); 1493 } 1494 1495 /// Create a SocketServer using the handle passed on standard input, 1496 /// for which `listen` had already been called. Used by 1497 /// e.g. FastCGI and systemd sockets with "Listen = yes". 1498 static SocketServer fromStdin() 1499 { 1500 socket_t socket; 1501 version (Windows) 1502 { 1503 import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE; 1504 socket = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE); 1505 } 1506 else 1507 socket = cast(socket_t)0; 1508 1509 auto s = new Socket(socket, AddressFamily.UNSPEC); 1510 s.blocking = false; 1511 return new SocketServer(s); 1512 } 1513 1514 /// Callback for when the socket was closed. 1515 void delegate() handleClose; 1516 1517 private void delegate(SocketConnection incoming) acceptHandler; 1518 /// Callback for an incoming connection. 1519 /// Connections will not be accepted unless this handler is set. 1520 @property final void delegate(SocketConnection incoming) handleAccept() { return acceptHandler; } 1521 /// ditto 1522 @property final void handleAccept(void delegate(SocketConnection incoming) value) { acceptHandler = value; updateFlags(); } 1523 } 1524 1525 /// An asynchronous TCP connection server. 1526 class TcpServer : SocketServer 1527 { 1528 protected: 1529 override SocketConnection createConnection(Socket socket) 1530 { 1531 return new TcpConnection(socket); 1532 } 1533 1534 public: 1535 this() 1536 { 1537 } 1538 1539 this(Socket[] sockets...) 1540 { 1541 super(sockets); 1542 } 1543 1544 alias listen = SocketServer.listen; // raise overload 1545 1546 /// Start listening on this socket. 1547 final ushort listen(ushort port, string addr = null) 1548 { 1549 debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port); 1550 //assert(!listening, "Attempting to listen on a listening socket"); 1551 1552 auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); 1553 1554 debug (ASOCKETS) 1555 { 1556 stderr.writefln("Resolved to %s addresses:", addressInfos.length); 1557 foreach (ref addressInfo; addressInfos) 1558 stderr.writefln("- %s", addressInfo); 1559 } 1560 1561 // listen on random ports only on IPv4 for now 1562 if (port == 0) 1563 { 1564 foreach_reverse (i, ref addressInfo; addressInfos) 1565 if (addressInfo.family != AddressFamily.INET) 1566 addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$]; 1567 } 1568 1569 listen(addressInfos); 1570 1571 foreach (listener; listeners) 1572 { 1573 auto address = listener.conn.localAddress(); 1574 if (address.addressFamily == AddressFamily.INET) 1575 port = to!ushort(address.toPortString()); 1576 } 1577 1578 return port; 1579 } 1580 1581 deprecated("Use SocketServer.fromStdin") 1582 static TcpServer fromStdin() { return cast(TcpServer) cast(void*) SocketServer.fromStdin; } 1583 1584 @property final void handleAccept(void delegate(TcpConnection incoming) value) { super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c)); } 1585 } 1586 1587 // *************************************************************************** 1588 1589 /// An asynchronous UDP stream. 1590 /// UDP does not have connections, so this class encapsulates a socket 1591 /// with a fixed destination (sendto) address, and optionally bound to 1592 /// a local address. 1593 /// Currently received packets' address is not exposed. 1594 class UdpConnection : Connection 1595 { 1596 protected: 1597 this(Socket conn) 1598 { 1599 super(conn); 1600 } 1601 1602 /// Called when a socket is writable. 1603 override void onWritable() 1604 { 1605 //scope(success) updateFlags(); 1606 onWritableImpl(); 1607 updateFlags(); 1608 } 1609 1610 // Work around scope(success) breaking debugger stack traces 1611 final private void onWritableImpl() 1612 { 1613 foreach (priority, ref queue; outQueue) 1614 while (queue.length) 1615 { 1616 auto pdata = queue.ptr; // pointer to first data 1617 1618 auto sent = conn.sendTo(pdata.contents, remoteAddress); 1619 1620 if (sent == Socket.ERROR) 1621 { 1622 if (wouldHaveBlocked()) 1623 return; 1624 else 1625 return onError("send() error: " ~ lastSocketError); 1626 } 1627 else 1628 if (sent < pdata.length) 1629 { 1630 return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length)); 1631 } 1632 else 1633 { 1634 assert(sent == pdata.length); 1635 //debug writefln("[%s] Sent data:", remoteAddressStr); 1636 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1637 pdata.clear(); 1638 queue = queue[1..$]; 1639 if (queue.length == 0) 1640 queue = null; 1641 } 1642 } 1643 1644 // outQueue is now empty 1645 if (bufferFlushedHandler) 1646 bufferFlushedHandler(); 1647 if (state == ConnectionState.disconnecting) 1648 { 1649 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1650 close(); 1651 } 1652 } 1653 1654 override sizediff_t doSend(in void[] buffer) 1655 { 1656 assert(false); // never called (called only from overridden methods) 1657 } 1658 1659 override sizediff_t doReceive(void[] buffer) 1660 { 1661 return conn.receive(buffer); 1662 } 1663 1664 public: 1665 /// Default constructor 1666 this() 1667 { 1668 debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this); 1669 } 1670 1671 /// Initialize with the given AddressFamily, without binding to an address. 1672 final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP) 1673 { 1674 initializeImpl(family, type, protocol); 1675 if (connectHandler) 1676 connectHandler(); 1677 } 1678 1679 final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol) 1680 { 1681 assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state)); 1682 assert(!conn); 1683 1684 conn = new Socket(family, type, protocol); 1685 conn.blocking = false; 1686 socketManager.register(this); 1687 state = ConnectionState.connected; 1688 updateFlags(); 1689 } 1690 1691 /// Bind to a local address in order to receive packets sent there. 1692 final ushort bind(string host, ushort port) 1693 { 1694 assert(host.length, "Empty host"); 1695 1696 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1697 1698 state = ConnectionState.resolving; 1699 1700 AddressInfo addressInfo; 1701 try 1702 { 1703 auto addresses = getAddress(host, port); 1704 enforce(addresses.length, "No addresses found"); 1705 debug (ASOCKETS) 1706 { 1707 stderr.writefln("Resolved to %s addresses:", addresses.length); 1708 foreach (address; addresses) 1709 stderr.writefln("- %s", address.toString()); 1710 } 1711 1712 Address address; 1713 if (addresses.length > 1) 1714 { 1715 import std.random : uniform; 1716 address = addresses[uniform(0, $)]; 1717 } 1718 else 1719 address = addresses[0]; 1720 addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host); 1721 } 1722 catch (SocketException e) 1723 { 1724 onError("Lookup error: " ~ e.msg); 1725 return 0; 1726 } 1727 1728 state = ConnectionState.disconnected; 1729 return bind(addressInfo); 1730 } 1731 1732 /// ditto 1733 final ushort bind(AddressInfo addressInfo) 1734 { 1735 initialize(addressInfo.family, addressInfo.type, addressInfo.protocol); 1736 conn.bind(addressInfo.address); 1737 1738 auto address = conn.localAddress(); 1739 auto port = to!ushort(address.toPortString()); 1740 1741 if (connectHandler) 1742 connectHandler(); 1743 1744 return port; 1745 } 1746 1747 public: 1748 /// Where to send packets to. 1749 Address remoteAddress; 1750 } 1751 1752 /// 1753 unittest 1754 { 1755 auto server = new UdpConnection(); 1756 auto serverPort = server.bind("localhost", 0); 1757 1758 auto client = new UdpConnection(); 1759 client.initialize(server.localAddress.addressFamily); 1760 1761 string[] packets = ["Hello", "there"]; 1762 client.remoteAddress = server.localAddress; 1763 client.send({ 1764 Data[] data; 1765 foreach (packet; packets) 1766 data ~= Data(packet); 1767 return data; 1768 }()); 1769 1770 server.handleReadData = (Data data) 1771 { 1772 assert(data.contents == packets[0]); 1773 packets = packets[1..$]; 1774 if (!packets.length) 1775 { 1776 server.close(); 1777 client.close(); 1778 } 1779 }; 1780 socketManager.loop(); 1781 assert(!packets.length); 1782 } 1783 1784 // *************************************************************************** 1785 1786 /// Base class for a connection adapter. 1787 /// By itself, does nothing. 1788 class ConnectionAdapter : IConnection 1789 { 1790 IConnection next; 1791 1792 this(IConnection next) 1793 { 1794 this.next = next; 1795 next.handleConnect = &onConnect; 1796 next.handleDisconnect = &onDisconnect; 1797 next.handleBufferFlushed = &onBufferFlushed; 1798 } 1799 1800 @property ConnectionState state() { return next.state; } 1801 1802 /// Queue Data for sending. 1803 void send(Data[] data, int priority) 1804 { 1805 next.send(data, priority); 1806 } 1807 1808 alias send = IConnection.send; /// ditto 1809 1810 /// Terminate the connection. 1811 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 1812 { 1813 next.disconnect(reason, type); 1814 } 1815 1816 protected void onConnect() 1817 { 1818 if (connectHandler) 1819 connectHandler(); 1820 } 1821 1822 protected void onReadData(Data data) 1823 { 1824 // onReadData should be fired only if readDataHandler is set 1825 readDataHandler(data); 1826 } 1827 1828 protected void onDisconnect(string reason, DisconnectType type) 1829 { 1830 if (disconnectHandler) 1831 disconnectHandler(reason, type); 1832 } 1833 1834 protected void onBufferFlushed() 1835 { 1836 if (bufferFlushedHandler) 1837 bufferFlushedHandler(); 1838 } 1839 1840 /// Callback for when a connection has been established. 1841 @property void handleConnect(ConnectHandler value) { connectHandler = value; } 1842 private ConnectHandler connectHandler; 1843 1844 /// Callback setter for when new data is read. 1845 @property void handleReadData(ReadDataHandler value) 1846 { 1847 readDataHandler = value; 1848 next.handleReadData = value ? &onReadData : null ; 1849 } 1850 private ReadDataHandler readDataHandler; 1851 1852 /// Callback setter for when a connection was closed. 1853 @property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; } 1854 private DisconnectHandler disconnectHandler; 1855 1856 /// Callback setter for when all queued data has been written. 1857 @property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; } 1858 private BufferFlushedHandler bufferFlushedHandler; 1859 } 1860 1861 // *************************************************************************** 1862 1863 /// Adapter for connections with a line-based protocol. 1864 /// Splits data stream into delimiter-separated lines. 1865 class LineBufferedAdapter : ConnectionAdapter 1866 { 1867 /// The protocol's line delimiter. 1868 string delimiter = "\r\n"; 1869 1870 this(IConnection next) 1871 { 1872 super(next); 1873 } 1874 1875 /// Append a line to the send buffer. 1876 void send(string line) 1877 { 1878 //super.send(Data(line ~ delimiter)); 1879 // https://issues.dlang.org/show_bug.cgi?id=13985 1880 ConnectionAdapter ca = this; 1881 ca.send(Data(line ~ delimiter)); 1882 } 1883 1884 protected: 1885 /// The receive buffer. 1886 Data inBuffer; 1887 1888 /// Called when data has been received. 1889 final override void onReadData(Data data) 1890 { 1891 import std.string; 1892 auto oldBufferLength = inBuffer.length; 1893 if (oldBufferLength) 1894 inBuffer ~= data; 1895 else 1896 inBuffer = data; 1897 1898 if (delimiter.length == 1) 1899 { 1900 import core.stdc.string; // memchr 1901 1902 char c = delimiter[0]; 1903 auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length); 1904 while (p) 1905 { 1906 sizediff_t index = p - inBuffer.ptr; 1907 processLine(index); 1908 1909 p = memchr(inBuffer.ptr, c, inBuffer.length); 1910 } 1911 } 1912 else 1913 { 1914 sizediff_t index; 1915 // TODO: we can start the search at oldBufferLength-delimiter.length+1 1916 while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0) 1917 processLine(index); 1918 } 1919 } 1920 1921 final void processLine(size_t index) 1922 { 1923 auto line = inBuffer[0..index]; 1924 inBuffer = inBuffer[index+delimiter.length..inBuffer.length]; 1925 super.onReadData(line); 1926 } 1927 1928 override void onDisconnect(string reason, DisconnectType type) 1929 { 1930 super.onDisconnect(reason, type); 1931 inBuffer.clear(); 1932 } 1933 } 1934 1935 // *************************************************************************** 1936 1937 /// Fires an event handler or disconnects connections 1938 /// after a period of inactivity. 1939 class TimeoutAdapter : ConnectionAdapter 1940 { 1941 this(IConnection next) 1942 { 1943 debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this); 1944 super(next); 1945 } 1946 1947 void cancelIdleTimeout() 1948 { 1949 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this); 1950 assert(idleTask !is null); 1951 assert(idleTask.isWaiting()); 1952 idleTask.cancel(); 1953 } 1954 1955 void resumeIdleTimeout() 1956 { 1957 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this); 1958 assert(idleTask !is null); 1959 assert(!idleTask.isWaiting()); 1960 mainTimer.add(idleTask); 1961 } 1962 1963 final void setIdleTimeout(Duration duration) 1964 { 1965 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this); 1966 assert(duration > Duration.zero); 1967 1968 // Configure idleTask 1969 if (idleTask is null) 1970 { 1971 idleTask = new TimerTask(duration); 1972 idleTask.handleTask = &onTask_Idle; 1973 } 1974 else 1975 { 1976 if (idleTask.isWaiting()) 1977 idleTask.cancel(); 1978 idleTask.delay = duration; 1979 } 1980 1981 mainTimer.add(idleTask); 1982 } 1983 1984 void markNonIdle() 1985 { 1986 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this); 1987 if (handleNonIdle) 1988 handleNonIdle(); 1989 if (idleTask && idleTask.isWaiting()) 1990 idleTask.restart(); 1991 } 1992 1993 /// Callback for when a connection has stopped responding. 1994 /// If unset, the connection will be disconnected. 1995 void delegate() handleIdleTimeout; 1996 1997 /// Callback for when a connection is marked as non-idle 1998 /// (when data is received). 1999 void delegate() handleNonIdle; 2000 2001 protected: 2002 override void onConnect() 2003 { 2004 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this); 2005 markNonIdle(); 2006 super.onConnect(); 2007 } 2008 2009 override void onReadData(Data data) 2010 { 2011 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this); 2012 markNonIdle(); 2013 super.onReadData(data); 2014 } 2015 2016 override void onDisconnect(string reason, DisconnectType type) 2017 { 2018 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this); 2019 if (idleTask && idleTask.isWaiting()) 2020 idleTask.cancel(); 2021 super.onDisconnect(reason, type); 2022 } 2023 2024 private: 2025 TimerTask idleTask; // non-null if an idle timeout has been set 2026 2027 final void onTask_Idle(Timer timer, TimerTask task) 2028 { 2029 debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this); 2030 if (state == ConnectionState.disconnecting) 2031 return disconnect("Delayed disconnect - time-out", DisconnectType.error); 2032 2033 if (state == ConnectionState.disconnected) 2034 return; 2035 2036 if (handleIdleTimeout) 2037 { 2038 resumeIdleTimeout(); // reschedule (by default) 2039 handleIdleTimeout(); 2040 } 2041 else 2042 disconnect("Time-out", DisconnectType.error); 2043 } 2044 } 2045 2046 // *************************************************************************** 2047 2048 unittest 2049 { 2050 void testTimer() 2051 { 2052 bool fired; 2053 setTimeout({fired = true;}, 10.msecs); 2054 socketManager.loop(); 2055 assert(fired); 2056 } 2057 2058 testTimer(); 2059 }