1 /** 2 * Asynchronous socket abstraction. 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Stéphan Kochen <stephan@kochen.nl> 12 * Vladimir Panteleev <vladimir@thecybershadow.net> 13 * Vincent Povirk <madewokherd@gmail.com> 14 * Simon Arlott 15 */ 16 17 module ae.net.asockets; 18 19 import ae.sys.timing; 20 import ae.utils.math; 21 public import ae.sys.data; 22 23 import core.stdc.stdint : int32_t; 24 25 import std.exception; 26 import std.socket; 27 import std.string : format; 28 public import std.socket : Address, AddressInfo, Socket; 29 30 debug(ASOCKETS) import std.stdio : stderr; 31 debug(PRINTDATA) static import std.stdio; 32 debug(PRINTDATA) import ae.utils.text : hexDump; 33 private import std.conv : to; 34 35 36 // http://d.puremagic.com/issues/show_bug.cgi?id=7016 37 static import ae.utils.array; 38 39 version(LIBEV) 40 { 41 import deimos.ev; 42 pragma(lib, "ev"); 43 } 44 45 version(Windows) 46 { 47 import core.sys.windows.windows : Sleep; 48 enum USE_SLEEP = true; // avoid convoluted mix of static and runtime conditions 49 } 50 else 51 enum USE_SLEEP = false; 52 53 /// Flags that determine socket wake-up events. 54 int eventCounter; 55 56 version(LIBEV) 57 { 58 // Watchers are a GenericSocket field (as declared in SocketMixin). 59 // Use one watcher per read and write event. 60 // Start/stop those as higher-level code declares interest in those events. 61 // Use the "data" ev_io field to store the parent GenericSocket address. 62 // Also use the "data" field as a flag to indicate whether the watcher is active 63 // (data is null when the watcher is stopped). 
64 65 struct SocketManager 66 { 67 private: 68 size_t count; 69 70 extern(C) 71 static void ioCallback(ev_loop_t* l, ev_io* w, int revents) 72 { 73 eventCounter++; 74 auto socket = cast(GenericSocket)w.data; 75 assert(socket, "libev event fired on stopped watcher"); 76 debug (ASOCKETS) writefln("ioCallback(%s, 0x%X)", socket, revents); 77 78 if (revents & EV_READ) 79 socket.onReadable(); 80 else 81 if (revents & EV_WRITE) 82 socket.onWritable(); 83 else 84 assert(false, "Unknown event fired from libev"); 85 86 // TODO? Need to get proper SocketManager instance to call updateTimer on 87 socketManager.updateTimer(false); 88 } 89 90 ev_timer evTimer; 91 MonoTime lastNextEvent = MonoTime.max; 92 93 extern(C) 94 static void timerCallback(ev_loop_t* l, ev_timer* w, int revents) 95 { 96 eventCounter++; 97 debug (ASOCKETS) writefln("Timer callback called."); 98 mainTimer.prod(); 99 100 socketManager.updateTimer(true); 101 debug (ASOCKETS) writefln("Timer callback exiting."); 102 } 103 104 void updateTimer(bool force) 105 { 106 auto nextEvent = mainTimer.getNextEvent(); 107 if (force || lastNextEvent != nextEvent) 108 { 109 debug (ASOCKETS) writefln("Rescheduling timer. 
Was at %s, now at %s", lastNextEvent, nextEvent); 110 if (nextEvent == MonoTime.max) // Stopping 111 { 112 if (lastNextEvent != MonoTime.max) 113 ev_timer_stop(ev_default_loop(0), &evTimer); 114 } 115 else 116 { 117 auto remaining = mainTimer.getRemainingTime(); 118 while (remaining <= Duration.zero) 119 { 120 debug (ASOCKETS) writefln("remaining=%s, prodding timer.", remaining); 121 mainTimer.prod(); 122 remaining = mainTimer.getRemainingTime(); 123 } 124 ev_tstamp tstamp = remaining.total!"hnsecs" * 1.0 / convert!("seconds", "hnsecs")(1); 125 debug (ASOCKETS) writefln("remaining=%s, ev_tstamp=%s", remaining, tstamp); 126 if (lastNextEvent == MonoTime.max) // Starting 127 { 128 ev_timer_init(&evTimer, &timerCallback, 0., tstamp); 129 ev_timer_start(ev_default_loop(0), &evTimer); 130 } 131 else // Adjusting 132 { 133 evTimer.repeat = tstamp; 134 ev_timer_again(ev_default_loop(0), &evTimer); 135 } 136 } 137 lastNextEvent = nextEvent; 138 } 139 } 140 141 public: 142 /// Register a socket with the manager. 143 void register(GenericSocket socket) 144 { 145 debug (ASOCKETS) writefln("Registering %s", socket); 146 debug assert(socket.evRead.data is null && socket.evWrite.data is null, "Re-registering a started socket"); 147 auto fd = socket.conn.handle; 148 assert(fd, "Must have fd before socket registration"); 149 ev_io_init(&socket.evRead , &ioCallback, fd, EV_READ ); 150 ev_io_init(&socket.evWrite, &ioCallback, fd, EV_WRITE); 151 count++; 152 } 153 154 /// Unregister a socket with the manager. 155 void unregister(GenericSocket socket) 156 { 157 debug (ASOCKETS) writefln("Unregistering %s", socket); 158 socket.notifyRead = false; 159 socket.notifyWrite = false; 160 count--; 161 } 162 163 size_t size() 164 { 165 return count; 166 } 167 168 /// Loop continuously until no sockets are left. 
169 void loop() 170 { 171 auto evLoop = ev_default_loop(0); 172 enforce(evLoop, "libev initialization failure"); 173 174 updateTimer(true); 175 debug (ASOCKETS) writeln("ev_run"); 176 ev_run(ev_default_loop(0), 0); 177 } 178 } 179 180 private mixin template SocketMixin() 181 { 182 private ev_io evRead, evWrite; 183 184 private void setWatcherState(ref ev_io ev, bool newValue, int event) 185 { 186 if (!conn) 187 { 188 // Can happen when setting delegates before connecting. 189 return; 190 } 191 192 if (newValue && !ev.data) 193 { 194 // Start 195 ev.data = cast(void*)this; 196 ev_io_start(ev_default_loop(0), &ev); 197 } 198 else 199 if (!newValue && ev.data) 200 { 201 // Stop 202 assert(ev.data is cast(void*)this); 203 ev.data = null; 204 ev_io_stop(ev_default_loop(0), &ev); 205 } 206 } 207 208 /// Interested in read notifications (onReadable)? 209 @property final void notifyRead (bool value) { setWatcherState(evRead , value, EV_READ ); } 210 /// Interested in write notifications (onWritable)? 211 @property final void notifyWrite(bool value) { setWatcherState(evWrite, value, EV_WRITE); } 212 213 debug ~this() 214 { 215 // The LIBEV SocketManager holds no references to registered sockets. 216 // TODO: Add a doubly-linked list? 217 assert(evRead.data is null && evWrite.data is null, "Destroying a registered socket"); 218 } 219 } 220 } 221 else // Use select 222 { 223 struct SocketManager 224 { 225 private: 226 enum FD_SETSIZE = 1024; 227 228 /// List of all sockets to poll. 229 GenericSocket[] sockets; 230 231 /// Debug AA to check for dangling socket references. 232 debug GenericSocket[socket_t] socketHandles; 233 234 void delegate()[] idleHandlers; 235 236 public: 237 /// Register a socket with the manager. 
238 void register(GenericSocket conn) 239 { 240 debug (ASOCKETS) stderr.writefln("Registering %s (%d total)", conn, sockets.length + 1); 241 assert(!conn.socket.blocking, "Trying to register a blocking socket"); 242 sockets ~= conn; 243 244 debug 245 { 246 auto handle = conn.socket.handle; 247 assert(handle != socket_t.init, "Can't register a closed socket"); 248 assert(handle !in socketHandles, "This socket handle is already registered"); 249 socketHandles[handle] = conn; 250 } 251 } 252 253 /// Unregister a socket with the manager. 254 void unregister(GenericSocket conn) 255 { 256 debug (ASOCKETS) stderr.writefln("Unregistering %s (%d total)", conn, sockets.length - 1); 257 258 debug 259 { 260 auto handle = conn.socket.handle; 261 assert(handle != socket_t.init, "Can't unregister a closed socket"); 262 auto pconn = handle in socketHandles; 263 assert(pconn, "This socket handle is not registered"); 264 assert(*pconn is conn, "This socket handle is registered but belongs to another GenericSocket"); 265 socketHandles.remove(handle); 266 } 267 268 foreach (size_t i, GenericSocket j; sockets) 269 if (j is conn) 270 { 271 sockets = sockets[0 .. i] ~ sockets[i + 1 .. sockets.length]; 272 return; 273 } 274 assert(false, "Socket not registered"); 275 } 276 277 size_t size() 278 { 279 return sockets.length; 280 } 281 282 /// Loop continuously until no sockets are left. 
283 void loop() 284 { 285 debug (ASOCKETS) stderr.writeln("Starting event loop."); 286 287 SocketSet readset, writeset, errorset; 288 size_t sockcount; 289 uint setSize = FD_SETSIZE; // Can't trust SocketSet.max due to Issue 14012 290 readset = new SocketSet(setSize); 291 writeset = new SocketSet(setSize); 292 errorset = new SocketSet(setSize); 293 while (true) 294 { 295 uint minSize = 0; 296 version(Windows) 297 minSize = cast(uint)sockets.length; 298 else 299 { 300 foreach (s; sockets) 301 if (s.socket && s.socket.handle != socket_t.init && s.socket.handle > minSize) 302 minSize = s.socket.handle; 303 } 304 minSize++; 305 306 if (setSize < minSize) 307 { 308 debug (ASOCKETS) stderr.writefln("Resizing SocketSets: %d => %d", setSize, minSize*2); 309 setSize = minSize * 2; 310 readset = new SocketSet(setSize); 311 writeset = new SocketSet(setSize); 312 errorset = new SocketSet(setSize); 313 } 314 else 315 { 316 readset.reset(); 317 writeset.reset(); 318 errorset.reset(); 319 } 320 321 sockcount = 0; 322 bool haveActive; 323 debug (ASOCKETS) stderr.writeln("Populating sets"); 324 foreach (GenericSocket conn; sockets) 325 { 326 if (!conn.socket) 327 continue; 328 sockcount++; 329 if (!conn.daemon) 330 haveActive = true; 331 332 debug (ASOCKETS) stderr.writef("\t%s:", conn); 333 if (conn.notifyRead) 334 { 335 readset.add(conn.socket); 336 debug (ASOCKETS) stderr.write(" READ"); 337 } 338 if (conn.notifyWrite) 339 { 340 writeset.add(conn.socket); 341 debug (ASOCKETS) stderr.write(" WRITE"); 342 } 343 errorset.add(conn.socket); 344 debug (ASOCKETS) stderr.writeln(); 345 } 346 debug (ASOCKETS) 347 { 348 stderr.writefln("Sets populated as follows:"); 349 printSets(readset, writeset, errorset); 350 } 351 352 debug (ASOCKETS) stderr.writefln("Waiting (%d sockets, %s timer events, %d idle handlers)...", 353 sockcount, 354 mainTimer.isWaiting() ? 
"with" : "no", 355 idleHandlers.length, 356 ); 357 if (!haveActive && !mainTimer.isWaiting()) 358 { 359 debug (ASOCKETS) stderr.writeln("No more sockets or timer events, exiting loop."); 360 break; 361 } 362 363 debug (ASOCKETS) stderr.flush(); 364 365 int events; 366 if (idleHandlers.length) 367 { 368 if (sockcount==0) 369 events = 0; 370 else 371 events = Socket.select(readset, writeset, errorset, 0.seconds); 372 } 373 else 374 if (USE_SLEEP && sockcount==0) 375 { 376 version(Windows) 377 { 378 auto duration = mainTimer.getRemainingTime().total!"msecs"(); 379 debug (ASOCKETS) writeln("Wait duration: ", duration, " msecs"); 380 if (duration <= 0) 381 duration = 1; // Avoid busywait 382 else 383 if (duration > int.max) 384 duration = int.max; 385 Sleep(cast(int)duration); 386 events = 0; 387 } 388 else 389 assert(0); 390 } 391 else 392 if (mainTimer.isWaiting()) 393 events = Socket.select(readset, writeset, errorset, mainTimer.getRemainingTime()); 394 else 395 events = Socket.select(readset, writeset, errorset); 396 397 debug (ASOCKETS) stderr.writefln("%d events fired.", events); 398 399 if (events > 0) 400 { 401 // Handle just one event at a time, as the first 402 // handler might invalidate select()'s results. 403 handleEvent(readset, writeset, errorset); 404 } 405 else 406 if (idleHandlers.length) 407 { 408 import ae.utils.array; 409 auto handler = idleHandlers.shift(); 410 411 // Rotate the idle handler queue before running it, 412 // in case the handler unregisters itself. 
413 idleHandlers ~= handler; 414 415 handler(); 416 } 417 418 // Timers may invalidate our select results, so fire them after processing the latter 419 mainTimer.prod(); 420 421 eventCounter++; 422 } 423 } 424 425 debug (ASOCKETS) 426 void printSets(SocketSet readset, SocketSet writeset, SocketSet errorset) 427 { 428 foreach (GenericSocket conn; sockets) 429 { 430 if (!conn.socket) 431 stderr.writefln("\t\t%s is unset", conn); 432 else 433 { 434 if (readset.isSet(conn.socket)) 435 stderr.writefln("\t\t%s is readable", conn); 436 if (writeset.isSet(conn.socket)) 437 stderr.writefln("\t\t%s is writable", conn); 438 if (errorset.isSet(conn.socket)) 439 stderr.writefln("\t\t%s is errored", conn); 440 } 441 } 442 } 443 444 void handleEvent(SocketSet readset, SocketSet writeset, SocketSet errorset) 445 { 446 debug (ASOCKETS) 447 { 448 stderr.writefln("\tSelect results:"); 449 printSets(readset, writeset, errorset); 450 } 451 452 foreach (GenericSocket conn; sockets) 453 { 454 if (!conn.socket) 455 continue; 456 457 if (readset.isSet(conn.socket)) 458 { 459 debug (ASOCKETS) stderr.writefln("\t%s - calling onReadable", conn); 460 return conn.onReadable(); 461 } 462 else 463 if (writeset.isSet(conn.socket)) 464 { 465 debug (ASOCKETS) stderr.writefln("\t%s - calling onWritable", conn); 466 return conn.onWritable(); 467 } 468 else 469 if (errorset.isSet(conn.socket)) 470 { 471 debug (ASOCKETS) stderr.writefln("\t%s - calling onError", conn); 472 return conn.onError("select() error: " ~ conn.socket.getErrorText()); 473 } 474 } 475 476 assert(false, "select() reported events available, but no registered sockets are set"); 477 } 478 } 479 480 // Use UFCS to allow removeIdleHandler to have a predicate with context 481 void addIdleHandler(ref SocketManager socketManager, void delegate() handler) 482 { 483 foreach (i, idleHandler; socketManager.idleHandlers) 484 assert(handler !is idleHandler); 485 486 socketManager.idleHandlers ~= handler; 487 } 488 489 static bool isFun(T)(T a, T 
b) { return a is b; } 490 void removeIdleHandler(alias pred=isFun, Args...)(ref SocketManager socketManager, Args args) 491 { 492 foreach (i, idleHandler; socketManager.idleHandlers) 493 if (pred(idleHandler, args)) 494 { 495 import std.algorithm; 496 socketManager.idleHandlers = socketManager.idleHandlers.remove(i); 497 return; 498 } 499 assert(false, "No such idle handler"); 500 } 501 502 private mixin template SocketMixin() 503 { 504 /// Interested in read notifications (onReadable)? 505 bool notifyRead; 506 /// Interested in write notifications (onWritable)? 507 bool notifyWrite; 508 } 509 } 510 511 /// The default socket manager. 512 SocketManager socketManager; 513 514 // *************************************************************************** 515 516 /// General methods for an asynchronous socket. 517 abstract class GenericSocket 518 { 519 /// Declares notifyRead and notifyWrite. 520 mixin SocketMixin; 521 522 protected: 523 /// The socket this class wraps. 524 Socket conn; 525 526 protected: 527 /// Retrieve the socket class this class wraps. 528 @property final Socket socket() 529 { 530 return conn; 531 } 532 533 void onReadable() 534 { 535 } 536 537 void onWritable() 538 { 539 } 540 541 void onError(string reason) 542 { 543 } 544 545 public: 546 /// allow getting the address of connections that are already disconnected 547 private Address cachedLocalAddress, cachedRemoteAddress; 548 549 /// Don't block the process from exiting. 550 /// TODO: Not implemented with libev 551 bool daemon; 552 553 final @property Address localAddress() 554 { 555 if (cachedLocalAddress !is null) 556 return cachedLocalAddress; 557 else 558 if (conn is null) 559 return null; 560 else 561 return cachedLocalAddress = conn.localAddress(); 562 } 563 564 final @property string localAddressStr() nothrow 565 { 566 try 567 { 568 auto a = localAddress; 569 return a is null ? 
"[null address]" : a.toString(); 570 } 571 catch (Exception e) 572 return "[error: " ~ e.msg ~ "]"; 573 } 574 575 final @property Address remoteAddress() 576 { 577 if (cachedRemoteAddress !is null) 578 return cachedRemoteAddress; 579 else 580 if (conn is null) 581 return null; 582 else 583 return cachedRemoteAddress = conn.remoteAddress(); 584 } 585 586 final @property string remoteAddressStr() nothrow 587 { 588 try 589 { 590 auto a = remoteAddress; 591 return a is null ? "[null address]" : a.toString(); 592 } 593 catch (Exception e) 594 return "[error: " ~ e.msg ~ "]"; 595 } 596 597 final void setKeepAlive(bool enabled=true, int time=10, int interval=5) 598 { 599 assert(conn, "Attempting to set keep-alive on an uninitialized socket"); 600 if (enabled) 601 { 602 try 603 conn.setKeepAlive(time, interval); 604 catch (SocketException) 605 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, true); 606 } 607 else 608 conn.setOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, false); 609 } 610 611 override string toString() const 612 { 613 import std.string; 614 return "%s {this=%s, fd=%s}".format(this.classinfo.name.split(".")[$-1], cast(void*)this, conn ? conn.handle : -1); 615 } 616 } 617 618 // *************************************************************************** 619 620 enum DisconnectType 621 { 622 requested, // initiated by the application 623 graceful, // peer gracefully closed the connection 624 error // abnormal network condition 625 } 626 627 enum ConnectionState 628 { 629 /// The initial state, or the state after a disconnect was fully processed. 630 disconnected, 631 632 /// Name resolution. Currently done synchronously. 633 resolving, 634 635 /// A connection attempt is in progress. 636 connecting, 637 638 /// A connection is established. 639 connected, 640 641 /// Disconnecting in progress. No data can be sent or received at this point. 642 /// We are waiting for queued data to be actually sent before disconnecting. 
643 disconnecting, 644 } 645 646 /// Returns true if this is a connection state for which disconnecting is valid. 647 /// Generally, applications should be aware of the life cycle of their sockets, 648 /// so checking the state of a connection is unnecessary (and a code smell). 649 /// However, unconditionally disconnecting some connected sockets can be useful 650 /// when it needs to occur "out-of-bound" (not tied to the application normal life cycle), 651 /// such as in response to a signal. 652 bool disconnectable(ConnectionState state) { return state >= ConnectionState.resolving && state <= ConnectionState.connected; } 653 654 /// Common interface for connections and adapters. 655 interface IConnection 656 { 657 enum MAX_PRIORITY = 4; 658 enum DEFAULT_PRIORITY = 2; 659 660 static const defaultDisconnectReason = "Software closed the connection"; 661 662 /// Get connection state. 663 @property ConnectionState state(); 664 665 /// Has a connection been established? 666 deprecated final @property bool connected() { return state == ConnectionState.connected; } 667 668 /// Are we in the process of disconnecting? (Waiting for data to be flushed) 669 deprecated final @property bool disconnecting() { return state == ConnectionState.disconnecting; } 670 671 /// Queue Data for sending. 672 void send(Data[] data, int priority = DEFAULT_PRIORITY); 673 674 /// ditto 675 final void send(Data datum, int priority = DEFAULT_PRIORITY) 676 { 677 Data[1] data; 678 data[0] = datum; 679 this.send(data, priority); 680 data[] = Data.init; 681 } 682 683 /// Terminate the connection. 684 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested); 685 686 /// Callback setter for when a connection has been established 687 /// (if applicable). 688 alias void delegate() ConnectHandler; 689 @property void handleConnect(ConnectHandler value); /// ditto 690 691 /// Callback setter for when new data is read. 
692 alias void delegate(Data data) ReadDataHandler; 693 @property void handleReadData(ReadDataHandler value); /// ditto 694 695 /// Callback setter for when a connection was closed. 696 alias void delegate(string reason, DisconnectType type) DisconnectHandler; 697 @property void handleDisconnect(DisconnectHandler value); /// ditto 698 699 /// Callback setter for when all queued data has been sent. 700 alias void delegate() BufferFlushedHandler; 701 @property void handleBufferFlushed(BufferFlushedHandler value); /// ditto 702 } 703 704 // *************************************************************************** 705 706 class Connection : GenericSocket, IConnection 707 { 708 private: 709 /// Blocks of data larger than this value are passed as unmanaged memory 710 /// (in Data objects). Blocks smaller than this value will be reallocated 711 /// on the managed heap. The disadvantage of placing large objects on the 712 /// managed heap is false pointers; the disadvantage of using Data for 713 /// small objects is wasted slack space due to the page size alignment 714 /// requirement. 715 enum UNMANAGED_THRESHOLD = 256; 716 717 ConnectionState _state; 718 final @property ConnectionState state(ConnectionState value) { return _state = value; } 719 720 public: 721 /// Get connection state. 722 override @property ConnectionState state() { return _state; } 723 724 protected: 725 abstract sizediff_t doSend(in void[] buffer); 726 abstract sizediff_t doReceive(void[] buffer); 727 728 /// The send buffers. 729 Data[][MAX_PRIORITY+1] outQueue; 730 /// Whether the first item from this queue (if any) has been partially sent (and thus can't be canceled). 731 int partiallySent = -1; 732 733 /// Constructor used by a ServerSocket for new connections 734 this(Socket conn) 735 { 736 this(); 737 this.conn = conn; 738 state = conn is null ? 
ConnectionState.disconnected : ConnectionState.connected; 739 if (conn) 740 socketManager.register(this); 741 updateFlags(); 742 } 743 744 final void updateFlags() 745 { 746 if (state == ConnectionState.connecting) 747 notifyWrite = true; 748 else 749 notifyWrite = writePending; 750 751 notifyRead = state == ConnectionState.connected && readDataHandler; 752 debug(ASOCKETS) stderr.writefln("[%s] updateFlags: %s %s", conn ? conn.handle : -1, notifyRead, notifyWrite); 753 } 754 755 /// Called when a socket is readable. 756 override void onReadable() 757 { 758 // TODO: use FIONREAD when Phobos gets ioctl support (issue 6649) 759 static ubyte[0x10000] inBuffer = void; 760 auto received = doReceive(inBuffer); 761 762 if (received == 0) 763 return disconnect("Connection closed", DisconnectType.graceful); 764 765 if (received == Socket.ERROR) 766 { 767 // if (wouldHaveBlocked) 768 // { 769 // debug (ASOCKETS) writefln("\t\t%s: wouldHaveBlocked or recv()", this); 770 // return; 771 // } 772 // else 773 onError("recv() error: " ~ lastSocketError); 774 } 775 else 776 { 777 debug (PRINTDATA) 778 { 779 std.stdio.writefln("== %s <- %s ==", localAddress, remoteAddress); 780 std.stdio.write(hexDump(inBuffer[0 .. received])); 781 std.stdio.stdout.flush(); 782 } 783 784 if (state == ConnectionState.disconnecting) 785 { 786 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because we are disconnecting", this); 787 } 788 else 789 if (!readDataHandler) 790 { 791 debug (ASOCKETS) stderr.writefln("\t\t%s: Discarding received data because there is no data handler", this); 792 } 793 else 794 { 795 // Currently, unlike the D1 version of this module, 796 // we will always reallocate read network data. 797 // This disfavours code which doesn't need to store 798 // read data after processing it, but otherwise 799 // makes things simpler and safer all around. 800 801 if (received < UNMANAGED_THRESHOLD) 802 { 803 // Copy to the managed heap 804 readDataHandler(Data(inBuffer[0 .. 
received].dup)); 805 } 806 else 807 { 808 // Copy to unmanaged memory 809 readDataHandler(Data(inBuffer[0 .. received], true)); 810 } 811 } 812 } 813 } 814 815 /// Called when an error occurs on the socket. 816 override void onError(string reason) 817 { 818 if (state == ConnectionState.disconnecting) 819 { 820 debug (ASOCKETS) stderr.writefln("Socket error while disconnecting @ %s: %s".format(cast(void*)this, reason)); 821 return close(); 822 } 823 824 assert(state == ConnectionState.resolving || state == ConnectionState.connecting || state == ConnectionState.connected); 825 disconnect("Socket error: " ~ reason, DisconnectType.error); 826 } 827 828 this() 829 { 830 } 831 832 public: 833 /// Close a connection. If there is queued data waiting to be sent, wait until it is sent before disconnecting. 834 /// The disconnect handler will be called immediately, even when not all data has been flushed yet. 835 void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested) 836 { 837 //scope(success) updateFlags(); // Work around scope(success) breaking debugger stack traces 838 assert(state.disconnectable, "Attempting to disconnect on a %s socket".format(state)); 839 840 if (writePending) 841 { 842 if (type==DisconnectType.requested) 843 { 844 assert(conn, "Attempting to disconnect on an uninitialized socket"); 845 // queue disconnect after all data is sent 846 debug (ASOCKETS) stderr.writefln("[%s] Queueing disconnect: %s", remoteAddressStr, reason); 847 state = ConnectionState.disconnecting; 848 //setIdleTimeout(30.seconds); 849 if (disconnectHandler) 850 disconnectHandler(reason, type); 851 updateFlags(); 852 return; 853 } 854 else 855 discardQueues(); 856 } 857 858 debug (ASOCKETS) stderr.writefln("Disconnecting @ %s: %s", cast(void*)this, reason); 859 860 if ((state == ConnectionState.connecting && conn) || state == ConnectionState.connected) 861 close(); 862 else 863 { 864 assert(conn is null, "Registered but %s 
socket".format(state)); 865 if (state == ConnectionState.resolving) 866 state = ConnectionState.disconnected; 867 } 868 869 if (disconnectHandler) 870 disconnectHandler(reason, type); 871 updateFlags(); 872 } 873 874 private final void close() 875 { 876 assert(conn, "Attempting to close an unregistered socket"); 877 socketManager.unregister(this); 878 conn.close(); 879 conn = null; 880 outQueue[] = null; 881 state = ConnectionState.disconnected; 882 } 883 884 /// Append data to the send buffer. 885 void send(Data[] data, int priority = DEFAULT_PRIORITY) 886 { 887 assert(state == ConnectionState.connected, "Attempting to send on a %s socket".format(state)); 888 outQueue[priority] ~= data; 889 notifyWrite = true; // Fast updateFlags() 890 891 debug (PRINTDATA) 892 { 893 std.stdio.writefln("== %s -> %s ==", localAddress, remoteAddress); 894 foreach (datum; data) 895 if (datum.length) 896 std.stdio.write(hexDump(datum.contents)); 897 else 898 std.stdio.writeln("(empty Data)"); 899 std.stdio.stdout.flush(); 900 } 901 } 902 903 /// ditto 904 alias send = IConnection.send; 905 906 final void clearQueue(int priority) 907 { 908 if (priority == partiallySent) 909 { 910 assert(outQueue[priority].length > 0); 911 outQueue[priority] = outQueue[priority][0..1]; 912 } 913 else 914 outQueue[priority] = null; 915 updateFlags(); 916 } 917 918 /// Clears all queues, even partially sent content. 
919 private final void discardQueues() 920 { 921 foreach (priority; 0..MAX_PRIORITY+1) 922 outQueue[priority] = null; 923 partiallySent = -1; 924 updateFlags(); 925 } 926 927 @property 928 final bool writePending() 929 { 930 foreach (queue; outQueue) 931 if (queue.length) 932 return true; 933 return false; 934 } 935 936 final bool queuePresent(int priority = DEFAULT_PRIORITY) 937 { 938 if (priority == partiallySent) 939 { 940 assert(outQueue[priority].length > 0); 941 return outQueue[priority].length > 1; 942 } 943 else 944 return outQueue[priority].length > 0; 945 } 946 947 final size_t packetsQueued(int priority = DEFAULT_PRIORITY) 948 { 949 return outQueue[priority].length; 950 } 951 952 final size_t bytesQueued(int priority = DEFAULT_PRIORITY) 953 { 954 size_t bytes; 955 foreach (datum; outQueue[priority]) 956 bytes += datum.length; 957 return bytes; 958 } 959 960 public: 961 private ConnectHandler connectHandler; 962 /// Callback for when a connection has been established. 963 @property final void handleConnect(ConnectHandler value) { connectHandler = value; updateFlags(); } 964 965 private ReadDataHandler readDataHandler; 966 /// Callback for incoming data. 967 /// Data will not be received unless this handler is set. 968 @property final void handleReadData(ReadDataHandler value) { readDataHandler = value; updateFlags(); } 969 970 private DisconnectHandler disconnectHandler; 971 /// Callback for when a connection was closed. 972 @property final void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; updateFlags(); } 973 974 /// Callback setter for when all queued data has been sent. 975 private BufferFlushedHandler bufferFlushedHandler; 976 @property final void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; updateFlags(); } 977 } 978 979 class StreamConnection : Connection 980 { 981 protected: 982 this() 983 { 984 super(); 985 } 986 987 /// Called when a socket is writable. 
988 override void onWritable() 989 { 990 //scope(success) updateFlags(); 991 onWritableImpl(); 992 updateFlags(); 993 } 994 995 // Work around scope(success) breaking debugger stack traces 996 final private void onWritableImpl() 997 { 998 debug(ASOCKETS) stderr.writefln("[%s] onWritableImpl (we are %s)", conn ? conn.handle : -1, state); 999 if (state == ConnectionState.connecting) 1000 { 1001 int32_t error; 1002 conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error); 1003 if (error) 1004 return disconnect(formatSocketError(error), DisconnectType.error); 1005 1006 state = ConnectionState.connected; 1007 1008 //debug writefln("[%s] Connected", remoteAddress); 1009 try 1010 setKeepAlive(); 1011 catch (Exception e) 1012 return disconnect(e.msg, DisconnectType.error); 1013 if (connectHandler) 1014 connectHandler(); 1015 return; 1016 } 1017 //debug writefln(remoteAddress(), ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length); 1018 1019 foreach (sendPartial; [true, false]) 1020 foreach (int priority, ref queue; outQueue) 1021 while (queue.length && (!sendPartial || priority == partiallySent)) 1022 { 1023 assert(partiallySent == -1 || partiallySent == priority); 1024 1025 auto pdata = queue.ptr; // pointer to first data 1026 1027 ptrdiff_t sent = 0; 1028 if (pdata.length) 1029 { 1030 sent = doSend(pdata.contents); 1031 debug (ASOCKETS) stderr.writefln("\t\t%s: sent %d/%d bytes", this, sent, pdata.length); 1032 } 1033 else 1034 { 1035 debug (ASOCKETS) stderr.writefln("\t\t%s: empty Data object", this); 1036 } 1037 1038 if (sent == Socket.ERROR) 1039 { 1040 if (wouldHaveBlocked()) 1041 return; 1042 else 1043 return onError("send() error: " ~ lastSocketError); 1044 } 1045 else 1046 if (sent < pdata.length) 1047 { 1048 if (sent > 0) 1049 { 1050 *pdata = (*pdata)[sent..pdata.length]; 1051 partiallySent = priority; 1052 } 1053 return; 1054 } 1055 else 1056 { 1057 assert(sent == pdata.length); 1058 //debug 
writefln("[%s] Sent data:", remoteAddress); 1059 //debug writefln("%s", hexDump(pdata.contents[0..sent])); 1060 pdata.clear(); 1061 queue = queue[1..$]; 1062 partiallySent = -1; 1063 if (queue.length == 0) 1064 queue = null; 1065 } 1066 } 1067 1068 // outQueue is now empty 1069 if (bufferFlushedHandler) 1070 bufferFlushedHandler(); 1071 if (state == ConnectionState.disconnecting) 1072 { 1073 debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this); 1074 close(); 1075 } 1076 } 1077 1078 public: 1079 this(Socket conn) 1080 { 1081 super(conn); 1082 } 1083 } 1084 1085 // *************************************************************************** 1086 1087 /// A POSIX file stream. 1088 /// Allows adding a file (e.g. stdin/stdout) to the socket manager. 1089 /// Does not dup the given file descriptor, so "disconnecting" this connection 1090 /// will close it. 1091 version (Posix) 1092 class FileConnection : StreamConnection 1093 { 1094 this(int fileno) 1095 { 1096 auto conn = new Socket(cast(socket_t)fileno, AddressFamily.UNSPEC); 1097 conn.blocking = false; 1098 super(conn); 1099 } 1100 1101 protected: 1102 import core.sys.posix.unistd : read, write; 1103 1104 override sizediff_t doSend(in void[] buffer) 1105 { 1106 return write(socket.handle, buffer.ptr, buffer.length); 1107 } 1108 1109 override sizediff_t doReceive(void[] buffer) 1110 { 1111 return read(socket.handle, buffer.ptr, buffer.length); 1112 } 1113 } 1114 1115 /// Separates reading and writing, e.g. for stdin/stdout. 
/// Reads go through `reader`, writes through `writer`; connect and
/// disconnect events of both are merged.
class Duplex : IConnection
{
	/// The underlying connections.
	IConnection reader, writer;

	/// Takes over the connect and disconnect handlers of both connections.
	this(IConnection reader, IConnection writer)
	{
		this.reader = reader;
		this.writer = writer;
		reader.handleConnect = &onConnect;
		writer.handleConnect = &onConnect;
		reader.handleDisconnect = &onDisconnect;
		writer.handleDisconnect = &onDisconnect;
	}

	/// Combined state: `disconnecting` if either side is disconnecting,
	/// otherwise the lower (less advanced) of the two states.
	@property ConnectionState state()
	{
		if (reader.state == ConnectionState.disconnecting || writer.state == ConnectionState.disconnecting)
			return ConnectionState.disconnecting;
		else
			return reader.state < writer.state ? reader.state : writer.state;
	}

	/// Queue Data for sending.
	/// The default priority matches IConnection.send and Connection.send.
	void send(Data[] data, int priority = DEFAULT_PRIORITY)
	{
		writer.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	/// Note: this isn't quite fleshed out - applications may want to
	/// wait and send some more data even after stdin is closed, but
	/// such an interface can't be fitted into an IConnection
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		// Only disconnect sides that are in a state between "disconnected"
		// and "disconnecting" (exclusive), i.e. resolving/connecting/connected.
		if (reader.state > ConnectionState.disconnected && reader.state < ConnectionState.disconnecting)
			reader.disconnect(reason, type);
		if (writer.state > ConnectionState.disconnected && writer.state < ConnectionState.disconnecting)
			writer.disconnect(reason, type);
		debug(ASOCKETS) stderr.writefln("Duplex.disconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
	}

	/// Forwards the connect event once both sides are connected.
	protected void onConnect()
	{
		if (connectHandler && reader.state == ConnectionState.connected && writer.state == ConnectionState.connected)
			connectHandler();
	}

	/// Called when either side disconnects: notifies the user handler
	/// (once), then disconnects the other side.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		debug(ASOCKETS) stderr.writefln("Duplex.onDisconnect(%(%s%), %s), states are %s / %s", [reason], type, reader.state, writer.state);
		if (disconnectHandler)
		{
			disconnectHandler(reason, type);
			disconnectHandler = null; // don't call it twice for the other connection
		}
		// It is our responsibility to disconnect the other connection
		// Use DisconnectType.requested to ensure that any written data is flushed
		disconnect("Other side of Duplex connection closed (" ~ reason ~ ")", DisconnectType.requested);
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value) { reader.handleReadData = value; }

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value) { writer.handleBufferFlushed = value; }
}

// Compile-time check that Duplex implements all of IConnection.
unittest { if (false) new Duplex(null, null); }

// ***************************************************************************

/// An asynchronous TCP connection.
class TcpConnection : StreamConnection
{
protected:
	/// Queue of addresses to try connecting to.
1202 AddressInfo[] addressQueue; 1203 1204 this(Socket conn) 1205 { 1206 super(conn); 1207 } 1208 1209 override sizediff_t doSend(in void[] buffer) 1210 { 1211 return conn.send(buffer); 1212 } 1213 1214 override sizediff_t doReceive(void[] buffer) 1215 { 1216 return conn.receive(buffer); 1217 } 1218 1219 final void tryNextAddress() 1220 { 1221 assert(state == ConnectionState.connecting); 1222 auto addressInfo = addressQueue[0]; 1223 addressQueue = addressQueue[1..$]; 1224 1225 try 1226 { 1227 conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol); 1228 conn.blocking = false; 1229 1230 socketManager.register(this); 1231 updateFlags(); 1232 debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString()); 1233 conn.connect(addressInfo.address); 1234 } 1235 catch (SocketException e) 1236 return onError("Connect error: " ~ e.msg); 1237 } 1238 1239 /// Called when an error occurs on the socket. 1240 override void onError(string reason) 1241 { 1242 if (state == ConnectionState.connecting && addressQueue.length) 1243 { 1244 socketManager.unregister(this); 1245 conn.close(); 1246 conn = null; 1247 1248 return tryNextAddress(); 1249 } 1250 1251 super.onError(reason); 1252 } 1253 1254 public: 1255 /// Default constructor 1256 this() 1257 { 1258 debug (ASOCKETS) stderr.writefln("New TcpConnection @ %s", cast(void*)this); 1259 } 1260 1261 /// Start establishing a connection. 
1262 final void connect(string host, ushort port) 1263 { 1264 assert(host.length, "Empty host"); 1265 assert(port, "No port specified"); 1266 1267 debug (ASOCKETS) stderr.writefln("Connecting to %s:%s", host, port); 1268 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1269 1270 state = ConnectionState.resolving; 1271 1272 AddressInfo[] addressInfos; 1273 try 1274 { 1275 auto addresses = getAddress(host, port); 1276 enforce(addresses.length, "No addresses found"); 1277 debug (ASOCKETS) 1278 { 1279 stderr.writefln("Resolved to %s addresses:", addresses.length); 1280 foreach (address; addresses) 1281 stderr.writefln("- %s", address.toString()); 1282 } 1283 1284 if (addresses.length > 1) 1285 { 1286 import std.random : randomShuffle; 1287 randomShuffle(addresses); 1288 } 1289 1290 foreach (address; addresses) 1291 addressInfos ~= AddressInfo(address.addressFamily, SocketType.STREAM, ProtocolType.TCP, address, host); 1292 } 1293 catch (SocketException e) 1294 return onError("Lookup error: " ~ e.msg); 1295 1296 state = ConnectionState.disconnected; 1297 connect(addressInfos); 1298 } 1299 1300 /// ditto 1301 final void connect(AddressInfo[] addresses) 1302 { 1303 assert(addresses.length, "No addresses specified"); 1304 1305 assert(state == ConnectionState.disconnected, "Attempting to connect on a %s socket".format(state)); 1306 assert(!conn); 1307 1308 addressQueue = addresses; 1309 state = ConnectionState.connecting; 1310 tryNextAddress(); 1311 } 1312 } 1313 1314 // *************************************************************************** 1315 1316 /// An asynchronous TCP connection server. 
class TcpServer
{
private:
	/// Class that actually performs listening on a certain address family
	final class Listener : GenericSocket
	{
		/// Take ownership of an already bound and listening socket
		/// and register it with the socket manager.
		this(Socket conn)
		{
			debug (ASOCKETS) stderr.writefln("New Listener @ %s", cast(void*)this);
			this.conn = conn;
			socketManager.register(this);
		}

