blob: 6de13d9167a53426be0a08dbe800cdf4d011ea44 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift.async.socket;
import core.thread : Fiber;
import core.time : dur, Duration;
import std.array : empty;
import std.conv : to;
import std.exception : enforce;
import std.socket;
import thrift.base;
import thrift.async.base;
import thrift.transport.base;
import thrift.transport.socket : TSocketBase;
import thrift.internal.endian;
import thrift.internal.socket;
version (Windows) {
import std.c.windows.winsock : connect;
} else version (Posix) {
import core.sys.posix.sys.socket : connect;
} else static assert(0, "Don't know connect on this platform.");
/**
* Non-blocking socket implementation of the TTransport interface.
*
* Whenever a socket operation would block, TAsyncSocket registers a callback
* with the specified TAsyncSocketManager and yields.
*
* As for thrift.transport.socket, due to the limitations of std.socket,
* currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
* not).
*/
class TAsyncSocket : TSocketBase, TAsyncTransport {
/**
 * Wraps a socket that has already been created and connected (!).
 *
 * Params:
 *   asyncManager = The TAsyncSocketManager that drives non-blocking I/O for
 *     this transport.
 *   socket = The connected socket to wrap. It is switched to non-blocking
 *     mode before being handed to the base class constructor.
 */
this(TAsyncSocketManager asyncManager, Socket socket) {
  // The transport only works with non-blocking sockets, so force that mode
  // regardless of how the caller configured the socket.
  socket.blocking = false;
  this.asyncManager_ = asyncManager;
  super(socket);
}
/**
 * Creates an unconnected transport targeting the given host and port.
 *
 * No socket is created here; open() performs the actual connection attempt.
 *
 * Params:
 *   asyncManager = The TAsyncSocketManager that drives non-blocking I/O for
 *     this transport.
 *   host = Remote host to connect to.
 *   port = Remote port to connect to.
 */
this(TAsyncSocketManager asyncManager, string host, ushort port) {
  this.asyncManager_ = asyncManager;
  super(host, port);
}
/// Returns the async manager this socket was constructed with.
override TAsyncManager asyncManager() @property {
  return this.asyncManager_;
}
/**
 * Asynchronously connects the socket.
 *
 * Completes without blocking and defers further operations on the socket
 * until the connection is established. If connecting fails after the attempt
 * has been started in the background, this is currently not indicated in any
 * way other than every call to read/write failing.
 *
 * Does nothing if the socket is already open.
 *
 * Throws: TTransportException of type NOT_OPEN if the host is empty, the port
 *   is zero, the host cannot be resolved, or connect() fails right away with
 *   an error other than "connection in progress". In the last case the
 *   freshly created socket is closed again before the exception is thrown.
 */
override void open() {
  if (isOpen) return;

  enforce(!host_.empty, new TTransportException(
    "Cannot open null host.", TTransportException.Type.NOT_OPEN));
  enforce(port_ != 0, new TTransportException(
    "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

  // Cannot use std.socket.Socket.connect here because it hides away
  // EINPROGRESS/WSAWOULDBLOCK.
  Address addr;
  try {
    // Currently, we just go with the first address returned, could be made
    // more intelligent though (IPv6?).
    auto candidates = getAddress(host_, port_);
    // An empty result would otherwise trigger a range error on [0], which
    // is not an Exception and would bypass the wrapping below.
    enforce(candidates.length != 0, "Address lookup returned no results.");
    addr = candidates[0];
  } catch (Exception e) {
    throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
      TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
  }

  socket_ = new TcpSocket(addr.addressFamily);
  socket_.blocking = false;
  setSocketOpts();

  auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
  if (errorCode == 0) {
    // If the connection could be established immediately, just return. I
    // don't know if this ever happens.
    return;
  }

  // Read the error code before doing anything else that might overwrite it.
  auto connectErrno = getSocketErrno();
  if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
    // The attempt failed outright. Release the socket so that it is not
    // leaked and the transport is not left holding a dead socket.
    closeImmediately();
    throw new TTransportException(`Could not establish connection to "` ~
      host_ ~ `": ` ~ socketErrnoString(connectErrno),
      TTransportException.Type.NOT_OPEN);
  }

  // This is the expected case: connect() signalled that the connection
  // is being established in the background. Queue up a work item with the
  // async manager which just defers any other operations on this
  // TAsyncSocket instance until the socket is ready.
  asyncManager_.execute(this,
    {
      auto fiber = Fiber.getThis();
      TAsyncEventReason reason = void;
      // Writability signals that the connection attempt has finished.
      asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
        connectTimeout,
        scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
      );
      Fiber.yield();

      if (reason == TAsyncEventReason.TIMED_OUT) {
        // Close the connection, so that subsequent work items fail immediately.
        closeImmediately();
        return;
      }

      // The attempt finished; SO_ERROR tells whether it actually succeeded.
      int soError = void;
      socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
        soError);
      if (soError) {
        logInfo("Could not connect TAsyncSocket: %s",
          socketErrnoString(soError));

        // Close the connection, so that subsequent work items fail immediately.
        closeImmediately();
        return;
      }
    }
  );
}
/**
 * Closes the socket.
 *
 * Queues a work item with the async manager that closes the socket, and
 * blocks the calling thread until that work item has run. Does nothing if
 * the socket is not open.
 */
