Jake Farrell | b95b0ff | 2012-03-22 21:49:10 +0000 | [diff] [blame] | 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | module thrift.async.libevent; |
| 20 | |
| 21 | import core.atomic; |
| 22 | import core.time : Duration, dur; |
| 23 | import core.exception : onOutOfMemoryError; |
| 24 | import core.memory : GC; |
| 25 | import core.thread : Fiber, Thread; |
| 26 | import core.sync.condition; |
| 27 | import core.sync.mutex; |
| 28 | import core.stdc.stdlib : free, malloc; |
| 29 | import deimos.event2.event; |
| 30 | import std.array : empty, front, popFront; |
| 31 | import std.conv : text, to; |
| 32 | import std.exception : enforce; |
| 33 | import std.socket : Socket, socketPair; |
| 34 | import thrift.base; |
| 35 | import thrift.async.base; |
| 36 | import thrift.internal.socket; |
| 37 | import thrift.internal.traits; |
| 38 | import thrift.util.cancellation; |
| 39 | |
| 40 | // To avoid DMD @@BUG6395@@. |
| 41 | import thrift.internal.algorithm; |
| 42 | |
| 43 | /** |
| 44 | * A TAsyncManager implementation based on libevent. |
| 45 | * |
| 46 | * The libevent loop for handling non-blocking sockets is run in a background |
| 47 | * thread, which is lazily spawned. The thread is not daemonized to avoid |
| 48 | * crashes on program shutdown, it is only stopped when the manager instance |
| 49 | * is destroyed. So, to ensure a clean program teardown, either make sure this |
| 50 | * instance gets destroyed (e.g. by using scope), or manually call stop() at |
| 51 | * the end. |
| 52 | */ |
class TLibeventAsyncManager : TAsyncSocketManager {
  /// Sets up the libevent event base and the socket pair used to pass
  /// control messages (work items, cancellations, shutdown) into the
  /// event loop. The worker thread itself is spawned lazily on first use.
  this() {
    eventBase_ = event_base_new();

    // Set up the socket pair for transferring control messages to the event
    // loop.
    auto pair = socketPair();
    controlSendSocket_ = pair[0];
    controlReceiveSocket_ = pair[1];
    controlReceiveSocket_.blocking = false;

    // Register an event for receiving control messages.
    controlReceiveEvent_ = event_new(eventBase_, cast(evutil_socket_t)controlReceiveSocket_.handle,
      EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
      cast(void*)this);
    event_add(controlReceiveEvent_, null);

    queuedCountMutex_ = new Mutex;
    zeroQueuedCondition_ = new Condition(queuedCountMutex_);
  }

  ~this() {
    // stop() should be safe to call, because either we don't have a worker
    // thread running and it is a no-op anyway, or it is guaranteed to be
    // still running (blocked in event_base_loop), and thus guaranteed not to
    // be garbage collected yet.
    stop(dur!"hnsecs"(0));

    event_free(controlReceiveEvent_);
    event_base_free(eventBase_);
    eventBase_ = null;
  }

  /**
   * Submits a work item to be executed for the given transport on the
   * worker thread.
   *
   * The item is serialized into a control message and sent over the
   * control socket pair. If a cancellation is passed, triggering it sends
   * a CANCEL message that removes the item from the queue again (unless it
   * is already being executed).
   */
  override void execute(TAsyncTransport transport, Work work,
    TCancellation cancellation = null
  ) {
    if (cancellation && cancellation.triggered) return;

    // Keep track that there is a new work item to be processed.
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    // We should be able to send the control message as a whole – we currently
    // assume to be able to receive it at once as well. If this proves to be
    // unstable (e.g. send could possibly return early if the receiving buffer
    // is full and the blocking call gets interrupted by a signal), it could
    // be changed to a more sophisticated scheme.

    // Make sure the delegate context doesn't get GCd while the work item is
    // on the wire.
    GC.addRoot(work.ptr);

    // Send work message.
    sendControlMsg(ControlMsg(MsgType.WORK, work, transport));

    if (cancellation) {
      cancellation.triggering.addCallback({
        sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
      });
    }
  }

