1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import core.stdc.stdint : int32_t; 24 25 import std.exception; 26 import std.socket; 27 import std.string : format; 28 public import std.socket : Address, AddressInfo, Socket; 29 30 version (Windows) 31 private import c_socks = core.sys.windows.winsock2; 32 else version (Posix) 33 private import c_socks = core.sys.posix.sys.socket; 34 35 debug(ASOCKETS) import std.stdio : stderr; 36 debug(PRINTDATA) static import std.stdio; 37 debug(PRINTDATA) import ae.utils.text : hexDump; 38 private import std.conv : to; 39 40 41 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 42 static import ae.utils.array; 43 44 version(LIBEV) 45 { 46 import deimos.ev; 47 pragma(lib, "ev"); 48 } 49 50 version(Windows) 51 { 52 import core.sys.windows.windows : Sleep; 53 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 54 } 55 else 56 enum USE_SLEEP = false; 57 58 /// Flags that determine socket wake-up events. 59 int eventCounter; 60 61 version(LIBEV) 62 { 63 // Watchers are a GenericSocket field (as declared in SocketMixin). 64 // Use one watcher per read and write event. 65 // Start/stop those as higher-level code declares interest in those events. 66 // Use the "data" ev_io field to store the parent GenericSocket address. 67 // Also use the "data" field as a flag to indicate whether the watcher is active 68 // (data is null when the watcher is stopped). 
struct SocketManager
{
private:
	/// Number of registered sockets (incremented by register, decremented by unregister).
	size_t count;

	/// libev I/O watcher callback, installed on both the read and the write
	/// watcher of every registered socket. The owning GenericSocket is taken
	/// from the watcher's `data` field, which is only non-null while the
	/// watcher is started.
	extern(C)
	static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
	{
		eventCounter++;
		auto socket = cast(GenericSocket)w.data;
		assert(socket, "libev event fired on stopped watcher");
		// Note: only `stderr` is imported from std.stdio for debug(ASOCKETS),
		// so all traces must go through it.
		debug (ASOCKETS) stderr.writefln("ioCallback(%s, 0x%X)", socket, revents);

		if (revents & EV_READ)
			socket.onReadable();
		else
		if (revents & EV_WRITE)
			socket.onWritable();
		else
			assert(false, "Unknown event fired from libev");

		// The handler may have added or removed timer tasks.
		// TODO? Need to get proper SocketManager instance to call updateTimer on
		socketManager.updateTimer(false);
	}

	/// The single libev timer used to drive mainTimer.
	ev_timer evTimer;
	/// Value of mainTimer.getNextEvent() at the last reschedule;
	/// MonoTime.max means the libev timer is currently stopped.
	MonoTime lastNextEvent = MonoTime.max;

	/// libev timer callback: runs due timer tasks, then reschedules.
	extern(C)
	static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
	{
		eventCounter++;
		debug (ASOCKETS) stderr.writefln("Timer callback called.");
		mainTimer.prod();

		socketManager.updateTimer(true);
		debug (ASOCKETS) stderr.writefln("Timer callback exiting.");
	}

	/// Synchronize the libev timer with mainTimer's next event.
	/// Params:
	///   force = reschedule even if the next event time did not change
	void updateTimer(bool force)
	{
		auto nextEvent = mainTimer.getNextEvent();
		if (force || lastNextEvent != nextEvent)
		{
			debug (ASOCKETS) stderr.writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
			if (nextEvent == MonoTime.max) // Stopping
			{
				// Only stop a timer that was actually started.
				if (lastNextEvent != MonoTime.max)
					ev_timer_stop(ev_default_loop(0), &evTimer);
			}
			else
			{
				// Run any tasks that are already due, so that the
				// interval handed to libev is strictly positive.
				auto remaining = mainTimer.getRemainingTime();
				while (remaining <= Duration.zero)
				{
					debug (ASOCKETS) stderr.writefln("remaining=%s, prodding timer.", remaining);
					mainTimer.prod();
					remaining = mainTimer.getRemainingTime();
				}
				// Convert the remaining Duration to fractional seconds.
				ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
				debug (ASOCKETS) stderr.writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
				if (lastNextEvent == MonoTime.max) // Starting
				{
					ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
					ev_timer_start(ev_default_loop(0), &evTimer);
				}
				else // Adjusting
				{
					evTimer.repeat = tstamp;
					ev_timer_again(ev_default_loop(0), &evTimer);
				}
			}
			lastNextEvent = nextEvent;
		}
	}

public:
	/// Register a socket with the manager.
	/// Initializes the socket's read and write watchers on its handle;
	/// the watchers are not started here.
	void register(GenericSocket socket)
	{
		debug (ASOCKETS) stderr.writefln("Registering %s", socket);
		debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
		auto fd = socket.conn.handle;
		assert(fd, "Must have fd before socket registration");
		ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
		ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
		count++;
	}

	/// Unregister a socket with the manager.
	/// Clears both notification flags, which stops any started watcher.
	void unregister(GenericSocket socket)
	{
		debug (ASOCKETS) stderr.writefln("Unregistering %s", socket);
		socket.notifyRead  = false;
		socket.notifyWrite = false;
		count--;
	}

	/// Number of currently registered sockets.
	size_t size()
	{
		return count;
	}

	/// Loop continuously until no sockets are left.
void loop()
{
	// Obtain the default libev loop once and reuse the validated handle.
	auto evLoop = ev_default_loop(0);
	enforce(evLoop, "libev initialization failure");

	// Arm the libev timer for any timer tasks that are already scheduled.
	updateTimer(true);
	// Only `stderr` is imported from std.stdio for debug(ASOCKETS).
	debug (ASOCKETS) stderr.writeln("ev_run");
	ev_run(evLoop, 0);
}
} // struct SocketManager (libev)