override void close() {
  if (!isOpen) return;

  import core.sync.condition;
  import core.sync.mutex;

  // Set by the work item once the socket has been closed. Waiting on this
  // flag rather than on the bare condition protects against spurious
  // wakeups and against the notification arriving before wait() is entered.
  bool done = false;
  auto doneMutex = new Mutex;
  auto doneCond = new Condition(doneMutex);
  synchronized (doneMutex) {
    asyncManager_.execute(this,
      scopedDelegate(
        {
          closeImmediately();
          synchronized (doneMutex) {
            done = true;
            doneCond.notifyAll();
          }
        }
      )
    );
    // The delegate above references this stack frame, so we must not return
    // before it has actually finished.
    while (!done) doneCond.wait();
  }
}
/**
 * Checks whether at least one byte can be read from the socket without
 * consuming it.
 *
 * Returns: false if the socket is not open or the peek yielded no data, true
 *   if at least one byte is available.
 *
 * Throws: TTransportException of type UNKNOWN if the underlying receive call
 *   reports an error (other than a connection reset on platforms where
 *   connresetOnPeerShutdown is set, which closes the socket and yields
 *   false).
 */
override bool peek() {
  if (!isOpen) return false;

  ubyte[1] probe;
  immutable received = socket_.receive(probe[], SocketFlags.PEEK);
  if (received != Socket.ERROR) return received > 0;

  // NOTE(review): unlike read(), this does not go through yieldOnBlock, so a
  // would-block result on the non-blocking socket ends up in the throw below.
  // Confirm whether that is intended.
  immutable peekErrno = getSocketErrno();
  static if (connresetOnPeerShutdown) {
    if (peekErrno == ECONNRESET) {
      closeImmediately();
      return false;
    }
  }
  throw new TTransportException("Peeking into socket failed: " ~
    socketErrnoString(peekErrno), TTransportException.Type.UNKNOWN);
}
/**
 * Reads up to buf.length bytes from the socket, yielding to the async
 * manager while the operation would block.
 *
 * Params:
 *   buf = Destination buffer for the received data.
 *
 * Returns: The number of bytes received. On platforms where
 *   connresetOnPeerShutdown is set, a connection reset is reported as 0.
 *
 * Throws: TTransportException of type NOT_OPEN if the socket is not open, of
 *   type UNKNOWN if receiving fails, or whatever yieldOnBlock throws when
 *   the wait times out.
 */
override size_t read(ubyte[] buf) {
  if (!isOpen) {
    throw new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN);
  }

  immutable received = yieldOnBlock(socket_.receive(cast(void[])buf),
    TAsyncEventType.READ);

  // A non-negative result is the byte count; nothing more to do.
  if (received >= 0) return received;

  // The receive failed; decide how to report it based on the error code.
  immutable recvErrno = getSocketErrno();
  static if (connresetOnPeerShutdown) {
    // Treat a reset by the peer like a regular end of stream.
    if (recvErrno == ECONNRESET) return 0;
  }

  throw new TTransportException("Receiving from socket failed: " ~
    socketErrnoString(recvErrno), TTransportException.Type.UNKNOWN);
}
/**
 * Writes the whole buffer to the socket, calling writeSome repeatedly until
 * every byte has been sent.
 *
 * Params:
 *   buf = The data to send. An empty buffer results in no socket operation.
 */