		/// Called when a socket is readable.
		/// Accepts one pending connection and hands it to the accept
		/// handler, or closes it right away if no handler is set.
		override void onReadable()
		{
			debug (ASOCKETS) stderr.writefln("Accepting connection from listener @ %s", cast(void*)this);
			Socket acceptSocket = conn.accept();
			acceptSocket.blocking = false;
			if (handleAccept)
			{
				TcpConnection connection = new TcpConnection(acceptSocket);
				debug (ASOCKETS) stderr.writefln("\tAccepted connection %s from %s", connection, connection.remoteAddress);
				connection.setKeepAlive();
				//assert(connection.connected);
				//connection.connected = true;
				acceptHandler(connection);
			}
			else
				acceptSocket.close();
		}

		/// Called when a socket is writable.
		override void onWritable()
		{
		}

		/// Called when an error occurs on the socket.
		override void onError(string reason)
		{
			close(); // call parent
		}

		/// Unregister from the socket manager and close the listening socket.
		void closeListener()
		{
			assert(conn);
			socketManager.unregister(this);
			conn.close();
			conn = null;
		}
	}

	/// Whether the socket is listening.
	bool listening;
	/// Listener instances
	Listener[] listeners;

	// Listeners only ask for read (i.e. incoming connection)
	// notifications while an accept handler is installed.
	final void updateFlags()
	{
		foreach (listener; listeners)
			listener.notifyRead = handleAccept !is null;
	}

public:
	/// Start listening on this socket.
	/// Params:
	///   port = port to listen on; 0 restricts listening to IPv4 addresses
	///   addr = node to resolve the listening addresses for
	/// Returns: the local port of an IPv4 listener if there is one,
	///   otherwise `port` unchanged.
	final ushort listen(ushort port, string addr = null)
	{
		debug(ASOCKETS) stderr.writefln("Attempting to listen on %s:%d", addr, port);
		//assert(!listening, "Attempting to listen on a listening socket");

		auto addressInfos = getAddressInfo(addr, to!string(port), AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);

		debug (ASOCKETS)
		{
			stderr.writefln("Resolved to %s addresses:", addressInfos.length);
			foreach (ref addressInfo; addressInfos)
				stderr.writefln("- %s", addressInfo);
		}

		// listen on random ports only on IPv4 for now
		if (port == 0)
		{
			// Walking backwards keeps the not-yet-visited indices valid
			// while entries are being removed.
			foreach_reverse (i, ref addressInfo; addressInfos)
				if (addressInfo.family != AddressFamily.INET)
					addressInfos = addressInfos[0..i] ~ addressInfos[i+1..$];
		}

		listen(addressInfos);

		foreach (listener; listeners)
		{
			auto address = listener.conn.localAddress();
			if (address.addressFamily == AddressFamily.INET)
				port = to!ushort(address.toPortString());
		}

		return port;
	}