/// Per-socket state for the libev backend: one I/O watcher per direction.
private mixin template SocketMixin()
{
	private ev_io evRead, evWrite;

	/// Start or stop a watcher so that it matches the requested state.
	/// The watcher's `data` field doubles as the "started" flag:
	/// it holds `this` while started and null while stopped.
	private void setWatcherState(ref ev_io ev, bool newValue, int event)
	{
		if (!conn)
		{
			// Can happen when setting delegates before connecting.
			return;
		}

		if (newValue && !ev.data)
		{
			// Start
			ev.data = cast(void*)this;
			ev_io_start(ev_default_loop(0), &ev);
		}
		else
		if (!newValue && ev.data)
		{
			// Stop
			assert(ev.data is cast(void*)this);
			ev.data = null;
			ev_io_stop(ev_default_loop(0), &ev);
		}
	}

	/// Interested in read notifications (onReadable)?
	@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
	/// Interested in write notifications (onWritable)?
	@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }

	debug ~this()
	{
		// The LIBEV SocketManager holds no references to registered sockets.
		// TODO: Add a doubly-linked list?
		assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
	}
}
}
else // Use select
{
	struct SocketManager
	{
	private:
		/// Initial capacity of the SocketSets created by loop().
		enum FD_SETSIZE = 1024;

		/// List of all sockets to poll.
		GenericSocket[] sockets;

		/// Debug AA to check for dangling socket references.
		debug GenericSocket[socket_t] socketHandles;

		/// Handlers run in rotation when select() reports no events.
		void delegate()[] idleHandlers;

	public:
		/// Register a socket with the manager.
void register(GenericSocket conn)
{
	debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
	assert(!conn.socket.blocking, "Trying to register a blocking socket");
	sockets ~= conn;

	debug
	{
		// Track the OS handle so that double registration is caught early.
		immutable fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't register a closed socket");
		assert((fd in socketHandles) is null, "This socket handle is already registered");
		socketHandles[fd] = conn;
	}
}

/// Unregister a socket with the manager.
/// The socket must still be open and must have been registered before.
void unregister(GenericSocket conn)
{
	debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);

	debug
	{
		// Verify that the handle bookkeeping agrees with the caller.
		immutable fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't unregister a closed socket");
		auto owner = fd in socketHandles;
		assert(owner !is null, "This socket handle is not registered");
		assert(*owner is conn, "This socket handle is registered but belongs to another GenericSocket");
		socketHandles.remove(fd);
	}

	foreach (size_t index, GenericSocket candidate; sockets)
	{
		if (candidate !is conn)
			continue;

		// Build a fresh array rather than shifting elements in place.
		sockets = sockets[0 .. index] ~ sockets[index + 1 .. $];
		return;
	}
	assert(false, "Socket not registered");
}

/// Number of currently registered sockets.
size_t size()
{
	return sockets.length;
}

/// Loop continuously until no sockets are left.
void loop()
{
	// Runs until there is neither a non-daemon socket nor a pending timer
	// event. Each iteration rebuilds the select() sets, waits, dispatches at
	// most one socket event (or one idle handler), then fires due timers.
	debug (ASOCKETS) stderr.writeln("Starting event loop.");

	SocketSet readset, writeset, errorset;
	size_t sockcount;
	uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
	readset  = new SocketSet(setSize);
	writeset = new SocketSet(setSize);
	errorset = new SocketSet(setSize);
	while (true)
	{
		// Work out the capacity the sets need: the socket count on
		// Windows, the highest handle value elsewhere.
		uint minSize = 0;
		version(Windows)
			minSize = cast(uint)sockets.length;
		else
		{
			foreach (s; sockets)
				if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
					minSize = s.socket.handle;
		}
		minSize++;

		if (setSize < minSize)
		{
			debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
			setSize = minSize * 2;
			readset  = new SocketSet(setSize);
			writeset = new SocketSet(setSize);
			errorset = new SocketSet(setSize);
		}
		else
		{
			readset.reset();
			writeset.reset();
			errorset.reset();
		}

		sockcount = 0;
		bool haveActive;
		debug (ASOCKETS) stderr.writeln("Populating sets");
		foreach (GenericSocket conn; sockets)
		{
			if (!conn.socket)
				continue;
			sockcount++;
			// Daemon sockets are polled but do not keep the loop alive.
			if (!conn.daemon)
				haveActive = true;

			debug (ASOCKETS) stderr.writef("\t%s:", conn);
			if (conn.notifyRead)
			{
				readset.add(conn.socket);
				debug (ASOCKETS) stderr.write(" READ");
			}
			if (conn.notifyWrite)
			{
				writeset.add(conn.socket);
				debug (ASOCKETS) stderr.write(" WRITE");
			}
			errorset.add(conn.socket);
			debug (ASOCKETS) stderr.writeln();
		}
		debug (ASOCKETS)
		{
			stderr.writefln("Sets populated as follows:");
			printSets(readset, writeset, errorset);
		}

		debug (ASOCKETS) stderr.writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
			sockcount,
			mainTimer.isWaiting() ? "with" : "no",
			idleHandlers.length,
		);
		if (!haveActive && !mainTimer.isWaiting())
		{
			debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
			break;
		}

		debug (ASOCKETS) stderr.flush();

		int events;
		if (idleHandlers.length)
		{
			// With idle handlers pending, only poll; never block.
			if (sockcount==0)
				events = 0;
			else
				events = Socket.select(readset, writeset, errorset, 0.seconds);
		}
		else
		if (USE_SLEEP && sockcount==0)
		{
			version(Windows)
			{
				// No sockets to select on: sleep until the next timer event.
				auto duration = mainTimer.getRemainingTime().total!"msecs"();
				// Only `stderr` is imported from std.stdio for debug(ASOCKETS).
				debug (ASOCKETS) stderr.writeln("Wait duration: ", duration, " msecs");
				if (duration <= 0)
					duration = 1; // Avoid busywait
				else
				if (duration > int.max)
					duration = int.max;
				Sleep(cast(int)duration);
				events = 0;
			}
			else
				assert(0);
		}
		else
		if (mainTimer.isWaiting())
			events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
		else
			events = Socket.select(readset, writeset, errorset);

		debug (ASOCKETS) stderr.writefln("%d events fired.", events);

		if (events > 0)
		{
			// Handle just one event at a time, as the first
			// handler might invalidate select()'s results.
			handleEvent(readset, writeset, errorset);
		}
		else
		if (idleHandlers.length)
		{
			import ae.utils.array;
			auto handler = idleHandlers.shift();

			// Rotate the idle handler queue before running it,
			// in case the handler unregisters itself.
			idleHandlers ~= handler;

			handler();
		}

		// Timers may invalidate our select results, so fire them after processing the latter
		mainTimer.prod();

		eventCounter++;
	}
}

/// Debug helper: print which registered sockets are present in each set.
debug (ASOCKETS)
void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			stderr.writefln("\t\t%s is unset", conn);
		else
		{
			if (readset.isSet(conn.socket))
				stderr.writefln("\t\t%s is readable", conn);
			if (writeset.isSet(conn.socket))
				stderr.writefln("\t\t%s is writable", conn);
			if (errorset.isSet(conn.socket))
				stderr.writefln("\t\t%s is errored", conn);
		}
	}
}

/// Dispatch exactly one event from the select() results: the first
/// registered socket found in any set gets its handler called
/// (readable takes precedence over writable, writable over error).
void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	debug (ASOCKETS)
	{
		stderr.writefln("\tSelect results:");
		printSets(readset, writeset, errorset);
	}

	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			continue;

		if (readset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
			return conn.onReadable();
		}
		else
		if (writeset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
			return conn.onWritable();
		}
		else
		if (errorset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onError", conn);
			return conn.onError("select() error: " ~ conn.socket.getErrorText());
		}
	}

	assert(false, "select() reported events available, but no registered sockets are set");
}
} // struct SocketManager (select)

// Use UFCS to allow removeIdleHandler to have a predicate with context
/// Append an idle handler; the same delegate must not be added twice.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
	foreach (i, idleHandler; socketManager.idleHandlers)
		assert(handler !is idleHandler);

	socketManager.idleHandlers ~= handler;
}

/// Default predicate for removeIdleHandler: identity comparison.
static bool isFun(T)(T a, T b) { return a is b; }

/// Remove the first idle handler for which `pred(handler, args)` is true.
/// Asserts if no handler matches.
void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
	foreach (i, idleHandler; socketManager.idleHandlers)
		if (pred(idleHandler, args))
		{
			import std.algorithm;
			socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
			return;
		}
	assert(false, "No such idle handler");
}

/// Per-socket state for the select backend: plain flags read by loop().
private mixin template SocketMixin()
{
	/// Interested in read notifications (onReadable)?
	bool notifyRead;
	/// Interested in write notifications (onWritable)?
	bool notifyWrite;
}
}

/// The default socket manager.
SocketManager socketManager;

// ***************************************************************************

