1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import core.stdc.stdint : int32_t; 24 25 import std.exception; 26 import std.socket; 27 import std.string : format; 28 public import std.socket : Address, AddressInfo, Socket; 29 30 debug(ASOCKETS) import std.stdio : stderr; 31 debug(PRINTDATA) static import std.stdio; 32 debug(PRINTDATA) import ae.utils.text : hexDump; 33 private import std.conv : to; 34 35 36 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 37 static import ae.utils.array; 38 39 version(LIBEV) 40 { 41 import deimos.ev; 42 pragma(lib, "ev"); 43 } 44 45 version(Windows) 46 { 47 import core.sys.windows.windows : Sleep; 48 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 49 } 50 else 51 enum USE_SLEEP = false; 52 53 /// Counter bumped every time the event loop dispatches work (each libev I/O or timer callback, or each iteration of the select() loop). 54 int eventCounter; 55 56 version(LIBEV) 57 { 58 // Watchers are a GenericSocket field (as declared in SocketMixin). 59 // Use one watcher per read and write event. 60 // Start/stop those as higher-level code declares interest in those events. 61 // Use the "data" ev_io field to store the parent GenericSocket address. 62 // Also use the "data" field as a flag to indicate whether the watcher is active 63 // (data is null when the watcher is stopped). 
/// libev-backed socket manager.
///
/// It keeps no references to registered sockets, only a count. Each socket
/// owns its own read/write `ev_io` watchers (see SocketMixin); this struct
/// initializes them and mirrors the next `mainTimer` deadline in one `ev_timer`.
struct SocketManager
{
private:
	/// Number of currently registered sockets.
	size_t count;

	/// Shared libev callback for every read and write watcher.
	/// `w.data` carries the owning GenericSocket while the watcher is started.
	extern(C)
	static void ioCallback(ev_loop_t* l, ev_io* w, int revents)
	{
		eventCounter++;
		auto socket = cast(GenericSocket)w.data;
		assert(socket, "libev event fired on stopped watcher");
		// Only `stderr` is imported from std.stdio under debug(ASOCKETS).
		debug (ASOCKETS) stderr.writefln("ioCallback(%s, 0x%X)", socket, revents);

		if (revents & EV_READ)
			socket.onReadable();
		else
		if (revents & EV_WRITE)
			socket.onWritable();
		else
			assert(false, "Unknown event fired from libev");

		// TODO? Need to get proper SocketManager instance to call updateTimer on
		socketManager.updateTimer(false);
	}

	/// The libev timer that wakes the loop for the next `mainTimer` event.
	ev_timer evTimer;
	/// Deadline `evTimer` was last scheduled for; `MonoTime.max` means "timer stopped".
	MonoTime lastNextEvent = MonoTime.max;

	/// libev timer callback: fires due `mainTimer` events, then reschedules.
	extern(C)
	static void timerCallback(ev_loop_t* l, ev_timer* w, int revents)
	{
		eventCounter++;
		debug (ASOCKETS) stderr.writefln("Timer callback called.");
		mainTimer.prod();

		socketManager.updateTimer(true);
		debug (ASOCKETS) stderr.writefln("Timer callback exiting.");
	}

	/// Bring `evTimer` in line with the next `mainTimer` event.
	/// Params:
	///   force = reschedule even if the next event time did not change
	void updateTimer(bool force)
	{
		auto nextEvent = mainTimer.getNextEvent();
		if (force || lastNextEvent != nextEvent)
		{
			debug (ASOCKETS) stderr.writefln("Rescheduling timer. Was at %s, now at %s", lastNextEvent, nextEvent);
			if (nextEvent == MonoTime.max) // Stopping
			{
				if (lastNextEvent != MonoTime.max)
					ev_timer_stop(ev_default_loop(0), &evTimer);
			}
			else
			{
				auto remaining = mainTimer.getRemainingTime();
				// Fire anything already overdue so the interval handed to libev is positive.
				while (remaining <= Duration.zero)
				{
					debug (ASOCKETS) stderr.writefln("remaining=%s, prodding timer.", remaining);
					mainTimer.prod();
					remaining = mainTimer.getRemainingTime();
				}
				// Convert hnsecs to fractional seconds, which is what ev_tstamp holds.
				ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1);
				debug (ASOCKETS) stderr.writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp);
				if (lastNextEvent == MonoTime.max) // Starting
				{
					ev_timer_init(&evTimer, &timerCallback, 0., tstamp);
					ev_timer_start(ev_default_loop(0), &evTimer);
				}
				else // Adjusting
				{
					evTimer.repeat = tstamp;
					ev_timer_again(ev_default_loop(0), &evTimer);
				}
			}
			lastNextEvent = nextEvent;
		}
	}

public:
	/// Register a socket with the manager.
	/// Initializes (but does not start) the socket's read and write watchers.
	void register(GenericSocket socket)
	{
		debug (ASOCKETS) stderr.writefln("Registering %s", socket);
		debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket");
		auto fd = socket.conn.handle;
		// Compare against the invalid-handle sentinel, as the select() manager does.
		// A plain truthiness test rejects descriptor 0 (e.g. stdin wrapped in a
		// FileConnection) and lets the invalid handle through.
		assert(fd != socket_t.init, "Must have fd before socket registration");
		ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ );
		ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE);
		count++;
	}

	/// Unregister a socket with the manager.
	/// Stops both watchers by clearing the socket's notification flags.
	void unregister(GenericSocket socket)
	{
		debug (ASOCKETS) stderr.writefln("Unregistering %s", socket);
		socket.notifyRead  = false;
		socket.notifyWrite = false;
		count--;
	}

	/// Number of currently registered sockets.
	size_t size()
	{
		return count;
	}

	/// Loop continuously until no sockets are left.