	/// ditto
	/// Addresses that cannot be listened on are skipped;
	/// an Exception is thrown only if there are no listeners at all.
	final void listen(AddressInfo[] addressInfos)
	{
		foreach (ref addressInfo; addressInfos)
		{
			try
			{
				Socket conn = new Socket(addressInfo);
				conn.blocking = false;
				if (addressInfo.family == AddressFamily.INET6)
					conn.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true);
				conn.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);

				conn.bind(addressInfo.address);
				conn.listen(8);

				listeners ~= new Listener(conn);
			}
			catch (SocketException e)
			{
				debug(ASOCKETS) stderr.writefln("Unable to listen node \"%s\" service \"%s\"", addressInfo.address.toAddrString(), addressInfo.address.toPortString());
				debug(ASOCKETS) stderr.writeln(e.msg);
			}
		}

		if (listeners.length==0)
			throw new Exception("Unable to bind service");

		listening = true;

		updateFlags();
	}

	/// The local addresses of all current listeners.
	final @property Address[] localAddresses()
	{
		Address[] result;
		foreach (listener; listeners)
			result ~= listener.localAddress;
		return result;
	}

	/// Whether the server is currently listening.
	final @property bool isListening()
	{
		return listening;
	}

	/// Stop listening on this socket.
	final void close()
	{
		foreach (listener;listeners)
			listener.closeListener();
		listeners = null;
		listening = false;
		if (handleClose)
			handleClose();
	}