/// General methods for an asynchronous socket.
abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Called by the socket manager when the socket is readable. No-op by default.
	void onReadable()
	{
	}

	/// Called by the socket manager when the socket is writable. No-op by default.
	void onWritable()
	{
	}

	/// Called by the socket manager when the socket has an error. No-op by default.
	void onError(string reason)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	private Address[2] cachedAddress;

	/// Return the local (`local == true`) or remote address of the socket.
	/// The result is cached per direction; returns null when nothing is
	/// cached and there is no socket. Throws SocketOSException when the
	/// address of an UNSPEC-family socket cannot be queried.
	/*private*/ final @property Address address(bool local)()
	{
		if (cachedAddress[local] !is null)
			return cachedAddress[local];
		else
		if (conn is null)
			return null;
		else
		{
			Address a;
			if (conn.addressFamily == AddressFamily.UNSPEC)
			{
				// Socket will attempt to construct an UnknownAddress,
				// which will almost certainly not match the real address length.
				static if (local)
					alias getname = c_socks.getsockname;
				else
					alias getname = c_socks.getpeername;

				// First call: query the required length only.
				c_socks.socklen_t nameLen = 0;
				if (getname(conn.handle, null, &nameLen) < 0)
					throw new SocketOSException("Unable to obtain socket address");

				// Second call: fetch the address into a buffer of that length.
				auto buf = new ubyte[nameLen];
				auto sa = cast(c_socks.sockaddr*)buf.ptr;
				if (getname(conn.handle, sa, &nameLen) < 0)
					throw new SocketOSException("Unable to obtain socket address");
				a = new UnknownAddressReference(sa, nameLen);
			}
			else
				a = local ? conn.localAddress() : conn.remoteAddress();
			return cachedAddress[local] = a;
		}
	}

	alias localAddress = address!true;
	alias remoteAddress = address!false;

	/// Format the local or remote address as "host:port" without throwing.
	/// Hosts containing ':' are wrapped in brackets; the port is omitted if
	/// it cannot be obtained. Returns "[null address]" when there is no
	/// address and "[error: ...]" when obtaining it throws.
	/*private*/ final @property string addressStr(bool local)() nothrow
	{
		try
		{
			auto a = address!local;
			if (a is null)
				return "[null address]";
			string host = a.toAddrString();
			import std.string : indexOf;
			if (host.indexOf(':') >= 0)
				host = "[" ~ host ~ "]";
			try
			{
				string port = a.toPortString();
				return host ~ ":" ~ port;
			}
			catch (Exception e)
				return host;
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	alias localAddressStr = addressStr!true;
	alias remoteAddressStr = addressStr!false;

	/// Don't block the process from exiting.
/// TODO: Not implemented with libev
bool daemon;

/// Enable or disable TCP keep-alive on the wrapped socket.
/// When enabling, the timed variant is tried first; if the socket rejects
/// it with a SocketException, the plain KEEPALIVE option is set instead.
final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
{
	assert(conn, "Attempting to set keep-alive on an uninitialized socket");

	if (!enabled)
	{
		conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
		return;
	}

	try
		conn.setKeepAlive(time, interval);
	catch (SocketException)
		conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
}

/// Short description: unqualified class name, object address and
/// socket handle (-1 when there is no socket).
override string toString() const
{
	import std.string;
	auto shortName = this.classinfo.name.split(".")[$-1];
	return "%s {this=%s, fd=%s}".format(shortName, cast(void*)this, conn ? conn.handle : -1);
}
}

// ***************************************************************************

/// Why a connection was closed.
enum DisconnectType
{
	/// Initiated by the application.
	requested,
	/// The peer gracefully closed the connection.
	graceful,
	/// Abnormal network condition.
	error
}

/// Life-cycle stage of a connection.
enum ConnectionState
{
	/// Initial state; also the state once a disconnect has been fully processed.
	disconnected,

	/// Resolving the host name (currently done synchronously).
	resolving,

	/// A connection attempt is under way.
	connecting,

	/// The connection is established.
	connected,

	/// A disconnect is in progress: no data can be sent or received,
	/// and queued data is still being flushed before the socket closes.
	disconnecting,
}

/// Returns true if this is a connection state for which disconnecting is valid.
/// Generally, applications should be aware of the life cycle of their sockets,
/// so checking the state of a connection is unnecessary (and a code smell).
/// However, unconditionally disconnecting some connected sockets can be useful
/// when it needs to occur "out-of-band" (not tied to the application's normal life cycle),
/// such as in response to a signal.
bool disconnectable(ConnectionState state)
{
	// Valid states are exactly resolving, connecting and connected.
	immutable tooEarly = state < ConnectionState.resolving;
	immutable tooLate  = state > ConnectionState.connected;
	return !(tooEarly || tooLate);
}

/// Interface shared by connections and by adapters layered on top of them.
interface IConnection
{
	/// Highest valid send priority value.
	enum MAX_PRIORITY = 4;
	/// Priority used by send when none is given.
	enum DEFAULT_PRIORITY = 2;

	/// Reason used by disconnect when none is given.
	static const defaultDisconnectReason = "Software closed the connection";

	/// Current connection state.
	@property ConnectionState state();

	/// Is the connection established?
	deprecated final @property bool connected()
	{
		return state == ConnectionState.connected;
	}

	/// Is a disconnect in progress (waiting for data to be flushed)?
	deprecated final @property bool disconnecting()
	{
		return state == ConnectionState.disconnecting;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		// Wrap the single datum in a stack array for the array overload,
		// then reset the slot once the call returns.
		Data[1] single;
		single[0] = datum;
		this.send(single[], priority);
		single[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias ConnectHandler = void delegate();
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias ReadDataHandler = void delegate(Data data);
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias DisconnectHandler = void delegate(string reason, DisconnectType type);
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
725 alias void delegate() BufferFlushedHandler; 726 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 727 } 728 729 // *************************************************************************** 730 731 class Connection : GenericSocket, IConnection 732 { 733 private: 734 /// Blocks of data larger than this value are passed as unmanaged memory 735 /// (in Data objects). Blocks smaller than this value will be reallocated 736 /// on the managed heap. The disadvantage of placing large objects on the 737 /// managed heap is false pointers; the disadvantage of using Data for 738 /// small objects is wasted slack space due to the page size alignment 739 /// requirement. 740 enum UNMANAGED_THRESHOLD = 256; 741 742 ConnectionState _state; 743 final @property ConnectionState state(ConnectionState value) { return _state = value; } 744 745 public: 746 /// Get connection state. 747 override @property ConnectionState state() { return _state; } 748 749 protected: 750 abstract sizediff_t doSend(in void[] buffer); 751 abstract sizediff_t doReceive(void[] buffer); 752 753 /// The send buffers. 754 Data[][MAX_PRIORITY+1] outQueue; 755 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 756 int partiallySent = -1; 757 758 /// Constructor used by a ServerSocket for new connections 759 this(Socket conn) 760 { 761 this(); 762 this.conn = conn; 763 state = conn is null ? ConnectionState.disconnected : ConnectionState.connected; 764 if (conn) 765 socketManager.register(this); 766 updateFlags(); 767 } 768 769 final void updateFlags() 770 { 771 if (state == ConnectionState.connecting) 772 notifyWrite = true; 773 else 774 notifyWrite = writePending; 775 776 notifyRead = state == ConnectionState.connected && readDataHandler; 777 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 778 } 779 780 /// Called when a socket is readable. 
override void onReadable()
{
	// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
	static ubyte[0x10000] inBuffer = void;
	auto received = doReceive(inBuffer);

	// A zero-length read means the peer closed the connection.
	if (received == 0)
		return disconnect("Connection closed", DisconnectType.graceful);

	// Every receive error is reported, including would-block conditions.
	if (received == Socket.ERROR)
		return onError("recv() error: " ~ lastSocketError);

	auto chunk = inBuffer[0 .. received];

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
		std.stdio.write(hexDump(chunk));
		std.stdio.stdout.flush();
	}

	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
		return;
	}

	if (!readDataHandler)
	{
		debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
		return;
	}

	// The receive buffer is static and reused, so the data is always
	// copied before it is handed to the handler. This disfavours code
	// which doesn't need to keep the data after processing it, but makes
	// things simpler and safer all around.
	if (received < UNMANAGED_THRESHOLD)
		readDataHandler(Data(chunk.dup));   // copy to the managed heap
	else
		readDataHandler(Data(chunk, true)); // copy to unmanaged memory
}

/// Called when an error occurs on the socket.
override void onError(string reason)
{
	if (state == ConnectionState.disconnecting)
	{
		// The disconnect handler has already been called when the
		// disconnect was queued, so just tear the socket down.
		// Pass the arguments straight to writefln: pre-formatting the
		// message and using it as the format string would misinterpret
		// any '%' contained in `reason`.
		debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s", cast(void*)this, reason);
		return close();
	}

	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
	disconnect("Socket error: " ~ reason, DisconnectType.error);
}

/// Create a connection without a socket.
this()
{
}

public:
/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
/// Params:
///   reason = human-readable reason passed on to the disconnect handler
///   type   = requested disconnects wait for queued data to be flushed;
///            any other type discards the queues and closes right away
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
{
	//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
	assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state));

	if (writePending)
	{
		if (type==DisconnectType.requested)
		{
			assert(conn, "Attempting to disconnect on an uninitialized socket");
			// queue disconnect after all data is sent
			debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
			state = ConnectionState.disconnecting;
			//setIdleTimeout(30.seconds);
			if (disconnectHandler)
				disconnectHandler(reason, type);
			updateFlags();
			return;
		}
		else
			discardQueues();
	}

	debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

	if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
		close();
	else
	{
		// No socket exists yet, so there is nothing to close.
		assert(conn is null, "Registered but %s socket".format(state));
		if (state == ConnectionState.resolving)
			state = ConnectionState.disconnected;
	}

	if (disconnectHandler)
		disconnectHandler(reason, type);
	updateFlags();
}