void loop()
{
	auto evLoop = ev_default_loop(0);
	enforce(evLoop, "libev initialization failure");

	updateTimer(true);
	// Only `stderr` is imported from std.stdio under debug(ASOCKETS).
	debug (ASOCKETS) stderr.writeln("ev_run");
	// Run on the loop handle that was just validated.
	ev_run(evLoop, 0);
}
} // end of the libev SocketManager

/// libev flavour of the per-socket state mixed into GenericSocket:
/// one `ev_io` watcher per direction, started and stopped on demand.
private mixin template SocketMixin()
{
	private ev_io evRead, evWrite;

	/// Start or stop a watcher so that it matches `newValue`.
	/// `ev.data` doubles as the "is started" flag (null while stopped).
	private void setWatcherState(ref ev_io ev, bool newValue, int event)
	{
		if (!conn)
		{
			// Can happen when setting delegates before connecting.
			return;
		}

		if (newValue && !ev.data)
		{
			// Start
			ev.data = cast(void*)this;
			ev_io_start(ev_default_loop(0), &ev);
		}
		else
		if (!newValue && ev.data)
		{
			// Stop
			assert(ev.data is cast(void*)this);
			ev.data = null;
			ev_io_stop(ev_default_loop(0), &ev);
		}
	}

	/// Interested in read notifications (onReadable)?
	@property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); }
	/// Interested in write notifications (onWritable)?
	@property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); }

	debug ~this()
	{
		// The LIBEV SocketManager holds no references to registered sockets.
		// TODO: Add a doubly-linked list?
		assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket");
	}
}
} // end of version(LIBEV)
else // Use select
{
	/// select()-based socket manager.
	struct SocketManager
	{
	private:
		/// Initial capacity used for the SocketSets built in loop().
		enum FD_SETSIZE = 1024;

		/// List of all sockets to poll.
		GenericSocket[] sockets;

		/// Debug AA to check for dangling socket references.
		debug GenericSocket[socket_t] socketHandles;

		/// Handlers run in rotation when an iteration finds no socket events.
		void delegate()[] idleHandlers;

	public:
		/// Register a socket with the manager.
void register(GenericSocket conn)
{
	debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1);
	assert(!conn.socket.blocking, "Trying to register a blocking socket");
	sockets ~= conn;

	// Debug builds also track handle ownership to catch double registration.
	debug
	{
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't register a closed socket");
		assert((fd in socketHandles) is null, "This socket handle is already registered");
		socketHandles[fd] = conn;
	}
}

/// Remove a socket from the poll list.
/// Asserts if the socket was never registered.
void unregister(GenericSocket conn)
{
	debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1);

	debug
	{
		auto fd = conn.socket.handle;
		assert(fd != socket_t.init, "Can't unregister a closed socket");
		auto owner = fd in socketHandles;
		assert(owner !is null, "This socket handle is not registered");
		assert(*owner is conn, "This socket handle is registered but belongs to another GenericSocket");
		socketHandles.remove(fd);
	}

	for (size_t idx = 0; idx < sockets.length; idx++)
	{
		if (sockets[idx] !is conn)
			continue;

		// Concatenation builds a new array; the old one is left untouched.
		sockets = sockets[0 .. idx] ~ sockets[idx + 1 .. $];
		return;
	}
	assert(false, "Socket not registered");
}

/// Number of registered sockets.
size_t size()
{
	return sockets.length;
}