public:
	/// Callback for when the socket was closed.
	void delegate() handleClose;

	private void delegate(TcpConnection incoming) acceptHandler;
	/// Callback for an incoming connection.
	/// Connections will not be accepted unless this handler is set.
	@property final void delegate(TcpConnection incoming) handleAccept() { return acceptHandler; }
	/// ditto
	@property final void handleAccept(void delegate(TcpConnection incoming) value) { acceptHandler = value; updateFlags(); }
}

// ***************************************************************************

/// An asynchronous UDP stream.
/// UDP does not have connections, so this class encapsulates a socket
/// with a fixed destination (sendto) address, and optionally bound to
/// a local address.
/// Currently received packets' address is not exposed.
class UdpConnection : Connection
{
protected:
	/// Wrap an existing socket.
	this(Socket conn)
	{
		super(conn);
	}

	/// Called when a socket is writable.
	override void onWritable()
	{
		//scope(success) updateFlags();
		onWritableImpl();
		updateFlags();
	}

	// Work around scope(success) breaking debugger stack traces
	//
	// Sends every queued item as one datagram to remoteAddress.
	// Stops early if the socket would block; a send error or a
	// partially sent datagram is reported through onError.
	final private void onWritableImpl()
	{
		foreach (priority, ref queue; outQueue)
			while (queue.length)
			{
				auto pdata = queue.ptr; // pointer to first data

				auto sent = conn.sendTo(pdata.contents, remoteAddress);

				if (sent == Socket.ERROR)
				{
					if (wouldHaveBlocked())
						return;
					else
						return onError("send() error: " ~ lastSocketError);
				}
				else
				if (sent < pdata.length)
				{
					return onError("Sent only %d/%d bytes of the datagram!".format(sent, pdata.length));
				}
				else
				{
					assert(sent == pdata.length);
					//debug writefln("[%s] Sent data:", remoteAddress);
					//debug writefln("%s", hexDump(pdata.contents[0..sent]));
					pdata.clear();
					queue = queue[1..$];
					if (queue.length == 0)
						queue = null;
				}
			}

		// outQueue is now empty
		if (bufferFlushedHandler)
			bufferFlushedHandler();
		if (state == ConnectionState.disconnecting)
		{
			debug (ASOCKETS) stderr.writefln("Closing @ %s (Delayed disconnect - buffer flushed)", cast(void*)this);
			close();
		}
	}