  /**
   * Executes the given delegate on the worker thread after (at least) the
   * given duration has elapsed, via a libevent timeout event.
   */
  override void delay(Duration duration, void delegate() work) {
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    const tv = toTimeval(duration);

    // fd -1/event type 0: no socket involved, this is a pure timeout event.
    // DMD @@BUG@@: Cannot deduce T to void delegate() here.
    registerOneshotEvent!(void delegate())(
      -1, 0, assumeNothrow(&delayCallback), &tv,
      {
        work();
        decrementQueuedCount();
      }
    );
  }

  /**
   * Shuts down the worker thread (if running), optionally waiting for
   * pending work items first.
   *
   * A negative waitFinishTimeout waits until all queued items are done, a
   * positive one waits at most that long, zero tears down immediately.
   *
   * Returns: true if no queued work items were discarded by the shutdown.
   */
  override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
    bool cleanExit = true;

    synchronized (this) {
      if (workerThread_) {
        synchronized (queuedCountMutex_) {
          if (waitFinishTimeout > dur!"hnsecs"(0)) {
            if (queuedCount_ > 0) {
              zeroQueuedCondition_.wait(waitFinishTimeout);
            }
          } else if (waitFinishTimeout < dur!"hnsecs"(0)) {
            while (queuedCount_ > 0) zeroQueuedCondition_.wait();
          } else {
            // waitFinishTimeout is zero, immediately exit in all cases.
          }
          cleanExit = (queuedCount_ == 0);
        }

        event_base_loopbreak(eventBase_);
        // The shutdown message is only needed to wake up the loop so it
        // notices the loopbreak request.
        sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
        workerThread_.join();
        workQueues_ = null;
        // We have nuked all currently enqueued items, so set the count to
        // zero. This is safe to do without locking, since the worker thread
        // is down.
        queuedCount_ = 0;
        atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
      }
    }

    return cleanExit;
  }

  /// Registers a one-shot listener for the given socket event, without a
  /// timeout.
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    TSocketEventListener listener
  ) {
    addOneshotListenerImpl(socket, eventType, null, listener);
  }

  /// Registers a one-shot listener for the given socket event; if timeout is
  /// positive, the listener is also invoked (with TIMED_OUT) when it expires
  /// first.
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener
  ) {
    if (timeout <= dur!"hnsecs"(0)) {
      addOneshotListenerImpl(socket, eventType, null, listener);
    } else {
      // This is not really documented well, but libevent does not require to
      // keep the timeval around after the event was added.
      auto tv = toTimeval(timeout);
      addOneshotListenerImpl(socket, eventType, &tv, listener);
    }
  }