/// Loop continuously until no sockets are left.
void loop()
{
	debug (ASOCKETS) stderr.writeln("Starting event loop.");

	SocketSet readset, writeset, errorset;
	size_t sockcount;
	uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012
	readset  = new SocketSet(setSize);
	writeset = new SocketSet(setSize);
	errorset = new SocketSet(setSize);
	while (true)
	{
		// Work out the capacity the sets need for this iteration:
		// the socket count on Windows, the highest handle value elsewhere.
		uint minSize = 0;
		version(Windows)
			minSize = cast(uint)sockets.length;
		else
		{
			foreach (s; sockets)
				if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize)
					minSize = s.socket.handle;
		}
		minSize++;

		if (setSize < minSize)
		{
			debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2);
			setSize = minSize * 2;
			readset  = new SocketSet(setSize);
			writeset = new SocketSet(setSize);
			errorset = new SocketSet(setSize);
		}
		else
		{
			readset.reset();
			writeset.reset();
			errorset.reset();
		}

		sockcount = 0;
		bool haveActive; // true if at least one non-daemon socket is present
		debug (ASOCKETS) stderr.writeln("Populating sets");
		foreach (GenericSocket conn; sockets)
		{
			if (!conn.socket)
				continue;
			sockcount++;
			if (!conn.daemon)
				haveActive = true;

			debug (ASOCKETS) stderr.writef("\t%s:", conn);
			if (conn.notifyRead)
			{
				readset.add(conn.socket);
				debug (ASOCKETS) stderr.write(" READ");
			}
			if (conn.notifyWrite)
			{
				writeset.add(conn.socket);
				debug (ASOCKETS) stderr.write(" WRITE");
			}
			errorset.add(conn.socket);
			debug (ASOCKETS) stderr.writeln();
		}
		debug (ASOCKETS)
		{
			stderr.writefln("Sets populated as follows:");
			printSets(readset, writeset, errorset);
		}

		debug (ASOCKETS) stderr.writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...",
			sockcount,
			mainTimer.isWaiting() ? "with" : "no",
			idleHandlers.length,
		);
		if (!haveActive && !mainTimer.isWaiting())
		{
			debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop.");
			break;
		}

		debug (ASOCKETS) stderr.flush();

		int events;
		if (idleHandlers.length)
		{
			// Idle handlers are pending: only poll, never block.
			if (sockcount==0)
				events = 0;
			else
				events = Socket.select(readset, writeset, errorset, 0.seconds);
		}
		else
		if (USE_SLEEP && sockcount==0)
		{
			version(Windows)
			{
				auto duration = mainTimer.getRemainingTime().total!"msecs"();
				// Only `stderr` is imported from std.stdio under debug(ASOCKETS).
				debug (ASOCKETS) stderr.writeln("Wait duration: ", duration, " msecs");
				if (duration <= 0)
					duration = 1; // Avoid busywait
				else
				if (duration > int.max)
					duration = int.max;
				Sleep(cast(int)duration);
				events = 0;
			}
			else
				assert(0);
		}
		else
		if (mainTimer.isWaiting())
			events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime());
		else
			events = Socket.select(readset, writeset, errorset);

		debug (ASOCKETS) stderr.writefln("%d events fired.", events);

		if (events > 0)
		{
			// Handle just one event at a time, as the first
			// handler might invalidate select()'s results.
			handleEvent(readset, writeset, errorset);
		}
		else
		if (idleHandlers.length)
		{
			import ae.utils.array;
			auto handler = idleHandlers.shift();

			// Rotate the idle handler queue before running it,
			// in case the handler unregisters itself.
			idleHandlers ~= handler;

			handler();
		}

		// Timers may invalidate our select results, so fire them after processing the latter
		mainTimer.prod();

		eventCounter++;
	}
}

/// Debug helper: print which registered sockets are present in each set.
debug (ASOCKETS)
void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			stderr.writefln("\t\t%s is unset", conn);
		else
		{
			if (readset.isSet(conn.socket))
				stderr.writefln("\t\t%s is readable", conn);
			if (writeset.isSet(conn.socket))
				stderr.writefln("\t\t%s is writable", conn);
			if (errorset.isSet(conn.socket))
				stderr.writefln("\t\t%s is errored", conn);
		}
	}
}

/// Dispatch exactly one event: the first registered socket found in any set
/// gets its read, write or error callback (checked in that order), then we return.
void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset)
{
	debug (ASOCKETS)
	{
		stderr.writefln("\tSelect results:");
		printSets(readset, writeset, errorset);
	}

	foreach (GenericSocket conn; sockets)
	{
		if (!conn.socket)
			continue;

		if (readset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn);
			return conn.onReadable();
		}
		else
		if (writeset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn);
			return conn.onWritable();
		}
		else
		if (errorset.isSet(conn.socket))
		{
			debug (ASOCKETS) stderr.writefln("\t%s - calling onError", conn);
			return conn.onError("select() error: " ~ conn.socket.getErrorText());
		}
	}

	assert(false, "select() reported events available, but no registered sockets are set");
}
} // end of the select() SocketManager

// Use UFCS to allow removeIdleHandler to have a predicate with context
/// Append an idle handler; asserts that the same delegate is not already registered.
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
{
	foreach (i, idleHandler; socketManager.idleHandlers)
		assert(handler !is idleHandler);

	socketManager.idleHandlers ~= handler;
}

/// Default predicate for removeIdleHandler: identity comparison.
static bool isFun(T)(T a, T b) { return a is b; }

/// Remove the first idle handler for which `pred(handler, args)` holds.
/// Asserts if no handler matches.
void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args)
{
	foreach (i, idleHandler; socketManager.idleHandlers)
		if (pred(idleHandler, args))
		{
			import std.algorithm;
			socketManager.idleHandlers = socketManager.idleHandlers.remove(i);
			return;
		}
	assert(false, "No such idle handler");
}

/// select() flavour of the per-socket state: plain flags read by loop().
private mixin template SocketMixin()
{
	/// Interested in read notifications (onReadable)?
	bool notifyRead;
	/// Interested in write notifications (onWritable)?
	bool notifyWrite;
}
} // end of the select() branch

/// The default socket manager.
SocketManager socketManager;

// ***************************************************************************

/// General methods for an asynchronous socket.
abstract class GenericSocket
{
	/// Declares notifyRead and notifyWrite.
	mixin SocketMixin;

protected:
	/// The socket this class wraps.
	Socket conn;

protected:
	/// Retrieve the socket class this class wraps.
	@property final Socket socket()
	{
		return conn;
	}

	/// Called by the socket manager when the socket is readable. No-op by default.
	void onReadable()
	{
	}

	/// Called by the socket manager when the socket is writable. No-op by default.
	void onWritable()
	{
	}