/// Unregister and close the socket, drop all queued data and
/// return to the disconnected state.
private final void close()
{
	assert(conn, "Attempting to close an unregistered socket");
	socketManager.unregister(this);
	conn.close();
	conn = null;
	outQueue[] = null;
	// The queues are now empty, so no item can be "partially sent" any more.
	// Leaving a stale priority here would trip the assertions in clearQueue,
	// queuePresent and the write loop if this object is used again.
	partiallySent = -1;
	state = ConnectionState.disconnected;
}

/// Append data to the send buffer.
/// Params:
///   data     = blocks to queue, in order
///   priority = index of the queue to append to
void send(Data[] data, int priority = DEFAULT_PRIORITY)
{
	assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
	outQueue[priority] ~= data;
	notifyWrite = true; // Fast updateFlags()

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
		foreach (datum; data)
			if (datum.length)
				std.stdio.write(hexDump(datum.contents));
			else
				std.stdio.writeln("(empty Data)");
		std.stdio.stdout.flush();
	}
}

/// ditto
alias send = IConnection.send;

/// Drop the data queued at the given priority. If the first item of that
/// queue has been partially sent, it is kept so that it can be completed.
final void clearQueue(int priority)
{
	if (priority == partiallySent)
	{
		assert(outQueue[priority].length > 0);
		outQueue[priority] = outQueue[priority][0..1];
	}
	else
		outQueue[priority] = null;
	updateFlags();
}

/// Clears all queues, even partially sent content.
private final void discardQueues()
{
	outQueue[] = null;
	partiallySent = -1;
	updateFlags();
}

/// Is there any data queued for sending, at any priority?
@property
final bool writePending()
{
	foreach (ref pending; outQueue)
	{
		if (pending.length != 0)
			return true;
	}
	return false;
}

/// Is there cancellable data queued at the given priority?
/// A partially sent first item does not count.
final bool queuePresent(int priority = DEFAULT_PRIORITY)
{
	if (priority != partiallySent)
		return outQueue[priority].length > 0;

	assert(outQueue[priority].length > 0);
	return outQueue[priority].length > 1;
}

/// Number of Data items queued at the given priority.
final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
{
	return outQueue[priority].length;
}

/// Total number of bytes queued at the given priority.
final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
{
	size_t total = 0;
	foreach (ref item; outQueue[priority])
		total += item.length;
	return total;
}

public:
private ConnectHandler connectHandler;
/// Callback for when a connection has been established.
@property final void handleConnect(ConnectHandler value)
{
	connectHandler = value;
	updateFlags();
}

private ReadDataHandler readDataHandler;
/// Callback for incoming data.
/// Data will not be received unless this handler is set.
@property final void handleReadData(ReadDataHandler value)
{
	readDataHandler = value;
	updateFlags();
}

private DisconnectHandler disconnectHandler;
/// Callback for when a connection was closed.
@property final void handleDisconnect(DisconnectHandler value)
{
	disconnectHandler = value;
	updateFlags();
}

private BufferFlushedHandler bufferFlushedHandler;
/// Callback setter for when all queued data has been sent.
@property final void handleBufferFlushed(BufferFlushedHandler value)
{
	bufferFlushedHandler = value;
	updateFlags();
}
}

/// A Connection that sends its queued data whenever the socket is writable.
class StreamConnection : Connection
{
protected:
	this()
	{
		super();
	}

	/// Called when a socket is writable.
override void onWritable()
{
	//scope(success) updateFlags();
	onWritableImpl();
	updateFlags();
}

// Work around scope(success) breaking debugger stack traces
/// Complete a pending connection attempt, or send as much queued data as
/// the socket accepts. Once every queue is empty, calls the buffer-flushed
/// handler and completes a delayed disconnect.
final private void onWritableImpl()
{
	debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
	if (state == ConnectionState.connecting)
	{
		// Writability while connecting means the attempt has finished;
		// SO_ERROR tells whether it succeeded.
		int32_t error;
		conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
		if (error)
			return disconnect(formatSocketError(error), DisconnectType.error);

		state = ConnectionState.connected;

		//debug writefln("[%s] Connected", remoteAddress);
		try
			setKeepAlive();
		catch (Exception e)
			return disconnect(e.msg, DisconnectType.error);
		if (connectHandler)
			connectHandler();
		return;
	}
	//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

	// Two passes: first finish the partially sent item (if any),
	// then send everything else in priority order.
	static immutable bool[2] sendPasses = [true, false];
	foreach (sendPartial; sendPasses)
		foreach (int priority, ref queue; outQueue)
			while (queue.length && (!sendPartial || priority == partiallySent))
			{
				assert(partiallySent == -1 || partiallySent == priority);

				auto pdata = queue.ptr; // pointer to first data

				ptrdiff_t sent = 0;
				if (pdata.length)
				{
					sent = doSend(pdata.contents);
					debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
				}
				else
				{
					debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
				}

				if (sent == Socket.ERROR)
				{
					// Would-block is not an error: retry on the next writable event.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// Short write: keep the unsent remainder at the head of the queue.
					if (sent > 0)
					{
						*pdata = (*pdata)[sent..pdata.length];
						partiallySent = priority;
					}
					return;
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					partiallySent = -1;
					if (queue.length == 0)
						queue = null;
				}
			}

	// outQueue is now empty
	if (bufferFlushedHandler)
		bufferFlushedHandler();
	// The handler may have queued more data and then requested a disconnect;
	// in that case the data must be flushed first, so only close once
	// nothing is pending. The next writable event will get us back here.
	if (state == ConnectionState.disconnecting && !writePending)
	{
		debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}
}

public:
/// Wrap an existing socket (see the Connection constructor).
this(Socket conn)
{
	super(conn);
}
}

