| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| module thrift.async.socket; |
| |
| import core.thread : Fiber; |
| import core.time : dur, Duration; |
| import std.array : empty; |
| import std.conv : to; |
| import std.exception : enforce; |
| import std.socket; |
| import thrift.base; |
| import thrift.async.base; |
| import thrift.transport.base; |
| import thrift.transport.socket : TSocketBase; |
| import thrift.internal.endian; |
| import thrift.internal.socket; |
| |
| version (Windows) { |
| import std.c.windows.winsock : connect; |
| } else version (Posix) { |
| import core.sys.posix.sys.socket : connect; |
| } else static assert(0, "Don't know connect on this platform."); |
| |
| /** |
| * Non-blocking socket implementation of the TTransport interface. |
| * |
| * Whenever a socket operation would block, TAsyncSocket registers a callback |
| * with the specified TAsyncSocketManager and yields. |
| * |
| * As for thrift.transport.socket, due to the limitations of std.socket, |
| * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are |
| * not). |
| */ |
| class TAsyncSocket : TSocketBase, TAsyncTransport { |
| /** |
| * Constructor that takes an already created, connected (!) socket. |
| * |
| * Params: |
| * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. |
| * socket = Already created, connected socket object. Will be switched to |
| * non-blocking mode if it isn't already. |
| */ |
| this(TAsyncSocketManager asyncManager, Socket socket) { |
| asyncManager_ = asyncManager; |
| socket.blocking = false; |
| super(socket); |
| } |
| |
| /** |
| * Creates a new unconnected socket that will connect to the given host |
| * on the given port. |
| * |
| * Params: |
| * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. |
| * host = Remote host. |
| * port = Remote port. |
| */ |
| this(TAsyncSocketManager asyncManager, string host, ushort port) { |
| asyncManager_ = asyncManager; |
| super(host, port); |
| } |
| |
| override TAsyncManager asyncManager() @property { |
| return asyncManager_; |
| } |
| |
| /** |
| * Asynchronously connects the socket. |
| * |
| * Completes without blocking and defers further operations on the socket |
| * until the connection is established. If connecting fails, this is |
| * currently not indicated in any way other than every call to read/write |
| * failing. |
| */ |
| override void open() { |
| if (isOpen) return; |
| |
| enforce(!host_.empty, new TTransportException( |
| "Cannot open null host.", TTransportException.Type.NOT_OPEN)); |
| enforce(port_ != 0, new TTransportException( |
| "Cannot open with null port.", TTransportException.Type.NOT_OPEN)); |
| |
| |
| // Cannot use std.socket.Socket.connect here because it hides away |
| // EINPROGRESS/WSAWOULDBLOCK. |
| Address addr; |
| try { |
| // Currently, we just go with the first address returned, could be made |
| // more intelligent though – IPv6? |
| addr = getAddress(host_, port_)[0]; |
| } catch (Exception e) { |
| throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`, |
| TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e); |
| } |
| |
| socket_ = new TcpSocket(addr.addressFamily); |
| socket_.blocking = false; |
| setSocketOpts(); |
| |
| auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen()); |
| if (errorCode == 0) { |
| // If the connection could be established immediately, just return. I |
| // don't know if this ever happens. |
| return; |
| } |
| |
| auto errno = getSocketErrno(); |
| if (errno != CONNECT_INPROGRESS_ERRNO) { |
| throw new TTransportException(`Could not establish connection to "` ~ |
| host_ ~ `": ` ~ socketErrnoString(errno), |
| TTransportException.Type.NOT_OPEN); |
| } |
| |
| // This is the expected case: connect() signalled that the connection |
| // is being established in the background. Queue up a work item with the |
| // async manager which just defers any other operations on this |
| // TAsyncSocket instance until the socket is ready. |
| asyncManager_.execute(this, |
| { |
| auto fiber = Fiber.getThis(); |
| TAsyncEventReason reason = void; |
| asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE, |
| connectTimeout, |
| scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); }) |
| ); |
| Fiber.yield(); |
| |
| if (reason == TAsyncEventReason.TIMED_OUT) { |
| // Close the connection, so that subsequent work items fail immediately. |
| closeImmediately(); |
| return; |
| } |
| |
| int errorCode = void; |
| socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, |
| errorCode); |
| |
| if (errorCode) { |
| logInfo("Could not connect TAsyncSocket: %s", |
| socketErrnoString(errorCode)); |
| |
| // Close the connection, so that subsequent work items fail immediately. |
| closeImmediately(); |
| return; |
| } |
| |
| } |
| ); |
| } |
| |
| /** |
| * Closes the socket. |
| * |
| * Will block until all currently active operations are finished before the |
| * socket is closed. |
| */ |
| override void close() { |
| if (!isOpen) return; |
| |
| import core.sync.condition; |
| import core.sync.mutex; |
| |
| auto doneMutex = new Mutex; |
| auto doneCond = new Condition(doneMutex); |
| synchronized (doneMutex) { |
| asyncManager_.execute(this, |
| scopedDelegate( |
| { |
| closeImmediately(); |
| synchronized (doneMutex) doneCond.notifyAll(); |
| } |
| ) |
| ); |
| doneCond.wait(); |
| } |
| } |
| |
| override bool peek() { |
| if (!isOpen) return false; |
| |
| ubyte buf; |
| auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK); |
| if (r == Socket.ERROR) { |
| auto lastErrno = getSocketErrno(); |
| static if (connresetOnPeerShutdown) { |
| if (lastErrno == ECONNRESET) { |
| closeImmediately(); |
| return false; |
| } |
| } |
| throw new TTransportException("Peeking into socket failed: " ~ |
| socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); |
| } |
| return (r > 0); |
| } |
| |
| override size_t read(ubyte[] buf) { |
| enforce(isOpen, new TTransportException( |
| "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN)); |
| |
| typeof(getSocketErrno()) lastErrno; |
| |
| auto r = yieldOnBlock(socket_.receive(cast(void[])buf), |
| TAsyncEventType.READ); |
| |
| // If recv went fine, immediately return. |
| if (r >= 0) return r; |
| |
| // Something went wrong, find out how to handle it. |
| lastErrno = getSocketErrno(); |
| |
| static if (connresetOnPeerShutdown) { |
| // See top comment. |
| if (lastErrno == ECONNRESET) { |
| return 0; |
| } |
| } |
| |
| throw new TTransportException("Receiving from socket failed: " ~ |
| socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); |
| } |
| |
| override void write(in ubyte[] buf) { |
| size_t sent; |
| while (sent < buf.length) { |
| sent += writeSome(buf[sent .. $]); |
| } |
| assert(sent == buf.length); |
| } |
| |
| override size_t writeSome(in ubyte[] buf) { |
| enforce(isOpen, new TTransportException( |
| "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN)); |
| |
| auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE); |
| |
| // Everything went well, just return the number of bytes written. |
| if (r > 0) return r; |
| |
| // Handle error conditions. |
| if (r < 0) { |
| auto lastErrno = getSocketErrno(); |
| |
| auto type = TTransportException.Type.UNKNOWN; |
| if (isSocketCloseErrno(lastErrno)) { |
| type = TTransportException.Type.NOT_OPEN; |
| closeImmediately(); |
| } |
| |
| throw new TTransportException("Sending to socket failed: " ~ |
| socketErrnoString(lastErrno), type); |
| } |
| |
| // send() should never return 0. |
| throw new TTransportException("Sending to socket failed (0 bytes written).", |
| TTransportException.Type.UNKNOWN); |
| } |
| |
| /// The amount of time in which a conncetion must be established before the |
| /// open() call times out. |
| Duration connectTimeout = dur!"seconds"(5); |
| |
| private: |
| void closeImmediately() { |
| socket_.close(); |
| socket_ = null; |
| } |
| |
| T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) { |
| while (true) { |
| auto result = call(); |
| if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result; |
| |
| // We got an EAGAIN result, register a callback to return here once some |
| // event happens and yield. |
| |
| Duration timeout = void; |
| final switch (eventType) { |
| case TAsyncEventType.READ: |
| timeout = recvTimeout_; |
| break; |
| case TAsyncEventType.WRITE: |
| timeout = sendTimeout_; |
| break; |
| } |
| |
| auto fiber = Fiber.getThis(); |
| assert(fiber, "Current fiber null – not running in TAsyncManager?"); |
| TAsyncEventReason eventReason = void; |
| asyncManager_.addOneshotListener(socket_, eventType, timeout, |
| scopedDelegate((TAsyncEventReason reason) { |
| eventReason = reason; |
| fiber.call(); |
| }) |
| ); |
| |
| // Yields execution back to the async manager, will return back here once |
| // the above listener is called. |
| Fiber.yield(); |
| |
| if (eventReason == TAsyncEventReason.TIMED_OUT) { |
| // If we are cancelling the request due to a timed out operation, the |
| // connection is in an undefined state, because the server could decide |
| // to send the requested data later, or we could have already been half- |
| // way into writing a request. Thus, we close the connection to make any |
| // possibly queued up work items fail immediately. Besides, the server |
| // is not very likely to immediately recover after a socket-level |
| // timeout has expired anyway. |
| closeImmediately(); |
| |
| throw new TTransportException("Timed out while waiting for socket " ~ |
| "to get ready to " ~ to!string(eventType) ~ ".", |
| TTransportException.Type.TIMED_OUT); |
| } |
| } |
| } |
| |
| /// The TAsyncSocketManager to use for non-blocking I/O. |
| TAsyncSocketManager asyncManager_; |
| } |
| |
| private { |
| // std.socket doesn't include SO_ERROR for reasons unknown. |
| version (linux) { |
| enum SO_ERROR = 4; |
| } else version (OSX) { |
| enum SO_ERROR = 0x1007; |
| } else version (FreeBSD) { |
| enum SO_ERROR = 0x1007; |
| } else version (Win32) { |
| import std.c.windows.winsock : SO_ERROR; |
| } else static assert(false, "Don't know SO_ERROR on this platform."); |
| |
| // This hack forces a delegate literal to be scoped, even if it is passed to |
| // a function accepting normal delegates as well. DMD likes to allocate the |
| // context on the heap anyway, but it seems to work for LDC. |
| import std.traits : isDelegate; |
| auto scopedDelegate(D)(scope D d) if (isDelegate!D) { |
| return d; |
| } |
| } |