	/// Called by the socket manager when the socket reports an error. No-op by default.
	void onError(string reason)
	{
	}

public:
	/// allow getting the address of connections that are already disconnected
	private Address cachedLocalAddress, cachedRemoteAddress;

	/// Don't block the process from exiting.
	/// TODO: Not implemented with libev
	bool daemon;

	/// Local address; cached on first successful lookup.
	/// Returns null if there is no socket and nothing was cached.
	final @property Address localAddress()
	{
		if (cachedLocalAddress !is null)
			return cachedLocalAddress;
		else
		if (conn is null)
			return null;
		else
			return cachedLocalAddress = conn.localAddress();
	}

	/// Non-throwing string form of localAddress; lookup failures become "[error: ...]".
	final @property string localAddressStr() nothrow
	{
		try
		{
			auto a = localAddress;
			return a is null ? "[null address]" : a.toString();
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Remote address; cached on first successful lookup.
	/// Returns null if there is no socket and nothing was cached.
	final @property Address remoteAddress()
	{
		if (cachedRemoteAddress !is null)
			return cachedRemoteAddress;
		else
		if (conn is null)
			return null;
		else
			return cachedRemoteAddress = conn.remoteAddress();
	}

	/// Non-throwing string form of remoteAddress; lookup failures become "[error: ...]".
	final @property string remoteAddressStr() nothrow
	{
		try
		{
			auto a = remoteAddress;
			return a is null ? "[null address]" : a.toString();
		}
		catch (Exception e)
			return "[error: " ~ e.msg ~ "]";
	}

	/// Enable or disable keep-alive on the wrapped socket.
	/// When enabling, falls back to the bare KEEPALIVE option if
	/// setting the time and interval throws a SocketException.
	final void setKeepAlive(bool enabled=true, int time=10, int interval=5)
	{
		assert(conn, "Attempting to set keep-alive on an uninitialized socket");
		if (enabled)
		{
			try
				conn.setKeepAlive(time, interval);
			catch (SocketException)
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true);
		}
		else
			conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false);
	}

	/// Debug-friendly description: unqualified class name, object address, and handle (-1 if none).
	override string toString() const
	{
		import std.string;
		return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1);
	}
}

// ***************************************************************************

/// Why a connection was closed.
enum DisconnectType
{
	requested, // initiated by the application
	graceful,  // peer gracefully closed the connection
	error      // abnormal network condition
}

/// Connection life-cycle states.
enum ConnectionState
{
	/// The initial state, or the state after a disconnect was fully processed.
	disconnected,

	/// Name resolution. Currently done synchronously.
	resolving,

	/// A connection attempt is in progress.
	connecting,

	/// A connection is established.
	connected,

	/// Disconnecting in progress. No data can be sent or received at this point.
	/// We are waiting for queued data to be actually sent before disconnecting.
	disconnecting,
}

/// Common interface for connections and adapters.
interface IConnection
{
	enum MAX_PRIORITY = 4;
	enum DEFAULT_PRIORITY = 2;

	static const defaultDisconnectReason = "Software closed the connection";

	/// Get connection state.
	@property ConnectionState state();

	/// Has a connection been established?
	deprecated final @property bool connected() { return state == ConnectionState.connected; }

	/// Are we in the process of disconnecting? (Waiting for data to be flushed)
	deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; }

	/// Queue Data for sending.
	void send(Data[] data, int priority = DEFAULT_PRIORITY);

	/// ditto
	final void send(Data datum, int priority = DEFAULT_PRIORITY)
	{
		Data[1] data;
		data[0] = datum;
		this.send(data, priority);
		// Reset the temporary array before it goes out of scope.
		data[] = Data.init;
	}

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested);

	/// Callback setter for when a connection has been established
	/// (if applicable).
	alias void delegate() ConnectHandler;
	@property void handleConnect(ConnectHandler value); /// ditto

	/// Callback setter for when new data is read.
	alias void delegate(Data data) ReadDataHandler;
	@property void handleReadData(ReadDataHandler value); /// ditto

	/// Callback setter for when a connection was closed.
	alias void delegate(string reason, DisconnectType type) DisconnectHandler;
	@property void handleDisconnect(DisconnectHandler value); /// ditto

	/// Callback setter for when all queued data has been sent.
	alias void delegate() BufferFlushedHandler;
	@property void handleBufferFlushed(BufferFlushedHandler value); /// ditto
}

// ***************************************************************************

/// Base class for connections with prioritized send queues and event handlers.
class Connection : GenericSocket, IConnection
{
private:
	/// Blocks of data larger than this value are passed as unmanaged memory
	/// (in Data objects). Blocks smaller than this value will be reallocated
	/// on the managed heap. The disadvantage of placing large objects on the
	/// managed heap is false pointers; the disadvantage of using Data for
	/// small objects is wasted slack space due to the page size alignment
	/// requirement.
	enum UNMANAGED_THRESHOLD = 256;

	ConnectionState _state;
	/// Private state setter; returns the new value.
	final @property ConnectionState state(ConnectionState value) { return _state = value; }

public:
	/// Get connection state.
	override @property ConnectionState state() { return _state; }

protected:
	/// Send `buffer`; returns the number of bytes sent or Socket.ERROR.
	abstract sizediff_t doSend(in void[] buffer);
	/// Receive into `buffer`; returns bytes read, 0 on close, or Socket.ERROR.
	abstract sizediff_t doReceive(void[] buffer);