// ***************************************************************************

/// A POSIX file stream.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// Does not dup the given file descriptor, so "disconnecting" this connection
/// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	/// Wrap the given file descriptor in a non-blocking Socket object.
	this(int fileno)
	{
		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		conn.blocking = false;
		super(conn);
	}

protected:
	import core.sys.posix.unistd : read, write;

	/// Write to the descriptor with write(2); returns its result.
	override sizediff_t doSend(in void[] buffer)
	{
		return write(socket.handle, buffer.ptr, buffer.length);
	}

	/// Read from the descriptor with read(2); returns its result.
	override sizediff_t doReceive(void[] buffer)
	{
		return read(socket.handle, buffer.ptr, buffer.length);
	}
}

/// Separates reading and writing, e.g. for stdin/stdout.
class Duplex : IConnection
{
    /// The connection data is read from, and the one data is written to.
    IConnection reader, writer;

    /// Combine the two connections and take over their connect and
    /// disconnect callbacks.
    this(IConnection reader, IConnection writer)
    {
        this.reader = reader;
        this.writer = writer;
        reader.handleConnect = &onConnect;
        writer.handleConnect = &onConnect;
        reader.handleDisconnect = &onDisconnect;
        writer.handleDisconnect = &onDisconnect;
    }

    /// Combined state: disconnecting if either half is disconnecting,
    /// otherwise the lower of the two halves' states.
    @property ConnectionState state()
    {
        if (reader.state == ConnectionState.disconnecting)
            return ConnectionState.disconnecting;
        if (writer.state == ConnectionState.disconnecting)
            return ConnectionState.disconnecting;

        if (reader.state < writer.state)
            return reader.state;
        return writer.state;
    }

    /// Queue Data for sending.
    void send(Data[] data, int priority)
    {
        writer.send(data, priority);
    }

    alias send = IConnection.send; /// ditto

    /// Terminate the connection.
    /// Note: this isn't quite fleshed out - applications may want to
    /// wait and send some more data even after stdin is closed, but
    /// such an interface can't be fitted into an IConnection
    void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
    {
        // A half is only asked to disconnect while its state lies strictly
        // between "disconnected" and "disconnecting".
        static bool canDisconnect(IConnection half)
        {
            return half.state > ConnectionState.disconnected
                && half.state < ConnectionState.disconnecting;
        }

        if (canDisconnect(reader))
            reader.disconnect(reason, type);
        if (canDisconnect(writer))
            writer.disconnect(reason, type);

        debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
    }

    /// Fires the connect handler once both halves report being connected.
    protected void onConnect()
    {
        if (connectHandler
            && reader.state == ConnectionState.connected
            && writer.state == ConnectionState.connected)
        {
            connectHandler();
        }
    }

    /// Reports the disconnect to the user handler (at most once), then
    /// disconnects whichever half is still active.
    protected void onDisconnect(string reason, DisconnectType type)
    {
        debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);

        if (disconnectHandler)
        {
            disconnectHandler(reason, type);
            disconnectHandler = null; // don't call it twice for the other connection
        }

        // It is our responsibility to disconnect the other connection
        // Use DisconnectType.requested to ensure that any written data is flushed
        disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
    }

    /// Callback for when a connection has been established.
    @property void handleConnect(ConnectHandler value)
    {
        connectHandler = value;
    }
    private ConnectHandler connectHandler;

    /// Callback setter for when new data is read.
    @property void handleReadData(ReadDataHandler value)
    {
        reader.handleReadData = value;
    }

    /// Callback setter for when a connection was closed.
    @property void handleDisconnect(DisconnectHandler value)
    {
        disconnectHandler = value;
    }
    private DisconnectHandler disconnectHandler;

    /// Callback setter for when all queued data has been written.
    @property void handleBufferFlushed(BufferFlushedHandler value)
    {
        writer.handleBufferFlushed = value;
    }
}

// Compile-only check: the constructor call is never executed.
unittest
{
    if (false)
        new Duplex(null, null);
}

// ***************************************************************************

/// An asynchronous socket-based connection.
class SocketConnection : StreamConnection
{
protected:
    /// Addresses that have not been tried yet by the current connection attempt.
    AddressInfo[] addressQueue;

    /// Wrap an existing socket.
    this(Socket conn)
    {
        super(conn);
    }

    /// Send through the socket.
    override sizediff_t doSend(in void[] buffer)
    {
        return conn.send(buffer);
    }

    /// Receive from the socket.
    override sizediff_t doReceive(void[] buffer)
    {
        return conn.receive(buffer);
    }

    /// Takes the next address off addressQueue, creates a non-blocking
    /// socket for it, registers with the socket manager and starts
    /// connecting. Socket errors are routed to onError, which moves on to
    /// the following address while any remain.
    final void tryNextAddress()
    {
        assert(state == ConnectionState.connecting);
        auto addressInfo = addressQueue[0];
        addressQueue = addressQueue[1..$];

        try
        {
            // Build the socket in a local first, so that `conn` is only ever
            // set to a socket that is about to be registered. onError relies
            // on this to know whether there is anything to tear down.
            auto candidate = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
            {
                // Don't leave a half-configured socket open.
                scope(failure) candidate.close();
                candidate.blocking = false;
            }

            conn = candidate;
            socketManager.register(this);
            updateFlags();
            debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
            conn.connect(addressInfo.address);
        }
        catch (SocketException e)
            return onError("Connect error: " ~ e.msg);
    }

    /// Called when an error occurs on the socket.
    /// While connecting with untried addresses left, the failed socket is
    /// discarded and the next address is tried; otherwise the error is
    /// passed to the base class.
    override void onError(string reason)
    {
        if (state == ConnectionState.connecting && addressQueue.length)
        {
            // Socket creation itself may have failed (e.g. an address family
            // the system does not support), in which case there is no socket
            // to unregister or close.
            if (conn !is null)
            {
                socketManager.unregister(this);
                conn.close();
                conn = null;
            }

            return tryNextAddress();
        }

        // NOTE(review): if creating the socket for the *last* address fails,
        // conn is null here - confirm that the base class onError copes.
        super.onError(reason);
    }

public:
    /// Default constructor
    this()
    {
        debug (ASOCKETS) stderr.writefln("New SocketConnection @ %s", cast(void*)this);
    }