	override sizediff_t doSend(in void[] buffer)
	{
		assert(false); // never called (called only from overridden methods)
	}

	/// Receive from the underlying socket.
	override sizediff_t doReceive(void[] buffer)
	{
		return conn.receive(buffer);
	}

public:
	/// Default constructor
	this()
	{
		debug (ASOCKETS) stderr.writefln("New UdpConnection @ %s", cast(void*)this);
	}

	/// Initialize with the given AddressFamily, without binding to an address.
	/// Fires the connect handler (if set) once the socket is set up.
	final void initialize(AddressFamily family, SocketType type = SocketType.DGRAM, ProtocolType protocol = ProtocolType.UDP)
	{
		initializeImpl(family, type, protocol);
		if (connectHandler)
			connectHandler();
	}

	/// Create the non-blocking socket, register it with the socket manager
	/// and switch to the `connected` state, without firing the connect handler.
	final void initializeImpl(AddressFamily family, SocketType type, ProtocolType protocol)
	{
		assert(state == ConnectionState.disconnected, "Attempting to initialize a %s socket".format(state));
		assert(!conn);

		conn = new Socket(family, type, protocol);
		conn.blocking = false;
		socketManager.register(this);
		state = ConnectionState.connected;
		updateFlags();
	}

	/// Bind to a local address in order to receive packets sent there.
	/// The host is resolved synchronously; if it resolves to several
	/// addresses, one is picked at random.
	/// Returns: the local port the socket was bound to, or 0 if the
	///   lookup failed (in which case the failure is also reported
	///   through `onError`).
	final ushort bind(string host, ushort port)
	{
		assert(host.length, "Empty host");

		debug (ASOCKETS) stderr.writefln("Binding to %s:%s", host, port);

		state = ConnectionState.resolving;

		AddressInfo addressInfo;
		try
		{
			auto addresses = getAddress(host, port);
			// Must be a SocketException: a plain Exception (as thrown by
			// enforce) would bypass the catch below and escape to the
			// caller with the state left at `resolving`.
			if (!addresses.length)
				throw new SocketException("No addresses found");
			debug (ASOCKETS)
			{
				stderr.writefln("Resolved to %s addresses:", addresses.length);
				foreach (address; addresses)
					stderr.writefln("- %s", address.toString());
			}

			Address address;
			if (addresses.length > 1)
			{
				import std.random : uniform;
				address = addresses[uniform(0, $)];
			}
			else
				address = addresses[0];
			addressInfo = AddressInfo(address.addressFamily, SocketType.DGRAM, ProtocolType.UDP, address, host);
		}
		catch (SocketException e)
		{
			onError("Lookup error: " ~ e.msg);
			return 0;
		}

		// bind(AddressInfo) expects to start from the disconnected state.
		state = ConnectionState.disconnected;
		return bind(addressInfo);
	}