override void write(in ubyte[] buf) {
  size_t offset = 0;
  while (offset < buf.length) {
    // writeSome may accept only part of the remainder; advance past it.
    immutable accepted = writeSome(buf[offset .. $]);
    offset += accepted;
  }
  assert(offset == buf.length);
}
/**
 * Sends as much of the buffer as the socket accepts in a single send call,
 * yielding to the async manager while the operation would block.
 *
 * Params:
 *   buf = The data to send.
 *
 * Returns: The number of bytes actually sent (always greater than zero).
 *
 * Throws: TTransportException of type NOT_OPEN if the socket is not open or
 *   the send failed with an error code that isSocketCloseErrno accepts (the
 *   socket is closed in that case), and of type UNKNOWN for other send
 *   failures or if send reports zero bytes written.
 */
override size_t writeSome(in ubyte[] buf) {
  if (!isOpen) {
    throw new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN);
  }

  immutable sent = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

  // Normal case: some bytes went out.
  if (sent > 0) return sent;

  if (sent == 0) {
    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  // Negative result: the send failed.
  immutable sendErrno = getSocketErrno();
  auto failureType = TTransportException.Type.UNKNOWN;
  if (isSocketCloseErrno(sendErrno)) {
    // The connection is gone; drop the socket so later calls fail fast.
    failureType = TTransportException.Type.NOT_OPEN;
    closeImmediately();
  }
  throw new TTransportException("Sending to socket failed: " ~
    socketErrnoString(sendErrno), failureType);
}
/// The amount of time in which a connection must be established after
/// open() has been called; if it elapses first, the attempt is abandoned and
/// the socket is closed.
Duration connectTimeout = dur!"seconds"(5);
private:
/**
 * Closes the underlying socket right away and clears socket_.
 *
 * Does nothing if socket_ is already null. Several code paths (timeouts,
 * failed connects, send errors, the close() work item) may end up here one
 * after another for the same socket, so this has to be safe to call more
 * than once.
 */
void closeImmediately() {
  if (socket_ is null) return;
  socket_.close();
  socket_ = null;
}
/**
 * Evaluates a socket operation and, as long as it reports that it would
 * block, suspends the current fiber until the socket is ready, then retries.
 *
 * Params:
 *   call = The socket operation; it is re-evaluated on every retry.
 *   eventType = Which readiness event to wait for. READ waits with
 *     recvTimeout_, WRITE with sendTimeout_.
 *
 * Returns: The first result of call that is not a would-block error.
 *
 * Throws: TTransportException of type TIMED_OUT if the wait for readiness
 *   times out; the socket is closed before the exception is thrown.
 */
T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
  for (;;) {
    auto outcome = call();

    // Anything but "would block" (success or a real error) goes straight
    // back to the caller.
    if (outcome != Socket.ERROR) return outcome;
    if (getSocketErrno() != WOULD_BLOCK_ERRNO) return outcome;

    // The operation would block: pick the timeout matching the direction.
    Duration waitLimit = void;
    final switch (eventType) {
      case TAsyncEventType.READ:
        waitLimit = recvTimeout_;
        break;
      case TAsyncEventType.WRITE:
        waitLimit = sendTimeout_;
        break;
    }

    auto self = Fiber.getThis();
    assert(self, "Current fiber null – not running in TAsyncManager?");

    // Register a one-shot listener that records why it fired and resumes
    // this fiber.
    TAsyncEventReason wakeReason = void;
    asyncManager_.addOneshotListener(socket_, eventType, waitLimit,
      scopedDelegate((TAsyncEventReason reason) {
        wakeReason = reason;
        self.call();
      })
    );

    // Hand control back to the async manager; execution continues here once
    // the listener above has been invoked.
    Fiber.yield();

    if (wakeReason == TAsyncEventReason.TIMED_OUT) {
      // After a timeout the connection is in an undefined state: the peer
      // might still send the data later, or a request might have been only
      // partially written. Closing the socket makes any queued work items
      // fail immediately instead of operating on such a connection.
      closeImmediately();
      throw new TTransportException("Timed out while waiting for socket " ~
        "to get ready to " ~ to!string(eventType) ~ ".",
        TTransportException.Type.TIMED_OUT);
    }
    // Otherwise the socket became ready: loop around and retry.
  }
}
/// The TAsyncSocketManager to use for non-blocking I/O.
TAsyncSocketManager asyncManager_;
}
private {
  // std.socket doesn't include SO_ERROR for reasons unknown, so the value is
  // supplied here per platform.
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    // Uses the same version identifier as the connect import at the top of
    // this module, so that 64-bit Windows is covered as well.
    import std.c.windows.winsock : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  import std.traits : isDelegate;

  /// Returns the given delegate unchanged; the `scope` parameter is what
  /// matters (see the comment above).
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}