    /// Start establishing a connection.
    /// The addresses are tried in order until one can be connected to.
    final void connect(AddressInfo[] addresses)
    {
        assert(addresses.length, "No addresses specified");

        assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));
        assert(!conn);

        addressQueue = addresses;
        state = ConnectionState.connecting;
        tryNextAddress();
    }
}

/// An asynchronous TCP connection.
class TcpConnection : SocketConnection
{
protected:
    /// Wrap an existing socket.
    this(Socket conn)
    {
        super(conn);
    }

public:
    /// Default constructor
    this()
    {
        debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this);
    }

    alias connect = SocketConnection.connect; // raise overload

    /// Start establishing a connection.
    /// Resolves the host, shuffles the results when there is more than one,
    /// and hands them to the address-list overload.
    final void connect(string host, ushort port)
    {
        assert(host.length, "Empty host");
        assert(port, "No port specified");

        debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);
        assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state));

        state = ConnectionState.resolving;

        AddressInfo[] candidates;
        try
        {
            auto resolved = getAddress(host, port);
            enforce(resolved.length, "No addresses found");
            debug (ASOCKETS)
            {
                stderr.writefln("Resolved to %s addresses:", resolved.length);
                foreach (address; resolved)
                    stderr.writefln("- %s", address.toString());
            }

            if (resolved.length > 1)
            {
                import std.random : randomShuffle;
                randomShuffle(resolved);
            }

            foreach (address; resolved)
            {
                candidates ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host);
            }
        }
        catch (SocketException e)
        {
            onError("Lookup error: " ~ e.msg);
            return;
        }

        // The address-list overload asserts that we start out disconnected.
        state = ConnectionState.disconnected;
        connect(candidates);
    }
}

// ***************************************************************************

/// An asynchronous connection server for socket-based connections.
class SocketServer
{
protected:
    /// Class that actually performs listening on a certain address family
    final class Listener : GenericSocket
    {
        /// Adopt the given socket and register with the socket manager.
        this(Socket conn)
        {
            debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
            this.conn = conn;
            socketManager.register(this);
        }

        /// Called when a socket is readable.
        /// Accepts one pending connection; it is handed to the accept
        /// handler when one is set, and closed otherwise.
        override void onReadable()
        {
            debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
            Socket incoming = conn.accept();
            incoming.blocking = false;

            if (handleAccept)
            {
                auto accepted = createConnection(incoming);
                debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", accepted, accepted.remoteAddress);
                accepted.setKeepAlive();
                //assert(connection.connected);
                //connection.connected = true;
                acceptHandler(accepted);
            }
            else
            {
                incoming.close();
            }
        }

        /// Called when a socket is writable.
        /// Nothing to do for a listening socket.
        override void onWritable()
        {
        }

        /// Called when an error occurs on the socket.
        override void onError(string reason)
        {
            close(); // call parent
        }

        /// Unregister from the socket manager and close the socket.
        void closeListener()
        {
            assert(conn);
            socketManager.unregister(this);
            conn.close();
            conn = null;
        }
    }

    /// Wraps an accepted socket in a connection object.
    SocketConnection createConnection(Socket socket)
    {
        return new SocketConnection(socket);
    }

    /// Whether the socket is listening.
    bool listening;
    /// Listener instances
    Listener[] listeners;

    /// Listeners watch for readability only while an accept handler is set.
    final void updateFlags()
    {
        immutable wantAccept = handleAccept !is null;
        foreach (listener; listeners)
            listener.notifyRead = wantAccept;
    }

public:
    /// Start listening on this socket.
/// Each address gets its own listening socket. Addresses that cannot be
/// set up are skipped. Throws an Exception if no listener exists afterwards.
    final void listen(AddressInfo[] addressInfos)
    {
        foreach (ref addressInfo; addressInfos)
        {
            try
            {
                Socket conn = new Socket(addressInfo);
                // If configuring, binding or listening fails, close the
                // socket right away rather than leaving it open.
                scope(failure) conn.close();

                conn.blocking = false;
                if (addressInfo.family == AddressFamily.INET6)
                    conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
                conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

                conn.bind(addressInfo.address);
                conn.listen(8);

                listeners ~= new Listener(conn);
            }
            catch (SocketException e)
            {
                // Best effort: other addresses may still work, and the check
                // below reports the case where none did.
                debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
                debug(ASOCKETS) stderr.writeln(e.msg);
            }
        }

        if (listeners.length==0)
            throw new Exception("Unable to bind service");

        listening = true;

        updateFlags();
    }

    /// Creates a server with no listeners; call `listen` to add some.
    this()
    {
    }

    /// Creates a Server with the given sockets.
    /// The sockets must have already had `bind` and `listen` called on them.
    this(Socket[] sockets...)
    {
        foreach (socket; sockets)
            listeners ~= new Listener(socket);

        // The sockets are listening already (see above), so report that.
        listening = listeners.length != 0;
    }

    /// The local address of every listener.
    final @property Address[] localAddresses()
    {
        Address[] result;
        foreach (listener; listeners)
            result ~= listener.localAddress;
        return result;
    }

    /// Whether the server is currently listening.
    final @property bool isListening()
    {
        return listening;
    }

    /// Stop listening on this socket.
    /// Closes every listener, then fires `handleClose` if it is set.
    final void close()
    {
        foreach (listener;listeners)
            listener.closeListener();
        listeners = null;
        listening = false;
        if (handleClose)
            handleClose();
    }

    /// Create a SocketServer using the handle passed on standard input,
    /// for which `listen` had already been called. Used by
    /// e.g. FastCGI and systemd sockets with "Listen = yes".
/// The handle is wrapped as-is and switched to non-blocking mode.
    static SocketServer fromStdin()
    {
        socket_t stdinHandle;
        version (Windows)
        {
            import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
            stdinHandle = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
        }
        else
        {
            stdinHandle = cast(socket_t)0;
        }

        auto listenSocket = new Socket(stdinHandle, AddressFamily.UNSPEC);
        listenSocket.blocking = false;
        return new SocketServer(listenSocket);
    }

    /// Callback for when the socket was closed.
    void delegate() handleClose;

    private void delegate(SocketConnection incoming) acceptHandler;

    /// Callback for an incoming connection.
    /// Connections will not be accepted unless this handler is set.
    @property final void delegate(SocketConnection incoming) handleAccept()
    {
        return acceptHandler;
    }

    /// ditto
    @property final void handleAccept(void delegate(SocketConnection incoming) value)
    {
        acceptHandler = value;
        updateFlags(); // start or stop watching the listeners accordingly
    }
}

/// An asynchronous TCP connection server.
class TcpServer : SocketServer
{
protected:
    /// Accepted sockets are wrapped in TcpConnection objects.
    override SocketConnection createConnection(Socket socket)
    {
        return new TcpConnection(socket);
    }

public:
    /// Creates a server with no listeners.
    this()
    {
    }

    /// Creates a server around the given sockets.
    this(Socket[] sockets...)
    {
        super(sockets);
    }

    alias listen = SocketServer.listen; // raise overload