	/// The send buffers.
	Data[][MAX_PRIORITY+1] outQueue;
	/// Priority of the queue whose first item has been partially sent
	/// (and thus can't be canceled), or -1 if there is none.
	int partiallySent = -1;

	/// Constructor used by a ServerSocket for new connections
	this(Socket conn)
	{
		this();
		this.conn = conn;
		state = conn is null ? ConnectionState.disconnected : ConnectionState.connected;
		if (conn)
			socketManager.register(this);
		updateFlags();
	}

	/// Recompute notifyRead / notifyWrite from the current state, queues and handlers.
	final void updateFlags()
	{
		// While connecting, writability signals that the connect attempt finished.
		if (state == ConnectionState.connecting)
			notifyWrite = true;
		else
			notifyWrite = writePending;

		notifyRead = state == ConnectionState.connected && readDataHandler;
		debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite);
	}

	/// Called when a socket is readable.
override void onReadable()
{
	// TODO: use FIONREAD when Phobos gets ioctl support (issue 6649)
	static ubyte[0x10000] inBuffer = void;
	auto received = doReceive(inBuffer);

	// A zero-length read means the peer closed the connection.
	if (received == 0)
		return disconnect("Connection closed", DisconnectType.graceful);

	// Every receive error is reported; would-block is not special-cased here.
	if (received == Socket.ERROR)
	{
		onError("recv() error: " ~ lastSocketError);
		return;
	}

	auto chunk = inBuffer[0 .. received];

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress);
		std.stdio.write(hexDump(chunk));
		std.stdio.stdout.flush();
	}

	if (state == ConnectionState.disconnecting)
	{
		debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this);
		return;
	}

	if (!readDataHandler)
	{
		debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this);
		return;
	}

	// The receive buffer is static and reused, so the data is always
	// copied before it is handed to the handler.
	if (received < UNMANAGED_THRESHOLD)
		readDataHandler(Data(chunk.dup));  // small: copy to the managed heap
	else
		readDataHandler(Data(chunk, true)); // large: copy to unmanaged memory
}

/// Called when an error occurs on the socket.
override void onError(string reason)
{
	if (state == ConnectionState.disconnecting)
	{
		// Pass the arguments to writefln directly. Pre-formatting the message and
		// using the result as the format string breaks when `reason` contains '%'.
		debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s", cast(void*)this, reason);
		return close();
	}

	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected);
	disconnect("Socket error: " ~ reason, DisconnectType.error);
}

/// Construct an unconnected connection.
this()
{
}

public:
/// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting.
/// The disconnect handler will be called immediately, even when not all data has been flushed yet.
/// Params:
///   reason = human-readable reason passed to the disconnect handler
///   type   = a requested disconnect flushes queued data first; other types discard it
void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
{
	//scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces
	assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected, "Attempting to disconnect on a %s socket".format(state));

	if (writePending)
	{
		if (type==DisconnectType.requested)
		{
			assert(conn, "Attempting to disconnect on an uninitialized socket");
			// queue disconnect after all data is sent
			debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason);
			state = ConnectionState.disconnecting;
			//setIdleTimeout(30.seconds);
			if (disconnectHandler)
				disconnectHandler(reason, type);
			updateFlags();
			return;
		}
		else
			discardQueues();
	}

	debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason);

	if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected)
		close();
	else
	{
		assert(conn is null, "Registered but %s socket".format(state));
		// No socket exists yet: we are resolving, or connecting before a socket
		// could be created. Either way the connection is now fully disconnected,
		// so it must not stay in the `connecting` state.
		state = ConnectionState.disconnected;
	}

	if (disconnectHandler)
		disconnectHandler(reason, type);
	updateFlags();
}

/// Unregister and close the socket, drop all queued data, and mark the connection disconnected.
private final void close()
{
	assert(conn, "Attempting to close an unregistered socket");
	socketManager.unregister(this);
	conn.close();
	conn = null;
	outQueue[] = null;
	// The queues are empty now, so no item can be partially sent any more.
	// A stale value would trip the assertions in clearQueue / queuePresent.
	partiallySent = -1;
	state = ConnectionState.disconnected;
}

/// Append data to the send buffer.
void send(Data[] data, int priority = DEFAULT_PRIORITY)
{
	assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state));
	outQueue[priority] ~= data;
	notifyWrite = true; // Fast updateFlags()

	debug (PRINTDATA)
	{
		std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress);
		foreach (datum; data)
			if (datum.length)
				std.stdio.write(hexDump(datum.contents));
			else
				std.stdio.writeln("(empty Data)");
		std.stdio.stdout.flush();
	}
}

/// ditto
alias send = IConnection.send;

/// Drop queued data for one priority, keeping the first item if it is partially sent.
final void clearQueue(int priority)
{
	if (priority == partiallySent)
	{
		assert(outQueue[priority].length > 0);
		outQueue[priority] = outQueue[priority][0..1];
	}
	else
		outQueue[priority] = null;
	updateFlags();
}

/// Clears all queues, even partially sent content.
private final void discardQueues()
{
	outQueue[] = null;
	partiallySent = -1;
	updateFlags();
}