private:
  alias void delegate() Work;

  /// Common implementation for both addOneshotListener overloads; timeout
  /// may be null for "no timeout".
  void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
    const(timeval)* timeout, TSocketEventListener listener
  ) {
    registerOneshotEvent(cast(evutil_socket_t)socket.handle, libeventEventType(eventType),
      assumeNothrow(&socketCallback), timeout, listener);
  }

  /**
   * Registers a one-shot event with libevent, copying the payload to the
   * C heap so that libevent can hold on to the pointer.
   *
   * The copied block is registered as a GC range so GC-managed data it
   * points to (e.g. delegate contexts) is kept alive; the event callback
   * is responsible for removing the range and freeing the block again.
   */
  void registerOneshotEvent(T)(evutil_socket_t fd, short type,
    event_callback_fn callback, const(timeval)* timeout, T payload
  ) {
    // Create a copy of the payload on the C heap.
    auto payloadMem = malloc(payload.sizeof);
    if (!payloadMem) onOutOfMemoryError();
    (cast(T*)payloadMem)[0 .. 1] = payload;
    GC.addRange(payloadMem, payload.sizeof);

    auto result = event_base_once(eventBase_, fd, type, callback,
      payloadMem, timeout);

    // Assuming that we didn't get our arguments wrong above, the only other
    // situation in which event_base_once can fail is when it can't allocate
    // memory.
    if (result != 0) onOutOfMemoryError();
  }

  /// The kinds of control messages passed to the worker thread.
  enum MsgType : ubyte {
    SHUTDOWN, /// Wake up the event loop for teardown.
    WORK,     /// Enqueue (and possibly start) a new work item.
    CANCEL    /// Remove a not-yet-started work item from its queue.
  }

  /// A control message as sent over the control socket pair. work/transport
  /// are only meaningful for WORK and CANCEL messages.
  struct ControlMsg {
    MsgType type;
    Work work;
    TAsyncTransport transport;
  }

  /**
   * Starts the worker thread if it is not already running.
   */
  void ensureWorkerThreadRunning() {
    // Technically, only half barriers would be required here, but adding the
    // argument seems to trigger a DMD template argument deduction @@BUG@@.
    if (!atomicLoad(*(cast(shared)&workerThread_))) {
      synchronized (this) {
        if (!workerThread_) {
          auto thread = new Thread({ event_base_loop(eventBase_, 0); });
          thread.start();
          atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
        }
      }
    }
  }

  /**
   * Sends a control message to the worker thread.
   */
  void sendControlMsg(const(ControlMsg) msg) {
    auto result = controlSendSocket_.send((&msg)[0 .. 1]);
    enum size = msg.sizeof;
    enforce(result == size, new TException(text(
      "Sending control message of type ", msg.type, " failed (", result,
      " bytes instead of ", size, " transmitted).")));
  }

  /**
   * Receives messages from the control message socket and acts on them. Called
   * from the worker thread.
   */
  void receiveControlMsg() {
    // Read as many new work items off the socket as possible (at least one
    // should be available, as we got notified by libevent).
    ControlMsg msg;
    ptrdiff_t bytesRead;
    while (true) {
      bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));

      if (bytesRead < 0) {
        auto errno = getSocketErrno();
        if (errno != WOULD_BLOCK_ERRNO) {
          logError("Reading control message, some work item will possibly " ~
            "never be executed: %s", socketErrnoString(errno));
        }
      }
      if (bytesRead != msg.sizeof) break;

      // Everything went fine, we received a new control message.
      final switch (msg.type) {
        case MsgType.SHUTDOWN:
          // The message was just intended to wake us up for shutdown.
          break;

        case MsgType.CANCEL:
          // When processing a cancellation, we must not touch the first item,
          // since it is already being processed.
          auto queue = workQueues_[msg.transport];
          if (queue.length > 0) {
            workQueues_[msg.transport] = [queue[0]] ~
              removeEqual(queue[1 .. $], msg.work);
          }
          break;

        case MsgType.WORK:
          // Now that the work item is back in the D world, we don't need the
          // extra GC root for the context pointer anymore (see execute()).
          GC.removeRoot(msg.work.ptr);

          // Add the work item to the queue and execute it.
          auto queue = msg.transport in workQueues_;
          if (queue is null || (*queue).empty) {
            // If the queue is empty, add the new work item to the queue as well,
            // but immediately start executing it.
            workQueues_[msg.transport] = [msg.work];
            executeWork(msg.transport, msg.work);
          } else {
            (*queue) ~= msg.work;
          }
          break;
      }
    }

    // If the last read was successful, but didn't read enough bytes, we got
    // a problem.
    if (bytesRead > 0) {
      logError("Unexpected partial control message read (%s byte(s) " ~
        "instead of %s), some work item will possibly never be executed.",
        bytesRead, msg.sizeof);
    }
  }

  /**
   * Executes the given work item and all others enqueued for the same
   * transport in a new fiber. Called from the worker thread.
   */
  void executeWork(TAsyncTransport transport, Work work) {
    (new Fiber({
      auto item = work;
      while (true) {
        try {
          // Execute the actual work. It will possibly add listeners to the
          // event loop and yield away if it has to wait for blocking
          // operations. It is quite possible that another fiber will modify
          // the work queue for the current transport.
          item();
        } catch (Exception e) {
          // This should never happen, just to be sure the worker thread
          // doesn't stop working in mysterious ways because of an unhandled
          // exception.
          logError("Exception thrown by work item: %s", e);
        }

        // Remove the item from the work queue.
        // Note: Due to the value semantics of array slices, we have to
        // re-lookup this on every iteration. This could be solved, but I'd
        // rather replace this directly with a queue type once one becomes
        // available in Phobos.
        auto queue = workQueues_[transport];
        assert(queue.front == item);
        queue.popFront();
        workQueues_[transport] = queue;

        // Now that the work item is done, no longer count it as queued.
        decrementQueuedCount();

        if (queue.empty) break;

        // If the queue is not empty, execute the next waiting item.
        item = queue.front;
      }
    })).call();
  }

  /**
   * Increments the amount of queued items.
   */
  void incrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      ++queuedCount_;
    }
  }

  /**
   * Decrements the amount of queued items.
   */
  void decrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      assert(queuedCount_ > 0);
      --queuedCount_;
      if (queuedCount_ == 0) {
        zeroQueuedCondition_.notifyAll();
      }
    }
  }

  /// libevent callback for the control socket; forwards to the instance
  /// passed as the user argument at registration time.
  static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
    void *managerThis
  ) {
    (cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
  }

  /// libevent callback for one-shot socket listeners; invokes the listener
  /// payload and releases its C heap copy.
  static extern(C) void socketCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
      TAsyncEventReason.NORMAL;
    (*(cast(TSocketEventListener*)arg))(reason);
    GC.removeRange(arg);
    // Destroy the payload itself, not the void* – destroy(arg) would merely
    // null the local pointer, turning the subsequent free() into free(null)
    // and leaking the C heap copy made in registerOneshotEvent().
    destroy(*(cast(TSocketEventListener*)arg));
    free(arg);
  }

  /// libevent callback for delay() timeouts; invokes the delegate payload
  /// and releases its C heap copy.
  static extern(C) void delayCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    assert(flags & EV_TIMEOUT);
    (*(cast(void delegate()*)arg))();
    GC.removeRange(arg);
    // As in socketCallback(): destroy the payload in place so that free()
    // actually releases the memory allocated in registerOneshotEvent().
    destroy(*(cast(void delegate()*)arg));
    free(arg);
  }

  /// The worker thread running the libevent loop, or null if not (yet)
  /// started. Accessed via atomics outside of synchronized(this) blocks.
  Thread workerThread_;

  /// The libevent loop context owned by this manager.
  event_base* eventBase_;

  /// The socket used for receiving new work items in the event loop. Paired
  /// with controlSendSocket_. Invalid (i.e. TAsyncWorkItem.init) items are
  /// ignored and can be used to wake up the worker thread.
  Socket controlReceiveSocket_;
  event* controlReceiveEvent_;

  /// The socket used to send new work items to the event loop. It is
  /// expected that work items can always be read at once from it, i.e. that
  /// there will never be short reads.
  Socket controlSendSocket_;

  /// Queued up work delegates for async transports. This also includes
  /// currently active ones, they are removed from the queue on completion,
  /// which is relied on by the control message receive fiber (the main one)
  /// to decide whether to immediately start executing items or not.
  // TODO: This should really be of some queue type, not an array slice, but
  // std.container doesn't have anything.
  Work[][TAsyncTransport] workQueues_;

  /// The total number of work items not yet finished (queued and currently
  /// executed) and delays not yet executed.
  uint queuedCount_;

  /// Protects queuedCount_.
  Mutex queuedCountMutex_;

  /// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
  Condition zeroQueuedCondition_;
}
| 442 | |
| 443 | private { |
| 444 | timeval toTimeval(const(Duration) dur) { |
Nobuaki Sukegawa | 74f583c | 2016-03-09 20:09:10 +0900 | [diff] [blame] | 445 | timeval tv; |
| 446 | dur.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec); |
Jake Farrell | b95b0ff | 2012-03-22 21:49:10 +0000 | [diff] [blame] | 447 | return tv; |
| 448 | } |
| 449 | |
| 450 | /** |
| 451 | * Returns the libevent flags combination to represent a given TAsyncEventType. |
| 452 | */ |
| 453 | short libeventEventType(TAsyncEventType type) { |
| 454 | final switch (type) { |
| 455 | case TAsyncEventType.READ: |
| 456 | return EV_READ | EV_ET; |
| 457 | case TAsyncEventType.WRITE: |
| 458 | return EV_WRITE | EV_ET; |
| 459 | } |
| 460 | } |
| 461 | } |