blob: 967c41fa0efdf8d56e3eabab04bb43953e0057fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift.async.libevent;
import core.atomic;
import core.time : Duration, dur;
import core.exception : onOutOfMemoryError;
import core.memory : GC;
import core.thread : Fiber, Thread;
import core.sync.condition;
import core.sync.mutex;
import core.stdc.stdlib : free, malloc;
import deimos.event2.event;
import std.array : empty, front, popFront;
import std.conv : text, to;
import std.exception : enforce;
import std.socket : Socket, socketPair;
import thrift.base;
import thrift.async.base;
import thrift.internal.socket;
import thrift.internal.traits;
import thrift.util.cancellation;
// To avoid DMD @@BUG6395@@.
import thrift.internal.algorithm;
/**
* A TAsyncManager implementation based on libevent.
*
* The libevent loop for handling non-blocking sockets is run in a background
* thread, which is lazily spawned. The thread is not daemonized to avoid
* crashes on program shutdown, it is only stopped when the manager instance
* is destroyed. So, to ensure a clean program teardown, either make sure this
* instance gets destroyed (e.g. by using scope), or manually call stop() at
* the end.
*/
class TLibeventAsyncManager : TAsyncSocketManager {
this() {
eventBase_ = event_base_new();
// Set up the socket pair for transferring control messages to the event
// loop.
auto pair = socketPair();
controlSendSocket_ = pair[0];
controlReceiveSocket_ = pair[1];
controlReceiveSocket_.blocking = false;
// Register an event for receiving control messages.
controlReceiveEvent_ = event_new(eventBase_, controlReceiveSocket_.handle,
EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
cast(void*)this);
event_add(controlReceiveEvent_, null);
queuedCountMutex_ = new Mutex;
zeroQueuedCondition_ = new Condition(queuedCountMutex_);
}
~this() {
// stop() should be safe to call, because either we don't have a worker
// thread running and it is a no-op anyway, or it is guaranteed to be
// still running (blocked in event_base_loop), and thus guaranteed not to
// be garbage collected yet.
stop(dur!"hnsecs"(0));
event_free(controlReceiveEvent_);
event_base_free(eventBase_);
eventBase_ = null;
}
override void execute(TAsyncTransport transport, Work work,
TCancellation cancellation = null
) {
if (cancellation && cancellation.triggered) return;
// Keep track that there is a new work item to be processed.
incrementQueuedCount();
ensureWorkerThreadRunning();
// We should be able to send the control message as a whole – we currently
// assume to be able to receive it at once as well. If this proves to be
// unstable (e.g. send could possibly return early if the receiving buffer
// is full and the blocking call gets interrupted by a signal), it could
// be changed to a more sophisticated scheme.
// Make sure the delegate context doesn't get GCd while the work item is
// on the wire.
GC.addRoot(work.ptr);
// Send work message.
sendControlMsg(ControlMsg(MsgType.WORK, work, transport));
if (cancellation) {
cancellation.triggering.addCallback({
sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
});
}
}
override void delay(Duration duration, void delegate() work) {
incrementQueuedCount();
ensureWorkerThreadRunning();
const tv = toTimeval(duration);
// DMD @@BUG@@: Cannot deduce T to void delegate() here.
registerOneshotEvent!(void delegate())(
-1, 0, assumeNothrow(&delayCallback), &tv,
{
work();
decrementQueuedCount();
}
);
}
override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
bool cleanExit = true;
synchronized (this) {
if (workerThread_) {
synchronized (queuedCountMutex_) {
if (waitFinishTimeout > dur!"hnsecs"(0)) {
if (queuedCount_ > 0) {
zeroQueuedCondition_.wait(waitFinishTimeout);
}
} else if (waitFinishTimeout < dur!"hnsecs"(0)) {
while (queuedCount_ > 0) zeroQueuedCondition_.wait();
} else {
// waitFinishTimeout is zero, immediately exit in all cases.
}
cleanExit = (queuedCount_ == 0);
}
event_base_loopbreak(eventBase_);
sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
workerThread_.join();
workQueues_ = null;
// We have nuked all currently enqueued items, so set the count to
// zero. This is safe to do without locking, since the worker thread
// is down.
queuedCount_ = 0;
atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
}
}
return cleanExit;
}
override void addOneshotListener(Socket socket, TAsyncEventType eventType,
TSocketEventListener listener
) {
addOneshotListenerImpl(socket, eventType, null, listener);
}
override void addOneshotListener(Socket socket, TAsyncEventType eventType,
Duration timeout, TSocketEventListener listener
) {
if (timeout <= dur!"hnsecs"(0)) {
addOneshotListenerImpl(socket, eventType, null, listener);
} else {
// This is not really documented well, but libevent does not require to
// keep the timeval around after the event was added.
auto tv = toTimeval(timeout);
addOneshotListenerImpl(socket, eventType, &tv, listener);
}
}
private:
alias void delegate() Work;
void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
const(timeval)* timeout, TSocketEventListener listener
) {
registerOneshotEvent(socket.handle, libeventEventType(eventType),
assumeNothrow(&socketCallback), timeout, listener);
}
void registerOneshotEvent(T)(evutil_socket_t fd, short type,
event_callback_fn callback, const(timeval)* timeout, T payload
) {
// Create a copy of the payload on the C heap.
auto payloadMem = malloc(payload.sizeof);
if (!payloadMem) onOutOfMemoryError();
(cast(T*)payloadMem)[0 .. 1] = payload;
GC.addRange(payloadMem, payload.sizeof);
auto result = event_base_once(eventBase_, fd, type, callback,
payloadMem, timeout);
// Assuming that we didn't get our arguments wrong above, the only other
// situation in which event_base_once can fail is when it can't allocate
// memory.
if (result != 0) onOutOfMemoryError();
}
enum MsgType : ubyte {
SHUTDOWN,
WORK,
CANCEL
}
struct ControlMsg {
MsgType type;
Work work;
TAsyncTransport transport;
}
/**
* Starts the worker thread if it is not already running.
*/
void ensureWorkerThreadRunning() {
// Technically, only half barriers would be required here, but adding the
// argument seems to trigger a DMD template argument deduction @@BUG@@.
if (!atomicLoad(*(cast(shared)&workerThread_))) {
synchronized (this) {
if (!workerThread_) {
auto thread = new Thread({ event_base_loop(eventBase_, 0); });
thread.start();
atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
}
}
}
}
/**
* Sends a control message to the worker thread.
*/
void sendControlMsg(const(ControlMsg) msg) {
auto result = controlSendSocket_.send((&msg)[0 .. 1]);
enum size = msg.sizeof;
enforce(result == size, new TException(text(
"Sending control message of type ", msg.type, " failed (", result,
" bytes instead of ", size, " transmitted).")));
}
/**
* Receives messages from the control message socket and acts on them. Called
* from the worker thread.
*/
void receiveControlMsg() {
// Read as many new work items off the socket as possible (at least one
// should be available, as we got notified by libevent).
ControlMsg msg;
ptrdiff_t bytesRead;
while (true) {
bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));
if (bytesRead < 0) {
auto errno = getSocketErrno();
if (errno != WOULD_BLOCK_ERRNO) {
logError("Reading control message, some work item will possibly " ~
"never be executed: %s", socketErrnoString(errno));
}
}
if (bytesRead != msg.sizeof) break;
// Everything went fine, we received a new control message.
final switch (msg.type) {
case MsgType.SHUTDOWN:
// The message was just intended to wake us up for shutdown.
break;
case MsgType.CANCEL:
// When processing a cancellation, we must not touch the first item,
// since it is already being processed.
auto queue = workQueues_[msg.transport];
if (queue.length > 0) {
workQueues_[msg.transport] = [queue[0]] ~
removeEqual(queue[1 .. $], msg.work);
}
break;
case MsgType.WORK:
// Now that the work item is back in the D world, we don't need the
// extra GC root for the context pointer anymore (see execute()).
GC.removeRoot(msg.work.ptr);
// Add the work item to the queue and execute it.
auto queue = msg.transport in workQueues_;
if (queue is null || (*queue).empty) {
// If the queue is empty, add the new work item to the queue as well,
// but immediately start executing it.
workQueues_[msg.transport] = [msg.work];
executeWork(msg.transport, msg.work);
} else {
(*queue) ~= msg.work;
}
break;
}
}
// If the last read was successful, but didn't read enough bytes, we got
// a problem.
if (bytesRead > 0) {
logError("Unexpected partial control message read (%s byte(s) " ~
"instead of %s), some work item will possibly never be executed.",
bytesRead, msg.sizeof);
}
}
/**
* Executes the given work item and all others enqueued for the same
* transport in a new fiber. Called from the worker thread.
*/
void executeWork(TAsyncTransport transport, Work work) {
(new Fiber({
auto item = work;
while (true) {
try {
// Execute the actual work. It will possibly add listeners to the
// event loop and yield away if it has to wait for blocking
// operations. It is quite possible that another fiber will modify
// the work queue for the current transport.
item();
} catch (Exception e) {
// This should never happen, just to be sure the worker thread
// doesn't stop working in mysterious ways because of an unhandled
// exception.
logError("Exception thrown by work item: %s", e);
}
// Remove the item from the work queue.
// Note: Due to the value semantics of array slices, we have to
// re-lookup this on every iteration. This could be solved, but I'd
// rather replace this directly with a queue type once one becomes
// available in Phobos.
auto queue = workQueues_[transport];
assert(queue.front == item);
queue.popFront();
workQueues_[transport] = queue;
// Now that the work item is done, no longer count it as queued.
decrementQueuedCount();
if (queue.empty) break;
// If the queue is not empty, execute the next waiting item.
item = queue.front;
}
})).call();
}
/**
* Increments the amount of queued items.
*/
void incrementQueuedCount() {
synchronized (queuedCountMutex_) {
++queuedCount_;
}
}
/**
* Decrements the amount of queued items.
*/
void decrementQueuedCount() {
synchronized (queuedCountMutex_) {
assert(queuedCount_ > 0);
--queuedCount_;
if (queuedCount_ == 0) {
zeroQueuedCondition_.notifyAll();
}
}
}
static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
void *managerThis
) {
(cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
}
static extern(C) void socketCallback(evutil_socket_t, short flags,
void *arg
) {
auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
TAsyncEventReason.NORMAL;
(*(cast(TSocketEventListener*)arg))(reason);
GC.removeRange(arg);
clear(arg);
free(arg);
}
static extern(C) void delayCallback(evutil_socket_t, short flags,
void *arg
) {
assert(flags & EV_TIMEOUT);
(*(cast(void delegate()*)arg))();
GC.removeRange(arg);
clear(arg);
free(arg);
}
Thread workerThread_;
event_base* eventBase_;
/// The socket used for receiving new work items in the event loop. Paired
/// with controlSendSocket_. Invalid (i.e. TAsyncWorkItem.init) items are
/// ignored and can be used to wake up the worker thread.
Socket controlReceiveSocket_;
event* controlReceiveEvent_;
/// The socket used to send new work items to the event loop. It is
/// expected that work items can always be read at once from it, i.e. that
/// there will never be short reads.
Socket controlSendSocket_;
/// Queued up work delegates for async transports. This also includes
/// currently active ones, they are removed from the queue on completion,
/// which is relied on by the control message receive fiber (the main one)
/// to decide whether to immediately start executing items or not.
// TODO: This should really be of some queue type, not an array slice, but
// std.container doesn't have anything.
Work[][TAsyncTransport] workQueues_;
/// The total number of work items not yet finished (queued and currently
/// executed) and delays not yet executed.
uint queuedCount_;
/// Protects queuedCount_.
Mutex queuedCountMutex_;
/// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
Condition zeroQueuedCondition_;
}
private {
timeval toTimeval(const(Duration) dur) {
timeval tv = {tv_sec: cast(int)dur.total!"seconds"(),
tv_usec: dur.fracSec.usecs};
return tv;
}
/**
* Returns the libevent flags combination to represent a given TAsyncEventType.
*/
short libeventEventType(TAsyncEventType type) {
final switch (type) {
case TAsyncEventType.READ:
return EV_READ | EV_ET;
case TAsyncEventType.WRITE:
return EV_WRITE | EV_ET;
}
}
}