/// True if any priority queue still holds data.
@property
final bool writePending()
{
	for (size_t level = 0; level < outQueue.length; level++)
	{
		if (outQueue[level].length != 0)
			return true;
	}
	return false;
}

/// True if the given priority has queued items beyond a partially sent first item.
final bool queuePresent(int priority = DEFAULT_PRIORITY)
{
	auto queued = outQueue[priority].length;
	if (priority != partiallySent)
		return queued > 0;

	// The partially sent head item does not count as cancelable.
	assert(queued > 0);
	return queued > 1;
}

/// Number of Data items queued at the given priority.
final size_t packetsQueued(int priority = DEFAULT_PRIORITY)
{
	return outQueue[priority].length;
}

/// Total number of bytes queued at the given priority.
final size_t bytesQueued(int priority = DEFAULT_PRIORITY)
{
	size_t total = 0;
	auto pending = outQueue[priority];
	for (size_t idx = 0; idx < pending.length; idx++)
		total += pending[idx].length;
	return total;
}

public:
private ConnectHandler connectHandler;
/// Set the handler called once a connection has been established.
@property final void handleConnect(ConnectHandler value)
{
	connectHandler = value;
	updateFlags();
}

private ReadDataHandler readDataHandler;
/// Set the handler for incoming data.
/// Data will not be received unless this handler is set.
@property final void handleReadData(ReadDataHandler value)
{
	readDataHandler = value;
	updateFlags();
}

private DisconnectHandler disconnectHandler;
/// Set the handler called when the connection was closed.
@property final void handleDisconnect(DisconnectHandler value)
{
	disconnectHandler = value;
	updateFlags();
}

private BufferFlushedHandler bufferFlushedHandler;
/// Set the handler called when all queued data has been sent.
@property final void handleBufferFlushed(BufferFlushedHandler value)
{
	bufferFlushedHandler = value;
	updateFlags();
}
} // end of class Connection

/// A Connection that sends its queued data as a byte stream.
class StreamConnection : Connection
{
protected:
	this()
	{
		super();
	}

	/// Called when a socket is writable.
override void onWritable()
{
	//scope(success) updateFlags();
	onWritableImpl();
	updateFlags();
}

// Work around scope(success) breaking debugger stack traces
/// Handle a writable event: finish a pending connect, or send queued data.
final private void onWritableImpl()
{
	debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state);
	if (state == ConnectionState.connecting)
	{
		// While connecting, writability means the attempt finished;
		// the socket's pending error tells us whether it succeeded.
		int32_t error;
		conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
		if (error)
			return disconnect(formatSocketError(error), DisconnectType.error);

		state = ConnectionState.connected;

		//debug writefln("[%s] Connected", remoteAddress);
		try
			setKeepAlive();
		catch (Exception e)
			return disconnect(e.msg, DisconnectType.error);
		if (connectHandler)
			connectHandler();
		return;
	}
	//debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);

	// Two passes: first finish the partially sent item (if any),
	// then send everything else, one queue after another in array order.
	foreach (sendPartial; [true, false])
		foreach (int priority, ref queue; outQueue)
			while (queue.length && (!sendPartial || priority == partiallySent))
			{
				assert(partiallySent == -1 || partiallySent == priority);

				auto pdata = queue.ptr; // pointer to first data

				ptrdiff_t sent = 0;
				if (pdata.length)
				{
					sent = doSend(pdata.contents);
					debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length);
				}
				else
				{
					debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this);
				}

				if (sent == Socket.ERROR)
				{
					// Would-block is not an error: stop for now and leave the queue as it is.
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					// Short write: keep the unsent tail at the head of the queue
					// and remember which queue it belongs to.
					if (sent > 0)
					{
						*pdata = (*pdata)[sent..pdata.length];
						partiallySent = priority;
					}
					return;
				}
				else
				{
					// Item fully sent: clear it and advance the queue.
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					partiallySent = -1;
					if (queue.length == 0)
						queue = null;
				}
			}

	// outQueue is now empty
	if (bufferFlushedHandler)
		bufferFlushedHandler();
	if (state == ConnectionState.disconnecting)
	{
		// A requested disconnect was waiting for the queues to drain.
		debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
		close();
	}
}

public:
/// Wrap an existing socket (see Connection's constructor).
this(Socket conn)
{
	super(conn);
}
} // end of class StreamConnection

// ***************************************************************************

/// A POSIX file stream.
/// Allows adding a file (e.g. stdin/stdout) to the socket manager.
/// Does not dup the given file descriptor, so "disconnecting" this connection
/// will close it.
version (Posix)
class FileConnection : StreamConnection
{
	/// Wrap file descriptor `fileno` in a non-blocking Socket object.
	this(int fileno)
	{
		auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC);
		conn.blocking = false;
		super(conn);
	}

protected:
	import core.sys.posix.unistd : read, write;

	/// Write with POSIX write() on the wrapped descriptor.
	override sizediff_t doSend(in void[] buffer)
	{
		return write(socket.handle, buffer.ptr, buffer.length);
	}

	/// Read with POSIX read() on the wrapped descriptor.
	override sizediff_t doReceive(void[] buffer)
	{
		return read(socket.handle, buffer.ptr, buffer.length);
	}
}

/// Separates reading and writing, e.g. for stdin/stdout.
class Duplex : IConnection
{
	/// The two underlying connections: data is read from `reader` and sent through `writer`.
	IConnection reader, writer;