    /// Start listening on this socket.
/// Resolves `addr` (null for the wildcard address) and `port`, then listens
/// on every resulting address. With port 0 only IPv4 addresses are used.
/// Returns: the port of the last IPv4 listener, or `port` as given if there
/// is no IPv4 listener.
    final ushort listen(ushort port, string addr = null)
    {
        debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
        //assert(!listening, "Attempting to listen on a listening socket");

        auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

        debug (ASOCKETS)
        {
            stderr.writefln("Resolved to %s addresses:", addressInfos.length);
            foreach (ref addressInfo; addressInfos)
                stderr.writefln("- %s", addressInfo);
        }

        // listen on random ports only on IPv4 for now
        if (port == 0)
        {
            AddressInfo[] ipv4Only;
            foreach (ref addressInfo; addressInfos)
                if (addressInfo.family == AddressFamily.INET)
                    ipv4Only ~= addressInfo;
            addressInfos = ipv4Only;
        }

        listen(addressInfos);

        // Report the port actually in use (relevant when port 0 was requested).
        foreach (listener; listeners)
        {
            auto address = listener.conn.localAddress();
            if (address.addressFamily == AddressFamily.INET)
                port = to!ushort(address.toPortString());
        }

        return port;
    }

    /// Create a TcpServer using the handle passed on standard input,
    /// for which `listen` had already been called.
    deprecated("Use SocketServer.fromStdin")
    static TcpServer fromStdin()
    {
        // Build a real TcpServer. Reinterpreting the object returned by
        // SocketServer.fromStdin would leave its dynamic type unchanged, so
        // accepted connections would not be TcpConnection instances and
        // typed accept handlers would be handed null.
        socket_t stdinHandle;
        version (Windows)
        {
            import core.sys.windows.winbase : GetStdHandle, STD_INPUT_HANDLE;
            stdinHandle = cast(socket_t)GetStdHandle(STD_INPUT_HANDLE);
        }
        else
            stdinHandle = cast(socket_t)0;

        auto s = new Socket(stdinHandle, AddressFamily.UNSPEC);
        s.blocking = false;
        return new TcpServer(s);
    }

    /// Callback for an incoming connection, typed as TcpConnection.
    /// Passing null removes the handler, so that connections are no longer
    /// accepted.
    @property final void handleAccept(void delegate(TcpConnection incoming) value)
    {
        if (value is null)
        {
            // Wrapping null would install a non-null handler that calls
            // through a null delegate on the next accepted connection.
            super.handleAccept(null);
            return;
        }

        super.handleAccept((SocketConnection c) => value(cast(TcpConnection)c));
    }
}

// ***************************************************************************

/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
    /// Wrap an existing socket.
    this(Socket conn)
    {
        super(conn);
    }

    /// Called when a socket is writable.
/// Sends queued datagrams, then refreshes the event flags.
    override void onWritable()
    {
        //scope(success) updateFlags();
        onWritableImpl();
        updateFlags();
    }

    // Work around scope(success) breaking debugger stack traces
    final private void onWritableImpl()
    {
        foreach (ref queue; outQueue)
        {
            while (queue.length)
            {
                auto head = queue.ptr; // first queued datagram

                auto written = conn.sendTo(head.contents, remoteAddress);

                if (written == Socket.ERROR)
                {
                    // Either the socket is simply not ready, or this is a real failure.
                    if (!wouldHaveBlocked())
                        onError("send() error: " ~ lastSocketError);
                    return;
                }

                if (written < head.length)
                {
                    // Unlike a stream, a short write is treated as an error here.
                    onError("Sent only %d/%d bytes of the datagram!".format(written, head.length));
                    return;
                }

                // The datagram went out whole; drop it from the queue.
                assert(written == head.length);
                //debug writefln("[%s] Sent data:", remoteAddress);
                //debug writefln("%s", hexDump(pdata.contents[0..sent]));
                head.clear();
                queue = queue[1..$];
                if (queue.length == 0)
                    queue = null;
            }
        }

        // outQueue is now empty
        if (bufferFlushedHandler)
            bufferFlushedHandler();

        if (state == ConnectionState.disconnecting)
        {
            debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
            close();
        }
    }

    /// Not used: sending is done with sendTo in onWritableImpl.
    override sizediff_t doSend(in void[] buffer)
    {
        assert(false); // never called (called only from overridden methods)
    }

    /// Receive from the socket.
    override sizediff_t doReceive(void[] buffer)
    {
        return conn.receive(buffer);
    }

public:
    /// Default constructor
    this()
    {
        debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
    }

    /// Initialize with the given AddressFamily, without binding to an address.
/// Fires the connect handler (if set) once the socket has been set up.
    final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
    {
        initializeImpl(family, type, protocol);
        if (connectHandler)
            connectHandler();
    }

    /// Creates the non-blocking socket, registers it with the socket manager
    /// and marks the connection as connected, without notifying the connect
    /// handler. Callers fire the handler themselves at the right moment.
    final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
    {
        assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
        assert(!conn);

        conn = new Socket(family, type, protocol);
        conn.blocking = false;
        socketManager.register(this);
        state = ConnectionState.connected;
        updateFlags();
    }

    /// Bind to a local address in order to receive packets sent there.
    /// Returns: the local port that was bound, or 0 if the host lookup
    /// failed (in which case the failure is reported through onError).
    final ushort bind(string host, ushort port)
    {
        assert(host.length, "Empty host");

        debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

        state = ConnectionState.resolving;

        AddressInfo addressInfo;
        try
        {
            auto addresses = getAddress(host, port);
            enforce(addresses.length, "No addresses found");
            debug (ASOCKETS)
            {
                stderr.writefln("Resolved to %s addresses:", addresses.length);
                foreach (address; addresses)
                    stderr.writefln("- %s", address.toString());
            }

            // With several candidates, pick one at random.
            Address address;
            if (addresses.length > 1)
            {
                import std.random : uniform;
                address = addresses[uniform(0, $)];
            }
            else
                address = addresses[0];
            addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
        }
        catch (SocketException e)
        {
            onError("Lookup error: " ~ e.msg);
            return 0;
        }

        // The AddressInfo overload initializes the socket, which asserts
        // that we start out disconnected.
        state = ConnectionState.disconnected;
        return bind(addressInfo);
    }

    /// ditto
    final ushort bind(AddressInfo addressInfo)
    {
        // Use initializeImpl rather than initialize: the connect handler
        // must fire exactly once, and only after the socket is bound.
        initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
        conn.bind(addressInfo.address);

        auto address = conn.localAddress();
        auto port = to!ushort(address.toPortString());

        if (connectHandler)
            connectHandler();

        return port;
    }

public:
    /// Where to send packets to.
    Address remoteAddress;
}