	/// ditto
	final ushort bind(AddressInfo addressInfo)
	{
		// Deliberately initializeImpl, not initialize: the latter fires
		// connectHandler itself, which would notify the application once
		// before the socket is bound and then a second time below.
		initializeImpl(addressInfo.family, addressInfo.type, addressInfo.protocol);
		conn.bind(addressInfo.address);

		auto address = conn.localAddress();
		auto port = to!ushort(address.toPortString());

		if (connectHandler)
			connectHandler();

		return port;
	}

public:
	/// Where to send packets to.
	Address remoteAddress;
}

///
unittest
{
	auto server = new UdpConnection();
	auto serverPort = server.bind("localhost", 0);

	auto client = new UdpConnection();
	client.initialize(server.localAddress.addressFamily);

	string[] packets = ["Hello", "there"];
	client.remoteAddress = server.localAddress;
	client.send({
		Data[] data;
		foreach (packet; packets)
			data ~= Data(packet);
		return data;
	}());

	server.handleReadData = (Data data)
	{
		assert(data.contents == packets[0]);
		packets = packets[1..$];
		if (!packets.length)
		{
			server.close();
			client.close();
		}
	};
	socketManager.loop();
	assert(!packets.length);
}

// ***************************************************************************

/// Base class for a connection adapter.
/// By itself, does nothing.
class ConnectionAdapter : IConnection
{
	/// The wrapped connection all operations are forwarded to.
	IConnection next;