	/// Combine two connections and take over their connect and disconnect handlers.
	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;
		reader.handleConnect = &onConnect;
		writer.handleConnect = &onConnect;
		reader.handleDisconnect = &onDisconnect;
		writer.handleDisconnect = &onDisconnect;
	}

	/// Combined state: `disconnecting` if either side is disconnecting,
	/// otherwise the lower of the two states.
	@property ConnectionState state()
	{
		if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;
		else
			return reader.state < writer.state ? reader.state : writer.state;
	}

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		// Only disconnect sides that are resolving, connecting or connected.
		if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting)
			reader.disconnect(reason, type);
		if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting)
			writer.disconnect(reason, type);
		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Forward the connect event once both sides are connected.
	protected void onConnect()
	{
		if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
			connectHandler();
	}

	/// Called when either side disconnects: notify the user once, then close the other side.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
		if (auto handler = disconnectHandler)
		{
			// Clear the handler before calling it. If the handler disconnects this
			// Duplex, the other side re-enters onDisconnect and must find no handler
			// set, otherwise the user is notified twice.
			disconnectHandler = null; // don't call it twice for the other connection
			handler(reason, type);
		}
		// It is our responsibility to disconnect the other connection
		// Use DisconnectType.requested to ensure that any written data is flushed
		disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; }

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; }
}

// Compile-only check that Duplex implements all of IConnection.
unittest { if (false) new Duplex(null, null); }

// ***************************************************************************

/// An asynchronous TCP connection.
class TcpConnection : StreamConnection
{
protected:
	/// Queue of addresses to try connecting to.
1194 AddressInfo[] addressQueue; 1195 1196 this(Socket conn) 1197 { 1198 super(conn); 1199 } 1200 1201 override sizediff_t doSend(in void[] buffer) 1202 { 1203 return conn.send(buffer); 1204 } 1205 1206 override sizediff_t doReceive(void[] buffer) 1207 { 1208 return conn.receive(buffer); 1209 } 1210 1211 final void tryNextAddress() 1212 { 1213 assert(state == ConnectionState.connecting); 1214 auto addressInfo = addressQueue[0]; 1215 addressQueue = addressQueue[1..$]; 1216 1217 try 1218 { 1219 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1220 conn.blocking = false; 1221 1222 socketManager.register(this); 1223 updateFlags(); 1224 debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString()); 1225 conn.connect(addressInfo.address); 1226 } 1227 catch (SocketException e) 1228 return onError("Connect error: " ~ e.msg); 1229 } 1230 1231 /// Called when an error occurs on the socket. 1232 override void onError(string reason) 1233 { 1234 if (state == ConnectionState.connecting && addressQueue.length) 1235 { 1236 socketManager.unregister(this); 1237 conn.close(); 1238 conn = null; 1239 1240 return tryNextAddress(); 1241 } 1242 1243 super.onError(reason); 1244 } 1245 1246 public: 1247 /// Default constructor 1248 this() 1249 { 1250 debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this); 1251 } 1252 1253 /// Start establishing a connection. 
1254 final void connect(string host, ushort port) 1255 { 1256 assert(host.length, "Empty host"); 1257 assert(port, "No port specified"); 1258 1259 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1260 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1261 1262 state = ConnectionState.resolving; 1263 1264 AddressInfo[] addressInfos; 1265 try 1266 { 1267 auto addresses = getAddress(host, port); 1268 enforce(addresses.length, "No addresses found"); 1269 debug (ASOCKETS) 1270 { 1271 stderr.writefln("Resolved to %s addresses:", addresses.length); 1272 foreach (address; addresses) 1273 stderr.writefln("- %s", address.toString()); 1274 } 1275 1276 if (addresses.length > 1) 1277 { 1278 import std.random : randomShuffle; 1279 randomShuffle(addresses); 1280 } 1281 1282 foreach (address; addresses) 1283 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1284 } 1285 catch (SocketException e) 1286 return onError("Lookup error: " ~ e.msg); 1287 1288 state = ConnectionState.disconnected; 1289 connect(addressInfos); 1290 } 1291 1292 /// ditto 1293 final void connect(AddressInfo[] addresses) 1294 { 1295 assert(addresses.length, "No addresses specified"); 1296 1297 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1298 assert(!conn); 1299 1300 addressQueue = addresses; 1301 state = ConnectionState.connecting; 1302 tryNextAddress(); 1303 } 1304 } 1305 1306 // *************************************************************************** 1307 1308 /// An asynchronous TCP connection server. 
final class TcpServer
{
private:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Take ownership of an already listening socket and register
		/// it with the socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one pending connection. It is passed to the accept
		/// handler if one is set, and closed immediately otherwise.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				TcpConnection connection = new TcpConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregister from the socket manager and close the listening socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	/// Listeners are polled for readability only while an accept handler is set.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// Params:
	///   port = port to listen on; with 0, only IPv4 addresses are used
	///   addr = node name passed to `getAddressInfo`
	/// Returns: `port`, replaced by the local port of an IPv4 listener
	///   when there is one.
	ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Iterating in reverse keeps the remaining indices valid
			// while entries are removed.
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	/// ditto
	/// Addresses that fail with a `SocketException` are skipped.
	/// Throws: `Exception` if there are no listeners once all addresses
	///   have been tried.
	void listen(AddressInfo[] addressInfos)
	{
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				// Do not leave the socket open if setting it up fails
				scope(failure) conn.close();

				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	/// Local addresses of all current listeners.
	@property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Whether `listen` has succeeded and `close` has not been called since.
	@property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	/// Closes every listener, then calls `handleClose` if it is set.
	void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

public:
	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(TcpConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
}

// ***************************************************************************

/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	/// Send queued datagrams to `remoteAddress`, one `sendTo` call per
	/// queued item. Returns early if the socket would block or on error;
	/// a partially sent datagram is reported as an error.
	final private void onWritableImpl()
	{
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				auto sent = conn.sendTo(pdata.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given AddressFamily, without binding to an address.
	/// Calls the connect handler, if set, once the socket is ready.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Create and register the non-blocking socket and enter the
	/// `connected` state, without calling the connect handler.
	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// Resolves `host` and binds to one of the results, picked at random
	/// when there are several.
	/// Returns: the bound local port, or 0 if the lookup failed with a
	///   `SocketException` (which is also reported through `onError`).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			enforce(addresses.length, "No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// initializeImpl (reached through the overload below) asserts
		// the `disconnected` state
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Use initializeImpl, not initialize: initialize would already call
		// connectHandler, before the socket is bound, and the call below
		// would then invoke it a second time.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

public:
	/// Where to send packets to.
	Address remoteAddress;
}

///
unittest
{
	auto server = new UdpConnection();
	auto serverPort = server.bind("localhost", 0);

	auto client = new UdpConnection();
	client.initialize(server.localAddress.addressFamily);

	string[] packets = ["Hello", "there"];
	client.remoteAddress = server.localAddress;
	client.send({
		Data[] data;
		foreach (packet; packets)
			data ~= Data(packet);
		return data;
	}());

	server.handleReadData = (Data data)
	{
		assert(data.contents == packets[0]);
		packets = packets[1..$];
		if (!packets.length)
		{
			server.close();
			client.close();
		}
	};
	socketManager.loop();
	assert(!packets.length);
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
	IConnection next;

	/// Wrap `next`, taking over its connect, disconnect and
	/// buffer-flushed handlers.
	this(IConnection next)
	{
		this.next = next;
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	}

	@property ConnectionState state() { return next.state; }

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	protected void onConnect()
	{
		if (connectHandler)
			connectHandler();
	}

	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		readDataHandler(data);
	}

	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (disconnectHandler)
			disconnectHandler(reason, type);
	}

	protected void onBufferFlushed()
	{
		if (bufferFlushedHandler)
			bufferFlushedHandler();
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	/// `next` is given a read handler only while one is set here.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		next.handleReadData = value ? &onReadData : null ;
	}
	private ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; }
	private BufferFlushedHandler bufferFlushedHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	string delimiter = "\r\n";