///
unittest
{
    auto server = new UdpConnection();
    auto serverPort = server.bind("localhost", 0);

    auto client = new UdpConnection();
    client.initialize(server.localAddress.addressFamily);

    string[] packets = ["Hello", "there"];
    client.remoteAddress = server.localAddress;
    client.send({
        Data[] data;
        foreach (packet; packets)
            data ~= Data(packet);
        return data;
    }());

    server.handleReadData = (Data data)
    {
        assert(data.contents == packets[0]);
        packets = packets[1..$];
        if (!packets.length)
        {
            server.close();
            client.close();
        }
    };
    socketManager.loop();
    assert(!packets.length);
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
    /// The wrapped connection.
    IConnection next;

    /// Wrap the given connection and take over its connect, disconnect and
    /// buffer-flushed callbacks.
    this(IConnection next)
    {
        this.next = next;
        next.handleConnect = &onConnect;
        next.handleDisconnect = &onDisconnect;
        next.handleBufferFlushed = &onBufferFlushed;
    }

    /// The state of the wrapped connection.
    @property ConnectionState state() { return next.state; }

    /// Queue Data for sending.
    void send(Data[] data, int priority)
    {
        next.send(data, priority);
    }

    alias send = IConnection.send; /// ditto

    /// Terminate the connection.
/// The request is forwarded to the wrapped connection.
    void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
    {
        next.disconnect(reason, type);
    }

    /// Relays the wrapped connection's connect event.
    protected void onConnect()
    {
        if (connectHandler)
        {
            connectHandler();
        }
    }

    /// Relays data read by the wrapped connection.
    protected void onReadData(Data data)
    {
        // onReadData should be fired only if readDataHandler is set
        readDataHandler(data);
    }

    /// Relays the wrapped connection's disconnect event.
    protected void onDisconnect(string reason, DisconnectType type)
    {
        if (disconnectHandler)
        {
            disconnectHandler(reason, type);
        }
    }

    /// Relays the wrapped connection's buffer-flushed event.
    protected void onBufferFlushed()
    {
        if (bufferFlushedHandler)
        {
            bufferFlushedHandler();
        }
    }

    /// Callback for when a connection has been established.
    @property void handleConnect(ConnectHandler value)
    {
        connectHandler = value;
    }
    private ConnectHandler connectHandler;

    /// Callback setter for when new data is read.
    @property void handleReadData(ReadDataHandler value)
    {
        readDataHandler = value;

        // Only hook the wrapped connection's read event while a handler is set.
        if (value)
            next.handleReadData = &onReadData;
        else
            next.handleReadData = null;
    }
    private ReadDataHandler readDataHandler;

    /// Callback setter for when a connection was closed.
    @property void handleDisconnect(DisconnectHandler value)
    {
        disconnectHandler = value;
    }
    private DisconnectHandler disconnectHandler;

    /// Callback setter for when all queued data has been written.
    @property void handleBufferFlushed(BufferFlushedHandler value)
    {
        bufferFlushedHandler = value;
    }
    private BufferFlushedHandler bufferFlushedHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
    /// The protocol's line delimiter.
/// Appended to sent lines and used to split received data.
    string delimiter = "\r\n";

    /// Wrap the given connection.
    this(IConnection next)
    {
        super(next);
    }

    /// Append a line to the send buffer.
    void send(string line)
    {
        //super.send(Data(line ~ delimiter));
        // https://issues.dlang.org/show_bug.cgi?id=13985
        ConnectionAdapter self = this;
        self.send(Data(line ~ delimiter));
    }

protected:
    /// The receive buffer.
    Data inBuffer;

    /// Called when data has been received.
    /// Adds the data to the receive buffer, then emits every complete line
    /// the buffer now contains.
    final override void onReadData(Data data)
    {
        import std.string;

        auto previousLength = inBuffer.length;
        if (previousLength)
            inBuffer ~= data;
        else
            inBuffer = data;

        if (delimiter.length != 1)
        {
            // TODO: we can start the search at oldBufferLength-delimiter.length+1
            for (;;)
            {
                sizediff_t lineEnd = indexOf(cast(string)inBuffer.contents, delimiter);
                if (lineEnd < 0)
                    break;
                processLine(lineEnd);
            }
            return;
        }

        // Single-character delimiter: scan with memchr. The first search
        // covers only the newly added bytes; later searches start from the
        // beginning of what is left in the buffer.
        import core.stdc.string; // memchr

        char separator = delimiter[0];
        for (auto hit = memchr(inBuffer.ptr + previousLength, separator, data.length);
             hit;
             hit = memchr(inBuffer.ptr, separator, inBuffer.length))
        {
            sizediff_t lineEnd = hit - inBuffer.ptr;
            processLine(lineEnd);
        }
    }

    /// Removes the line ending at `index`, together with its delimiter, from
    /// the front of the receive buffer and passes the line on.
    final void processLine(size_t index)
    {
        auto line = inBuffer[0..index];
        auto restStart = index+delimiter.length;
        inBuffer = inBuffer[restStart..inBuffer.length];
        super.onReadData(line);
    }

    /// Relays the disconnect, then discards any buffered partial line.
    override void onDisconnect(string reason, DisconnectType type)
    {
        super.onDisconnect(reason, type);
        inBuffer.clear();
    }
}

// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
    /// Wrap the given connection. No idle timeout is active until
    /// setIdleTimeout is called.
    this(IConnection next)
    {
        debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
        super(next);
    }

    /// Stop the idle timer. An idle timeout must be set and waiting.
    void cancelIdleTimeout()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
        assert(idleTask !is null);
        assert(idleTask.isWaiting());
        idleTask.cancel();
    }

    /// Start the idle timer again. An idle timeout must be set and not waiting.
    void resumeIdleTimeout()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
        assert(idleTask !is null);
        assert(!idleTask.isWaiting());
        mainTimer.add(idleTask);
    }

    /// Set (or change) the idle timeout and start the timer.
    final void setIdleTimeout(Duration duration)
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
        assert(duration > Duration.zero);

        // Configure idleTask
        if (idleTask !is null)
        {
            // Reuse the existing task with the new delay.
            if (idleTask.isWaiting())
                idleTask.cancel();
            idleTask.delay = duration;
        }
        else
        {
            idleTask = new TimerTask(duration);
            idleTask.handleTask = &onTask_Idle;
        }

        mainTimer.add(idleTask);
    }

    /// Report activity: fires handleNonIdle (if set) and restarts the idle
    /// timer if it is waiting.
    void markNonIdle()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
        if (handleNonIdle)
            handleNonIdle();

        if (idleTask && idleTask.isWaiting())
            idleTask.restart();
    }

    /// Callback for when a connection has stopped responding.
    /// If unset, the connection will be disconnected.
    void delegate() handleIdleTimeout;

    /// Callback for when a connection is marked as non-idle
    /// (when data is received).
    void delegate() handleNonIdle;

protected:
    /// Connecting counts as activity.
    override void onConnect()
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
        markNonIdle();
        super.onConnect();
    }

    /// Receiving data counts as activity.
    override void onReadData(Data data)
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
        markNonIdle();
        super.onReadData(data);
    }

    /// Stops the idle timer before relaying the disconnect.
    override void onDisconnect(string reason, DisconnectType type)
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
        if (idleTask && idleTask.isWaiting())
            idleTask.cancel();
        super.onDisconnect(reason, type);
    }

private:
    TimerTask idleTask; // non-null if an idle timeout has been set

    /// Timer callback, invoked when the idle timeout elapses.
    final void onTask_Idle(Timer timer, TimerTask task)
    {
        debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);

        if (state == ConnectionState.disconnecting)
        {
            disconnect("Delayed disconnect - time-out", DisconnectType.error);
            return;
        }

        if (state == ConnectionState.disconnected)
            return;

        if (!handleIdleTimeout)
        {
            // No handler set: timing out means disconnecting.
            disconnect("Time-out", DisconnectType.error);
            return;
        }

        resumeIdleTimeout(); // reschedule (by default)
        handleIdleTimeout();
    }
}

// ***************************************************************************

unittest
{
    // A timeout scheduled before the loop starts must fire while it runs.
    void testTimer()
    {
        bool fired;
        setTimeout({fired = true;}, 10.msecs);
        socketManager.loop();
        assert(fired);
    }

    testTimer();
}