	/// Wrap `next`, taking over its connect, disconnect and
	/// buffer-flushed handlers.
	this(IConnection next)
	{
		this.next = next;
		next.handleConnect = &onConnect;
		next.handleDisconnect = &onDisconnect;
		next.handleBufferFlushed = &onBufferFlushed;
	}

	/// The state of the wrapped connection.
	@property ConnectionState state() { return next.state; }

	/// Queue Data for sending.
	void send(Data[] data, int priority)
	{
		next.send(data, priority);
	}

	alias send = IConnection.send; /// ditto

	/// Terminate the connection.
	void disconnect(string reason = defaultDisconnectReason, DisconnectType type = DisconnectType.requested)
	{
		next.disconnect(reason, type);
	}

	/// Forwards the wrapped connection's connect event to our handler.
	protected void onConnect()
	{
		if (connectHandler)
			connectHandler();
	}

	/// Forwards data read by the wrapped connection to our handler.
	protected void onReadData(Data data)
	{
		// onReadData should be fired only if readDataHandler is set
		readDataHandler(data);
	}

	/// Forwards the wrapped connection's disconnect event to our handler.
	protected void onDisconnect(string reason, DisconnectType type)
	{
		if (disconnectHandler)
			disconnectHandler(reason, type);
	}

	/// Forwards the wrapped connection's buffer-flushed event to our handler.
	protected void onBufferFlushed()
	{
		if (bufferFlushedHandler)
			bufferFlushedHandler();
	}