	this(IConnection next)
	{
		super(next);
	}

	/// Append a line to the send buffer.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter ca = this;
		ca.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends `data` to `inBuffer` and forwards every complete line
	/// (without its delimiter); an incomplete trailing line stays buffered.
	final override void onReadData(Data data)
	{
		import std.string;
		auto oldBufferLength = inBuffer.length;
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			import core.stdc.string; // memchr

			char c = delimiter[0];
			// The previously buffered part is skipped on the first search
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				sizediff_t index = p - inBuffer.ptr;
				processLine(index);

				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			sizediff_t index;
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
				processLine(index);
		}
	}

	/// Cut the line ending at `index` (the delimiter's position) out of
	/// `inBuffer` and pass it on.
	final void processLine(size_t index)
	{
		auto line = inBuffer[0..index];
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
	}

	/// Forward the disconnect, then drop any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}

// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stop the pending idle timer.
	/// An idle timeout must have been set and must currently be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Schedule the idle timer again.
	/// An idle timeout must have been set and must not currently be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Set the idle timeout to `duration` (which must be positive)
	/// and start the timer.
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		// Configure idleTask
		if (idleTask !is null)
		{
			// Reuse the existing task, cancelling it first if it is pending
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			idleTask = new TimerTask(duration);
			idleTask.handleTask = &onTask_Idle;
		}

		mainTimer.add(idleTask);
	}

	/// Report activity: call `handleNonIdle` if set, then restart a
	/// pending idle timer.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();

		if (idleTask is null)
			return;
		if (idleTask.isWaiting())
			idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// Connecting counts as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Received data counts as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Stop a pending idle timer before forwarding the disconnect.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		if (idleTask !is null)
		{
			if (idleTask.isWaiting())
				idleTask.cancel();
		}
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set

	/// Timer callback for the idle timeout.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		switch (state)
		{
			case ConnectionState.disconnecting:
				return disconnect("Delayed disconnect - time-out", DisconnectType.error);
			case ConnectionState.disconnected:
				return;
			default:
				break;
		}

		if (!handleIdleTimeout)
			return disconnect("Time-out", DisconnectType.error);

		resumeIdleTimeout(); // reschedule (by default)
		handleIdleTimeout();
	}
}

// ***************************************************************************

// A timeout scheduled with setTimeout fires while socketManager.loop() runs.
unittest
{
	bool timerFired;
	setTimeout({ timerFired = true; }, 10.msecs);
	socketManager.loop();
	assert(timerFired);
}