	/// Callback for when a connection has been established.
	@property void handleConnect(ConnectHandler value) { connectHandler = value; }
	private ConnectHandler connectHandler;

	/// Callback setter for when new data is read.
	@property void handleReadData(ReadDataHandler value)
	{
		readDataHandler = value;
		// Only hook the wrapped connection's read handler while we
		// have a handler of our own to forward to.
		next.handleReadData = value ? &onReadData : null ;
	}
	private ReadDataHandler readDataHandler;

	/// Callback setter for when a connection was closed.
	@property void handleDisconnect(DisconnectHandler value) { disconnectHandler = value; }
	private DisconnectHandler disconnectHandler;

	/// Callback setter for when all queued data has been written.
	@property void handleBufferFlushed(BufferFlushedHandler value) { bufferFlushedHandler = value; }
	private BufferFlushedHandler bufferFlushedHandler;
}

// ***************************************************************************

/// Adapter for connections with a line-based protocol.
/// Splits data stream into delimiter-separated lines.
class LineBufferedAdapter : ConnectionAdapter
{
	/// The protocol's line delimiter.
	string delimiter = "\r\n";

	/// Wrap the given connection.
	this(IConnection next)
	{
		super(next);
	}

	/// Append a line to the send buffer.
	void send(string line)
	{
		//super.send(Data(line ~ delimiter));
		// https://issues.dlang.org/show_bug.cgi?id=13985
		ConnectionAdapter ca = this;
		ca.send(Data(line ~ delimiter));
	}

protected:
	/// The receive buffer.
	Data inBuffer;

	/// Called when data has been received.
	/// Appends the data to the receive buffer and emits every complete
	/// line now contained in it; an incomplete trailing line stays buffered.
	final override void onReadData(Data data)
	{
		import std.string;
		auto oldBufferLength = inBuffer.length;
		if (oldBufferLength)
			inBuffer ~= data;
		else
			inBuffer = data;

		if (delimiter.length == 1)
		{
			import core.stdc.string; // memchr

			char c = delimiter[0];
			// The first search covers only the newly received part.
			auto p = memchr(inBuffer.ptr + oldBufferLength, c, data.length);
			while (p)
			{
				sizediff_t index = p - inBuffer.ptr;
				processLine(index);

				p = memchr(inBuffer.ptr, c, inBuffer.length);
			}
		}
		else
		{
			sizediff_t index;
			// TODO: we can start the search at oldBufferLength-delimiter.length+1
			while ((index=indexOf(cast(string)inBuffer.contents, delimiter)) >= 0)
				processLine(index);
		}
	}

	/// Cut the line ending at `index` (the delimiter's position) off the
	/// front of the receive buffer and pass it on, without the delimiter.
	final void processLine(size_t index)
	{
		auto line = inBuffer[0..index];
		inBuffer = inBuffer[index+delimiter.length..inBuffer.length];
		super.onReadData(line);
	}

	/// Forwards the disconnect event, then discards any buffered partial line.
	override void onDisconnect(string reason, DisconnectType type)
	{
		super.onDisconnect(reason, type);
		inBuffer.clear();
	}
}

// ***************************************************************************

/// Fires an event handler or disconnects connections
/// after a period of inactivity.
class TimeoutAdapter : ConnectionAdapter
{
	/// Wrap the given connection. No idle timeout is active
	/// until `setIdleTimeout` is called.
	this(IConnection next)
	{
		debug (ASOCKETS) stderr.writefln("New TimeoutAdapter @ %s", cast(void*)this);
		super(next);
	}

	/// Stop the pending idle timer.
	/// An idle timeout must have been set and must currently be waiting.
	void cancelIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.cancelIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(idleTask.isWaiting());
		idleTask.cancel();
	}

	/// Schedule the idle timer again.
	/// An idle timeout must have been set and must not currently be waiting.
	void resumeIdleTimeout()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.resumeIdleTimeout @ %s", cast(void*)this);
		assert(idleTask !is null);
		assert(!idleTask.isWaiting());
		mainTimer.add(idleTask);
	}

	/// Set (or change) the idle timeout to the given positive duration
	/// and start waiting for it.
	final void setIdleTimeout(Duration duration)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.setIdleTimeout @ %s", cast(void*)this);
		assert(duration > Duration.zero);

		if (idleTask !is null)
		{
			// Reuse the existing task: take it off the timer if it is
			// pending, then give it the new delay.
			if (idleTask.isWaiting())
				idleTask.cancel();
			idleTask.delay = duration;
		}
		else
		{
			// First call: create the task and point it at our handler.
			auto freshTask = new TimerTask(duration);
			freshTask.handleTask = &onTask_Idle;
			idleTask = freshTask;
		}

		mainTimer.add(idleTask);
	}

	/// Record activity: notify `handleNonIdle` (if set), then
	/// restart the idle timer if it is currently waiting.
	void markNonIdle()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.markNonIdle @ %s", cast(void*)this);
		if (handleNonIdle)
			handleNonIdle();
		if (idleTimerPending())
			idleTask.restart();
	}

	/// Callback for when a connection has stopped responding.
	/// If unset, the connection will be disconnected.
	void delegate() handleIdleTimeout;

	/// Callback for when a connection is marked as non-idle
	/// (when data is received).
	void delegate() handleNonIdle;

protected:
	/// A freshly established connection counts as activity.
	override void onConnect()
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onConnect @ %s", cast(void*)this);
		markNonIdle();
		super.onConnect();
	}

	/// Incoming data counts as activity.
	override void onReadData(Data data)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onReadData @ %s", cast(void*)this);
		markNonIdle();
		super.onReadData(data);
	}

	/// Stop the idle timer (if waiting) before forwarding the disconnect event.
	override void onDisconnect(string reason, DisconnectType type)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onDisconnect @ %s", cast(void*)this);
		if (idleTimerPending())
			idleTask.cancel();
		super.onDisconnect(reason, type);
	}

private:
	TimerTask idleTask; // non-null if an idle timeout has been set

	// True when an idle timeout has been set and its task is waiting.
	final bool idleTimerPending()
	{
		return idleTask && idleTask.isWaiting();
	}

	// Timer callback, run when the idle timeout elapses.
	final void onTask_Idle(Timer timer, TimerTask task)
	{
		debug (ASOCKETS) stderr.writefln("TimeoutAdapter.onTask_Idle @ %s", cast(void*)this);
		if (state == ConnectionState.disconnecting)
		{
			// A delayed disconnect did not finish in time - force it.
			disconnect("Delayed disconnect - time-out", DisconnectType.error);
			return;
		}

		if (state != ConnectionState.disconnected)
		{
			if (!handleIdleTimeout)
				disconnect("Time-out", DisconnectType.error);
			else
			{
				resumeIdleTimeout(); // reschedule (by default)
				handleIdleTimeout();
			}
		}
	}
}

// ***************************************************************************

unittest
{
	// A timer scheduled through setTimeout must fire while
	// the socket manager's event loop is running.
	void testTimer()
	{
		bool timerFired;
		setTimeout({ timerFired = true; }, 10.msecs);
		socketManager.loop();
		assert(timerFired);
	}

	testTimer();
}