/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * A non-blocking server implementation that operates a set of I/O threads (by
 * default only one) and either does processing »in-line« or off-loads it to a
 * task pool.
 *
 * It *requires* TFramedTransport to be used on the client side, as it expects
 * a 4 byte length indicator and writes out responses using the same framing.
 *
 * Because I/O is done asynchronously (event-based), TServerTransport
 * unfortunately can't be used.
 *
 * This implementation is based on the C++ one, except that request timeouts
 * and the drain task queue overload handling strategy are not implemented
 * yet.
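 *
 * Example:
 * ---
 * // A minimal sketch – Calculator and CalculatorHandler are hypothetical
 * // stand-ins for a Thrift-generated service interface and an
 * // implementation of it; they are not part of this module. Clients must
 * // connect using TFramedTransport.
 * auto server = new TNonblockingServer(
 *   new TServiceProcessor!Calculator(new CalculatorHandler), 9090,
 *   new TTransportFactory, new TBinaryProtocolFactory!());
 * server.serve();
 * ---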
 */
// This really should use a D non-blocking I/O library, once one becomes
// available.
module thrift.server.nonblocking;

import core.atomic : atomicLoad, atomicStore, atomicOp;
import core.exception : onOutOfMemoryError;
import core.memory : GC;
import core.sync.mutex;
import core.stdc.stdlib : free, realloc;
import core.time : Duration, dur;
import core.thread : Thread, ThreadGroup;
import deimos.event2.event;
import std.array : empty;
import std.conv : emplace, to;
import std.exception : enforce;
import std.parallelism : TaskPool, task;
import std.socket : Socket, socketPair, SocketAcceptException,
  SocketException, TcpSocket;
import std.variant : Variant;
import thrift.base;
import thrift.internal.endian;
import thrift.internal.socket;
import thrift.internal.traits;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.protocol.processor;
import thrift.server.base;
import thrift.server.transport.socket;
import thrift.transport.base;
import thrift.transport.memory;
import thrift.transport.range;
import thrift.transport.socket;
import thrift.util.cancellation;

/**
 * Possible actions taken on new incoming connections when the server is
 * overloaded.
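 *
 * Example:
 * ---
 * // A sketch, assuming a TNonblockingServer instance named server; the
 * // limit values are hypothetical. New connections are dropped outright
 * // once either limit is exceeded.
 * server.overloadAction = TOverloadAction.CLOSE_ON_ACCEPT;
 * server.maxConnections = 10_000;
 * server.maxActiveProcessors = 500;
 * ---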
 */
enum TOverloadAction {
  /// Do not take any special actions while the server is overloaded, just
  /// continue accepting connections.
  NONE,

  /// Immediately drop new connections after they have been accepted if the
  /// server is overloaded.
  CLOSE_ON_ACCEPT
}

///
class TNonblockingServer : TServer {
  ///
  this(TProcessor processor, ushort port, TTransportFactory transportFactory,
    TProtocolFactory protocolFactory, TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), port, transportFactory,
      transportFactory, protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(TProcessorFactory processorFactory, ushort port,
    TTransportFactory transportFactory, TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processorFactory, port, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessor processor,
    ushort port,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), port,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    ushort port,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    super(processorFactory, null, inputTransportFactory,
      outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
    port_ = port;

    this.taskPool = taskPool;

    connectionMutex_ = new Mutex;

    connectionStackLimit = DEFAULT_CONNECTION_STACK_LIMIT;
    maxActiveProcessors = DEFAULT_MAX_ACTIVE_PROCESSORS;
    maxConnections = DEFAULT_MAX_CONNECTIONS;
    overloadHysteresis = DEFAULT_OVERLOAD_HYSTERESIS;
    overloadAction = DEFAULT_OVERLOAD_ACTION;
    writeBufferDefaultSize = DEFAULT_WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit = DEFAULT_IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit = DEFAULT_IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN = DEFAULT_RESIZE_BUFFER_EVERY_N;
    maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
    numIOThreads_ = DEFAULT_NUM_IO_THREADS;
  }

  override void serve(TCancellation cancellation = null) {
    if (cancellation && cancellation.triggered) return;

    // Initialize the listening socket.
    // TODO: SO_KEEPALIVE, TCP_LOW_MIN_RTO, etc.
    listenSocket_ = makeSocketAndListen(port_, TServerSocket.ACCEPT_BACKLOG,
      BIND_RETRY_LIMIT, BIND_RETRY_DELAY, 0, 0, ipv6Only_);
    listenSocket_.blocking = false;

    logInfo("Using %s I/O thread(s).", numIOThreads_);
    if (taskPool_) {
      logInfo("Using task pool with size: %s.", taskPool_.size);
    }

    assert(numIOThreads_ > 0);
    assert(ioLoops_.empty);
    foreach (id; 0 .. numIOThreads_) {
      // The IO loop on the first IO thread (this thread, i.e. the one
      // serve() is called from) also accepts new connections.
      auto listenSocket = (id == 0 ? listenSocket_ : null);
      ioLoops_ ~= new IOLoop(this, listenSocket);
    }

    if (cancellation) {
      cancellation.triggering.addCallback({
        foreach (loop; ioLoops_) loop.stop();

        // Stop accepting new connections right away.
        listenSocket_.close();
        listenSocket_ = null;
      });
    }

    // Start the IO helper threads for all but the first loop, which we will
    // run ourselves. Note that the threads run forever, only terminating if
    // stop() is called.
    auto threads = new ThreadGroup();
    foreach (loop; ioLoops_[1 .. $]) {
      auto t = new Thread(&loop.run);
      threads.add(t);
      t.start();
    }

    if (eventHandler) eventHandler.preServe();

    // Run the primary (listener) IO thread loop in our main thread; this
    // will block until the server is shutting down.
    ioLoops_[0].run();

    // Ensure all threads are finished before leaving serve().
    threads.joinAll();

    ioLoops_ = null;
  }

  /**
   * Returns the number of currently active connections, i.e. open sockets.
   */
  size_t numConnections() const @property {
    return numConnections_;
  }

  /**
   * Returns the number of connection objects allocated, but not in use.
   */
  size_t numIdleConnections() const @property {
    return connectionStack_.length;
  }

  /**
   * Returns the number of connections which are currently processing.
   *
   * This is defined as a connection where all data has been received, and
   * the processor was invoked but has not yet completed.
   */
  size_t numActiveProcessors() const @property {
    return numActiveProcessors_;
  }

  /// Number of bind() retries.
  enum BIND_RETRY_LIMIT = 0;

  /// Duration between bind() retries.
  enum BIND_RETRY_DELAY = dur!"hnsecs"(0);

  /// Whether to listen on IPv6 only, if IPv6 support is detected
  /// (default: false).
  void ipv6Only(bool value) @property {
    ipv6Only_ = value;
  }

  /**
   * The task pool to use for processing requests. If null, no additional
   * threads are used and requests are processed »inline«.
   *
   * Can safely be set even when the server is already running.
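   *
   * Example:
   * ---
   * // A sketch, mirroring the unittest at the end of this module: process
   * // requests on four daemon worker threads instead of in the I/O thread.
   * auto pool = new TaskPool(4);
   * pool.isDaemon = true;
   * server.taskPool = pool;
   * ---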
   */
  TaskPool taskPool() @property {
    return taskPool_;
  }

  /// ditto
  void taskPool(TaskPool pool) @property {
    taskPool_ = pool;
  }

  /**
   * Hysteresis for the overload state.
   *
   * This is the fraction of the overload value that needs to be reached
   * before the overload state is cleared. It must be between 0 and 1;
   * practical choices probably lie between 0.5 and 0.9.
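   *
   * For example, with maxConnections = 10_000 and a hysteresis of 0.8, an
   * overloaded server only leaves the overloaded state again once the number
   * of active connections has dropped to 8_000 or below.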
   */
  double overloadHysteresis() const @property {
    return overloadHysteresis_;
  }

  /// Ditto
  void overloadHysteresis(double value) @property {
    enforce(0 < value && value <= 1,
      "Invalid value for overload hysteresis: " ~ to!string(value));
    overloadHysteresis_ = value;
  }

  /// Ditto
  enum DEFAULT_OVERLOAD_HYSTERESIS = 0.8;

  /**
   * The action which will be taken on overload.
   */
  TOverloadAction overloadAction;

  /// Ditto
  enum DEFAULT_OVERLOAD_ACTION = TOverloadAction.NONE;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is
   * checked and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize;

  /// Ditto
  enum size_t DEFAULT_WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /**
   * Max read buffer size for an idle Connection. When we place an idle
   * Connection into connectionStack_, or on every resizeBufferEveryN_-th
   * call, we will free the buffer (so that it will be reinitialized by the
   * next received frame) if it has exceeded this limit. 0 disables this
   * check.
   */
  size_t idleReadBufferLimit;

  /// Ditto
  enum size_t DEFAULT_IDLE_READ_BUFFER_LIMIT = 1024;

  /**
   * Max write buffer size for an idle connection. When we place an idle
   * Connection into connectionStack_, or on every resizeBufferEveryN_-th
   * call, we ensure that its write buffer is <= this size; otherwise, we
   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure
   * that idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit;

  /// Ditto
  enum size_t DEFAULT_IDLE_WRITE_BUFFER_LIMIT = 1024;

  /**
   * Every N calls we check the buffer size limits on a connected Connection.
   * 0 disables them (i.e. the checks are only done when a connection
   * closes).
   */
  uint resizeBufferEveryN;

  /// Ditto
  enum uint DEFAULT_RESIZE_BUFFER_EVERY_N = 512;

  /// Limit for how many Connection objects to cache.
  size_t connectionStackLimit;

  /// Ditto
  enum size_t DEFAULT_CONNECTION_STACK_LIMIT = 1024;

  /// Limit for the number of open connections before the server goes into
  /// overload state.
  size_t maxConnections;

  /// Ditto
  enum size_t DEFAULT_MAX_CONNECTIONS = int.max;

  /// Limit for the number of connections processing or waiting to process.
  size_t maxActiveProcessors;

  /// Ditto
  enum size_t DEFAULT_MAX_ACTIVE_PROCESSORS = int.max;

  /// Maximum frame size, in bytes.
  ///
  /// If a client tries to send a message larger than this limit, its
  /// connection will be closed. This helps to avoid allocating huge buffers
  /// on bogus input.
  uint maxFrameSize;

  /// Ditto
  enum uint DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;

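  /**
   * The number of I/O threads to run; must be at least one. The first I/O
   * loop is always run on the thread serve() is called from, so additional
   * threads are only spawned for values greater than one.
   *
   * Example:
   * ---
   * // A sketch: distribute socket handling over four I/O threads.
   * server.numIOThreads = 4;
   * server.serve();
   * ---
   */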
  size_t numIOThreads() @property {
    return numIOThreads_;
  }

  void numIOThreads(size_t value) @property {
    enforce(value >= 1, new TException("Must use at least one I/O thread."));
    numIOThreads_ = value;
  }

  enum DEFAULT_NUM_IO_THREADS = 1;

private:
  /**
   * C callback wrapper around acceptConnections(). Expects the custom
   * argument to be the this pointer of the associated server instance.
   */
  extern(C) static void acceptConnectionsCallback(int fd, short which,
    void* serverThis
  ) {
    (cast(TNonblockingServer)serverThis).acceptConnections(fd, which);
  }

  /**
   * Called by libevent (IO loop 0/serve() thread only) when something
   * happened on the listening socket.
   */
  void acceptConnections(int fd, short eventFlags) {
    if (atomicLoad(ioLoops_[0].shuttingDown_)) return;

    assert(!!listenSocket_,
      "Server should be shutting down if listen socket is null.");
    assert(fd == listenSocket_.handle);
    assert(eventFlags & EV_READ);

    // Accept as many new clients as possible, even though libevent signaled
    // only one. This helps reduce the number of calls into libevent space.
    while (true) {
      // It is lame to use exceptions for regular control flow (failing is
      // expected due to the non-blocking mode of operation), but that's the
      // interface std.socket offers…
      Socket clientSocket;
      try {
        clientSocket = listenSocket_.accept();
      } catch (SocketAcceptException e) {
        if (e.errorCode != WOULD_BLOCK_ERRNO) {
          logError("Error accepting connection: %s", e);
        }
        break;
      }

      // If the server is overloaded, this is the point to take the specified
      // action.
      if (overloadAction != TOverloadAction.NONE && checkOverloaded()) {
        nConnectionsDropped_++;
        nTotalConnectionsDropped_++;
        if (overloadAction == TOverloadAction.CLOSE_ON_ACCEPT) {
          clientSocket.close();
          return;
        }
      }

      try {
        clientSocket.blocking = false;
      } catch (SocketException e) {
        logError("Couldn't set client socket to non-blocking mode: %s", e);
        clientSocket.close();
        return;
      }

      // Create a new Connection for this client socket.
      Connection conn = void;
      IOLoop loop = void;
      bool thisThread = void;
      synchronized (connectionMutex_) {
        // Assign an I/O loop to the connection (round-robin).
        assert(nextIOLoop_ >= 0);
        assert(nextIOLoop_ < ioLoops_.length);
        auto selectedThreadIdx = nextIOLoop_;
        nextIOLoop_ = (nextIOLoop_ + 1) % ioLoops_.length;

        loop = ioLoops_[selectedThreadIdx];
        thisThread = (selectedThreadIdx == 0);

        // Check the connection stack to see if we can re-use an existing one.
        if (connectionStack_.empty) {
          ++numConnections_;
          conn = new Connection(clientSocket, loop);

          // Make sure the connection does not get collected while it is
          // active, i.e. hooked up with libevent.
          GC.addRoot(cast(void*)conn);
        } else {
          conn = connectionStack_[$ - 1];
          connectionStack_ = connectionStack_[0 .. $ - 1];
          connectionStack_.assumeSafeAppend();
          conn.init(clientSocket, loop);
        }
      }

      loop.addConnection();

      // Either notify the I/O thread that is assigned this connection to
      // start processing, or, if it is us, just ask this connection to do
      // its initial state change here.
      //
      // (We need to avoid writing to our own notification pipe, to avoid
      // possible deadlocks if the pipe is full.)
      if (thisThread) {
        conn.transition();
      } else {
        loop.notifyCompleted(conn);
      }
    }
  }

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    atomicOp!"+="(numActiveProcessors_, 1);
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    assert(numActiveProcessors_ > 0);
    atomicOp!"-="(numActiveProcessors_, 1);
  }

  /**
   * Determines if the server is currently overloaded.
   *
   * If the number of open connections or »processing« connections is over
   * the respective limit, the server will enter overload handling mode and a
   * warning will be logged. If the values drop below the hysteresis fraction
   * of those limits again, the server will leave the overload state.
   *
   * Returns: Whether the server is currently overloaded.
   */
  bool checkOverloaded() {
    auto activeConnections = numConnections_ - connectionStack_.length;
    if (numActiveProcessors_ > maxActiveProcessors ||
      activeConnections > maxConnections) {
      if (!overloaded_) {
        logInfo("Entering overloaded state.");
        overloaded_ = true;
      }
    } else {
      if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors) &&
        (activeConnections <= overloadHysteresis_ * maxConnections))
      {
        logInfo("Exiting overloaded state, %s connection(s) dropped (%s total).",
          nConnectionsDropped_, nTotalConnectionsDropped_);
        nConnectionsDropped_ = 0;
        overloaded_ = false;
      }
    }

    return overloaded_;
  }

  /**
   * Marks a connection as inactive and either puts it back into the
   * connection pool or leaves it for garbage collection.
   */
  void disposeConnection(Connection connection) {
    synchronized (connectionMutex_) {
      if (!connectionStackLimit ||
        (connectionStack_.length < connectionStackLimit))
      {
        connection.checkIdleBufferLimit(idleReadBufferLimit,
          idleWriteBufferLimit);
        connectionStack_ ~= connection;
      } else {
        assert(numConnections_ > 0);
        --numConnections_;

        // Leave the connection object for collection now.
        GC.removeRoot(cast(void*)connection);
      }
    }
  }

  /// Socket used to listen for incoming connections and accept them.
  Socket listenSocket_;

  /// Port to listen on.
  ushort port_;

  /// Whether to listen on IPv6 only.
  bool ipv6Only_;

  /// The total number of connections existing, both active and idle.
  size_t numConnections_;

  /// The number of connections which are currently waiting for the processor
  /// to return.
  shared size_t numActiveProcessors_;

  /// Hysteresis for leaving overload state.
  double overloadHysteresis_;

  /// Whether the server is currently overloaded.
  bool overloaded_;

  /// Number of connections dropped since the server entered the current
  /// overloaded state.
  uint nConnectionsDropped_;

  /// Number of connections dropped due to overload since the server started.
  ulong nTotalConnectionsDropped_;

  /// The task pool used for processing requests.
  TaskPool taskPool_;

  /// Number of I/O threads this server will use (>= 1).
  size_t numIOThreads_;

  /// The IOLoops among which socket handling work is distributed.
  IOLoop[] ioLoops_;

  /// The index of the loop in ioLoops_ which will handle the next accepted
  /// connection.
  size_t nextIOLoop_;

  /// All the connection objects which have been created but are not
  /// currently in use. When a connection is closed, it is placed here to
  /// enable object (resp. buffer) reuse.
  Connection[] connectionStack_;

  /// This mutex protects the connection stack.
  Mutex connectionMutex_;
}

private {
  /*
   * Encapsulates a libevent event loop.
   *
   * The design is a bit of a mess, since the first loop is actually run on
   * the server thread itself and is special because it is the only instance
   * for which listenSocket_ is not null.
   */
  final class IOLoop {
    /**
     * Creates a new instance and sets up the event base.
     *
     * If listenSocket is not null, the thread will also accept new
     * connections itself.
     */
    this(TNonblockingServer server, Socket listenSocket) {
      server_ = server;
      listenSocket_ = listenSocket;
      initMutex_ = new Mutex;
    }

    /**
     * Runs the event loop; only returns after a call to stop().
     */
    void run() {
      assert(!atomicLoad(initialized_), "IOLoop already running?!");

      synchronized (initMutex_) {
        if (atomicLoad(shuttingDown_)) return;
        atomicStore(initialized_, true);

        assert(!eventBase_);
        eventBase_ = event_base_new();

        if (listenSocket_) {
          // Log the libevent version and backend.
          logInfo("libevent version %s, using method %s.",
            to!string(event_get_version()),
            to!string(event_base_get_method(eventBase_)));

          // Register the event for the listening socket.
          listenEvent_ = event_new(eventBase_, listenSocket_.handle,
            EV_READ | EV_PERSIST | EV_ET,
            assumeNothrow(&TNonblockingServer.acceptConnectionsCallback),
            cast(void*)server_);
          if (event_add(listenEvent_, null) == -1) {
            throw new TException("event_add for the listening socket event failed.");
          }
        }

        auto pair = socketPair();
        foreach (s; pair) s.blocking = false;
        completionSendSocket_ = pair[0];
        completionReceiveSocket_ = pair[1];

        // Register an event for the task completion notification socket.
        completionEvent_ = event_new(eventBase_,
          completionReceiveSocket_.handle, EV_READ | EV_PERSIST | EV_ET,
          assumeNothrow(&completedCallback), cast(void*)this);

        if (event_add(completionEvent_, null) == -1) {
          throw new TException("event_add for the notification socket failed.");
        }
      }

      // Run libevent engine, returns only after stop().
      event_base_dispatch(eventBase_);

      if (listenEvent_) {
        event_free(listenEvent_);
        listenEvent_ = null;
      }

      event_free(completionEvent_);
      completionEvent_ = null;

      completionSendSocket_.close();
      completionSendSocket_ = null;

      completionReceiveSocket_.close();
      completionReceiveSocket_ = null;

      event_base_free(eventBase_);
      eventBase_ = null;

      atomicStore(shuttingDown_, false);

      initialized_ = false;
    }

    /**
     * Adds a new connection handled by this loop.
     */
    void addConnection() {
      ++numActiveConnections_;
    }

    /**
     * Disposes a connection object (typically after it has been closed).
     */
    void disposeConnection(Connection conn) {
      server_.disposeConnection(conn);
      assert(numActiveConnections_ > 0);
      --numActiveConnections_;
      if (numActiveConnections_ == 0) {
        if (atomicLoad(shuttingDown_)) {
          event_base_loopbreak(eventBase_);
        }
      }
    }

    /**
     * Notifies the event loop that the current step (initialization,
     * processing of a request) on a certain connection has been completed.
     *
     * This function is thread-safe, but should never be called from the
     * thread running the loop itself.
     */
    void notifyCompleted(Connection conn) {
      assert(!!completionSendSocket_);
      auto bytesSent = completionSendSocket_.send(
        cast(ubyte[])((&conn)[0 .. 1]));

      if (bytesSent != Connection.sizeof) {
        logError("Sending completion notification failed, connection will " ~
          "not be properly terminated.");
      }
    }

    /**
     * Exits the event loop after all currently active connections have been
     * closed.
     *
     * This function is thread-safe.
     */
    void stop() {
      // There is a bug in either libevent or its documentation: having no
      // events registered doesn't actually terminate the loop, because
      // event_base_new() registers some internal one by calling
      // evthread_make_base_notifiable().
      // Due to this, we can't simply remove all events and expect the event
      // loop to terminate. Instead, we ping the event loop using a null
      // completion message. This way, we make sure to wake up the libevent
      // thread if it is not currently processing any connections. It will
      // break out of the loop in disposeConnection() after the last active
      // connection has been closed.
      synchronized (initMutex_) {
        atomicStore(shuttingDown_, true);
        if (atomicLoad(initialized_)) notifyCompleted(null);
      }
    }

  private:
    /**
     * C callback to call completed() from libevent.
     *
     * Expects the custom argument to be the this pointer of the associated
     * IOLoop instance.
     */
    extern(C) static void completedCallback(int fd, short what, void* loopThis) {
      assert(what & EV_READ);
      auto loop = cast(IOLoop)loopThis;
      assert(fd == loop.completionReceiveSocket_.handle);
      loop.completed();
    }

    /**
     * Reads from the completion receive socket and appropriately transitions
     * the connections and shuts down the loop if requested.
     */
    void completed() {
      Connection connection;
      ptrdiff_t bytesRead;
      while (true) {
        bytesRead = completionReceiveSocket_.receive(
          cast(ubyte[])((&connection)[0 .. 1]));
        if (bytesRead < 0) {
          auto errno = getSocketErrno();

          if (errno != WOULD_BLOCK_ERRNO) {
            logError("Reading from completion socket failed, some " ~
              "connection will never be properly terminated: %s",
              socketErrnoString(errno));
          }
        }

        if (bytesRead != Connection.sizeof) break;

        if (!connection) {
          assert(atomicLoad(shuttingDown_));
          if (numActiveConnections_ == 0) {
            event_base_loopbreak(eventBase_);
          }
          continue;
        }

        connection.transition();
      }

      if (bytesRead > 0) {
        logError("Unexpected partial read from completion socket " ~
          "(%s bytes instead of %s).", bytesRead, Connection.sizeof);
      }
    }

    /// The associated server.
    TNonblockingServer server_;

    /// The managed listening socket, if any.
    Socket listenSocket_;

    /// The libevent event base for the loop.
    event_base* eventBase_;

    /// Triggered on listen socket events.
    event* listenEvent_;

    /// Triggered on completion receive socket events.
    event* completionEvent_;

    /// Socket used to send completion notification messages. Paired with
    /// completionReceiveSocket_.
    Socket completionSendSocket_;

    /// Socket used to receive completion notification messages. Paired with
    /// completionSendSocket_.
    Socket completionReceiveSocket_;

    /// Whether the server is currently shutting down (i.e. the cancellation
    /// has been triggered, but not all client connections have been closed
    /// yet).
    shared bool shuttingDown_;

    /// The number of currently active client connections.
    size_t numActiveConnections_;

    /// Guards loop startup so that the loop can be reliably shut down even
    /// if another thread has just started to execute run(). Locked during
    /// initialization in run(). When unlocked, the completion mechanism is
    /// expected to be fully set up.
    Mutex initMutex_;
    shared bool initialized_; /// Ditto
  }

  /*
   * I/O states a socket can be in.
   */
  enum SocketState {
    RECV_FRAME_SIZE, /// The frame size is being received.
    RECV, /// The payload is being received.
    SEND /// The response is being written back out.
  }

  /*
   * States a connection can be in.
   */
  enum ConnectionState {
    INIT, /// The connection will be initialized.
    READ_FRAME_SIZE, /// The four frame size bytes are being read.
    READ_REQUEST, /// The request payload itself is being read.
    WAIT_PROCESSOR, /// The connection waits for the processor to finish.
    SEND_RESULT /// The result is written back out.
  }

  /*
   * A connection that is handled via libevent.
   *
   * Data received is buffered until the request is complete (returning back
   * to libevent if not), at which point the processor is invoked.
   */
  final class Connection {
    /**
     * Constructs a new instance.
     *
     * To reuse a connection object later on, the init() function can be used
     * to reset the internal state to the same effect.
     */
    this(Socket socket, IOLoop loop) {
      // The input and output transport objects are reused between client
      // connections, so initialize them here rather than in init().
      inputTransport_ = new TInputRangeTransport!(ubyte[])([]);
      outputTransport_ = new TMemoryBuffer(loop.server_.writeBufferDefaultSize);

      init(socket, loop);
    }

    /**
     * Initializes the connection.
     *
     * Params:
     *   socket = The socket to work on.
     *   loop = The I/O loop this connection is assigned to.
     */
    void init(Socket socket, IOLoop loop) {
      // TODO: This allocation could be avoided.
      socket_ = new TSocket(socket);

      loop_ = loop;
      server_ = loop_.server_;
      connState_ = ConnectionState.INIT;
      eventFlags_ = 0;

      readBufferPos_ = 0;
      readWant_ = 0;

      writeBuffer_ = null;
      writeBufferPos_ = 0;
      largestWriteBufferSize_ = 0;

      socketState_ = SocketState.RECV_FRAME_SIZE;
      callsSinceResize_ = 0;

      factoryInputTransport_ =
        server_.inputTransportFactory_.getTransport(inputTransport_);
      factoryOutputTransport_ =
        server_.outputTransportFactory_.getTransport(outputTransport_);

      inputProtocol_ =
        server_.inputProtocolFactory_.getProtocol(factoryInputTransport_);
      outputProtocol_ =
        server_.outputProtocolFactory_.getProtocol(factoryOutputTransport_);

      if (server_.eventHandler) {
        connectionContext_ =
          server_.eventHandler.createContext(inputProtocol_, outputProtocol_);
      }

      auto info = TConnectionInfo(inputProtocol_, outputProtocol_, socket_);
      processor_ = server_.processorFactory_.getProcessor(info);
    }

    ~this() {
      free(readBuffer_);
      if (event_) {
        event_free(event_);
        event_ = null;
      }
    }

    /**
     * Check buffers against the size limits and shrink them if exceeded.
     *
     * Params:
     *   readLimit = Read buffer size limit (in bytes, 0 to ignore).
     *   writeLimit = Write buffer size limit (in bytes, 0 to ignore).
     */
    void checkIdleBufferLimit(size_t readLimit, size_t writeLimit) {
      if (readLimit > 0 && readBufferSize_ > readLimit) {
        free(readBuffer_);
        readBuffer_ = null;
        readBufferSize_ = 0;
      }

      if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
        // just start over
        outputTransport_.reset(server_.writeBufferDefaultSize);
        largestWriteBufferSize_ = 0;
      }
    }

    /**
     * Transitions the connection to the next state.
     *
     * This is called e.g. when the request has been read completely or all
     * the data has been written back.
     */
    void transition() {
      assert(!!loop_);
      assert(!!server_);

      // Switch upon the state that we are currently in and move to a new
      // state.
      final switch (connState_) {
        case ConnectionState.READ_REQUEST:
          // We are done reading the request: package the read buffer into a
          // transport and get back some data from the dispatch function.
          inputTransport_.reset(readBuffer_[0 .. readBufferPos_]);
          outputTransport_.reset();

          // Prepend four bytes of blank space to the buffer so we can
          // write the frame size there later.
          // Strictly speaking, we wouldn't have to write anything, just
          // increment the TMemoryBuffer writeOffset_. This would yield a
          // tiny performance gain.
          ubyte[4] space = void;
          outputTransport_.write(space);

          server_.incrementActiveProcessors();

          taskPool_ = server_.taskPool;
          if (taskPool_) {
            // Create a new task and add it to the task pool queue.
            auto processingTask = task!processRequest(this);
            connState_ = ConnectionState.WAIT_PROCESSOR;
            taskPool_.put(processingTask);

            // We don't want to process any more data while the task is
            // active.
            unregisterEvent();
            return;
          }

          // Just process it right now if there is no task pool set.
          processRequest(this);
          goto case;
        case ConnectionState.WAIT_PROCESSOR:
          // We have now finished processing the request, set the frame size
          // for the outputTransport_ contents and set everything up to write
          // it out via libevent.
          server_.decrementActiveProcessors();

          // Acquire the data written to the transport.
          // KLUDGE: To avoid copying, we simply cast the const away and
          // modify the internal buffer of the TMemoryBuffer – this works
          // with the current implementation, but isn't exactly beautiful.
          writeBuffer_ = cast(ubyte[])outputTransport_.getContents();

          assert(writeBuffer_.length >= 4, "The write buffer should have " ~
            "at least the initially added dummy length bytes.");
          if (writeBuffer_.length == 4) {
            // The request was one-way, there is no response to write.
            goto case ConnectionState.INIT;
          }

          // Write the frame size into the four bytes reserved for it.
          auto size = hostToNet(cast(uint)(writeBuffer_.length - 4));
          writeBuffer_[0 .. 4] = cast(ubyte[])((&size)[0 .. 1]);

          writeBufferPos_ = 0;
          socketState_ = SocketState.SEND;
          connState_ = ConnectionState.SEND_RESULT;
          registerEvent(EV_WRITE | EV_PERSIST);

          return;
        case ConnectionState.SEND_RESULT:
          // The result has been sent back to the client, we don't need the
          // buffers anymore.
          if (writeBuffer_.length > largestWriteBufferSize_) {
            largestWriteBufferSize_ = writeBuffer_.length;
          }

          if (server_.resizeBufferEveryN > 0 &&
            ++callsSinceResize_ >= server_.resizeBufferEveryN
          ) {
            checkIdleBufferLimit(server_.idleReadBufferLimit,
              server_.idleWriteBufferLimit);
            callsSinceResize_ = 0;
          }

          goto case;
        case ConnectionState.INIT:
          writeBuffer_ = null;
          writeBufferPos_ = 0;
          socketState_ = SocketState.RECV_FRAME_SIZE;
          connState_ = ConnectionState.READ_FRAME_SIZE;
          readBufferPos_ = 0;
          registerEvent(EV_READ | EV_PERSIST);

          return;
        case ConnectionState.READ_FRAME_SIZE:
          // We just read the request length, set up the buffers for reading
          // the payload.
          if (readWant_ > readBufferSize_) {
            // The current buffer is too small, exponentially grow the buffer
            // until it is big enough.

            if (readBufferSize_ == 0) {
              readBufferSize_ = 1;
            }

            auto newSize = readBufferSize_;
            while (readWant_ > newSize) {
              newSize *= 2;
            }

            auto newBuffer = cast(ubyte*)realloc(readBuffer_, newSize);
            if (!newBuffer) onOutOfMemoryError();

            readBuffer_ = newBuffer;
            readBufferSize_ = newSize;
          }

          readBufferPos_ = 0;

          socketState_ = SocketState.RECV;
          connState_ = ConnectionState.READ_REQUEST;

          return;
      }
    }

  private:
    /**
     * C callback to call workSocket() from libevent.
     *
     * Expects the custom argument to be the this pointer of the associated
     * connection.
     */
    extern(C) static void workSocketCallback(int fd, short flags, void* connThis) {
      auto conn = cast(Connection)connThis;
      assert(fd == conn.socket_.socketHandle);
      conn.workSocket();
    }

    /**
     * Invoked by libevent when something happens on the socket.
     */
    void workSocket() {
      final switch (socketState_) {
        case SocketState.RECV_FRAME_SIZE:
          // If some bytes have already been read, they have been kept in
          // readWant_.
          auto frameSize = readWant_;

          try {
            // Read from the socket.
            auto bytesRead = socket_.read(
              (cast(ubyte[])((&frameSize)[0 .. 1]))[readBufferPos_ .. $]);
            if (bytesRead == 0) {
              // Couldn't read anything, but we have been notified – client
              // has disconnected.
              close();
              return;
            }

            readBufferPos_ += bytesRead;
          } catch (TTransportException te) {
            logError("Failed to read frame size from client connection: %s",
              te);
            close();
            return;
          }

          if (readBufferPos_ < frameSize.sizeof) {
            // Frame size not complete yet, save the current buffer in
            // readWant_ so that the remaining bytes can be read later.
            readWant_ = frameSize;
            return;
          }

          auto size = netToHost(frameSize);
          if (size > server_.maxFrameSize) {
            logError("Frame size too large (%s > %s), client %s not using " ~
              "TFramedTransport?", size, server_.maxFrameSize,
              socket_.getPeerAddress().toHostNameString());
            close();
            return;
          }
          readWant_ = size;

          // Now we know the frame size, set everything up for reading the
          // payload.
          transition();
          return;

        case SocketState.RECV:
          // If we already got all the data, we should be in the SEND state.
          assert(readBufferPos_ < readWant_);

          size_t bytesRead;
          try {
            // Read as much as possible from the socket.
            bytesRead = socket_.read(readBuffer_[readBufferPos_ .. readWant_]);
          } catch (TTransportException te) {
            logError("Failed to read from client socket: %s", te);
            close();
            return;
          }

          if (bytesRead == 0) {
            // We were notified, but no bytes could be read -> the client
            // disconnected.
            close();
            return;
          }

          readBufferPos_ += bytesRead;
          assert(readBufferPos_ <= readWant_);

          if (readBufferPos_ == readWant_) {
            // The payload has been read completely, move on.
            transition();
          }

          return;
        case SocketState.SEND:
          assert(writeBufferPos_ <= writeBuffer_.length);

          if (writeBufferPos_ == writeBuffer_.length) {
            // Nothing left to send – this shouldn't happen, just move on.
            logInfo("WARNING: In send state, but no data to send.\n");
            transition();
            return;
          }

          size_t bytesSent;
          try {
            bytesSent = socket_.writeSome(writeBuffer_[writeBufferPos_ .. $]);
          } catch (TTransportException te) {
            logError("Failed to write to client socket: %s", te);
            close();
            return;
          }

          writeBufferPos_ += bytesSent;
          assert(writeBufferPos_ <= writeBuffer_.length);

          if (writeBufferPos_ == writeBuffer_.length) {
            // The whole response has been written out, we are done.
            transition();
          }

          return;
      }
    }

    /**
     * Registers a libevent event for workSocket() with the passed flags,
     * unregistering the previous one (if any).
     */
    void registerEvent(short eventFlags) {
      if (eventFlags_ == eventFlags) {
        // Nothing to do if flags are the same.
        return;
      }

      // Delete the previously existing event.
      unregisterEvent();

      eventFlags_ = eventFlags;

      if (eventFlags == 0) return;

      if (!event_) {
        // If the event was not already allocated, do it now.
        event_ = event_new(loop_.eventBase_, socket_.socketHandle,
          eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
      } else {
        event_assign(event_, loop_.eventBase_, socket_.socketHandle,
          eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
      }

      // Add the event.
      if (event_add(event_, null) == -1) {
        logError("event_add() for client socket failed.");
      }
    }

    /**
     * Unregisters the current libevent event, if any.
     */
    void unregisterEvent() {
      if (event_ && eventFlags_ != 0) {
        eventFlags_ = 0;
        if (event_del(event_) == -1) {
          logError("event_del() for client socket failed.");
          return;
        }
      }
    }

    /**
     * Closes this connection and returns it back to the server.
     */
    void close() {
      unregisterEvent();

      if (server_.eventHandler) {
        server_.eventHandler.deleteContext(
          connectionContext_, inputProtocol_, outputProtocol_);
      }

      // Close the socket.
      socket_.close();

      // Close any factory-produced transports.
      factoryInputTransport_.close();
      factoryOutputTransport_.close();

      // This connection object can now be reused.
      loop_.disposeConnection(this);
    }

    /// The server this connection belongs to.
    TNonblockingServer server_;

    /// The task pool used for this connection. This is cached instead of
    /// directly using server_.taskPool to avoid confusion if it is changed
    /// in another thread while the request is processed.
    TaskPool taskPool_;

    /// The I/O thread handling this connection.
    IOLoop loop_;

    /// The socket managed by this connection.
    TSocket socket_;

    /// The libevent object used for registering the workSocketCallback.
    event* event_;

    /// Libevent flags.
    short eventFlags_;

    /// Socket mode.
    SocketState socketState_;

    /// Application state.
    ConnectionState connState_;

    /// The size of the frame to read. If still in READ_FRAME_SIZE state,
    /// some of the bytes might not have been received yet, and the value
    /// might still be in network byte order. A uint (not a size_t) because
    /// the frame size on the wire is specified as one.
    uint readWant_;

    /// The position in the read buffer, i.e. the number of payload bytes
    /// already received from the socket in READ_REQUEST state, resp. the
    /// number of size bytes in READ_FRAME_SIZE state.
    uint readBufferPos_;

    /// Read buffer.
    ubyte* readBuffer_;

    /// Read buffer size.
    size_t readBufferSize_;

    /// Write buffer.
    ubyte[] writeBuffer_;

    /// How far through writing are we?
    size_t writeBufferPos_;

    /// Largest size of the write buffer seen since it was constructed.
    size_t largestWriteBufferSize_;

    /// Number of calls since the last time checkIdleBufferLimit has been
    /// invoked (see TNonblockingServer.resizeBufferEveryN).
    uint callsSinceResize_;

    /// Base transports the processor reads from/writes to.
    TInputRangeTransport!(ubyte[]) inputTransport_;
    TMemoryBuffer outputTransport_;

    /// The actual transports passed to the processor, obtained via the
    /// transport factories.
    TTransport factoryInputTransport_;
    TTransport factoryOutputTransport_; /// Ditto

    /// Input/output protocols, connected to factory{Input, Output}Transport.
    TProtocol inputProtocol_;
    TProtocol outputProtocol_; /// Ditto

    /// Connection context optionally created by the server event handler.
    Variant connectionContext_;

    /// The processor used for this connection.
    TProcessor processor_;
  }
}

/*
 * The request processing function, which invokes the processor for the
 * server for all the RPC messages received over a connection.
 *
 * Must be public because it is passed as an alias to std.parallelism.task().
 */
void processRequest(Connection connection) {
  try {
    while (true) {
      with (connection) {
        if (server_.eventHandler) {
          server_.eventHandler.preProcess(connectionContext_, socket_);
        }

        if (!processor_.process(inputProtocol_, outputProtocol_,
          connectionContext_) || !inputProtocol_.transport.peek()
        ) {
          // Something went fundamentally wrong or there is nothing more to
          // process, close the connection.
          break;
        }
      }
    }
  } catch (TTransportException ttx) {
    logError("Client died: %s", ttx);
  } catch (Exception e) {
    logError("Uncaught exception: %s", e);
  }

  if (connection.taskPool_) connection.loop_.notifyCompleted(connection);
}

unittest {
  import thrift.internal.test.server;

  // Temporarily disable info log output in order not to spam the test
  // results with startup info messages.
  auto oldInfoLogSink = g_infoLogSink;
  g_infoLogSink = null;
  scope (exit) g_infoLogSink = oldInfoLogSink;

  // Test in-line processing shutdown with one as well as several I/O
  // threads.
  testServeCancel!(TNonblockingServer)();
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.numIOThreads = 4;
  });

  // Test task pool processing shutdown with one as well as several I/O
  // threads.
  auto tp = new TaskPool(4);
  tp.isDaemon = true;
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.taskPool = tp;
  });
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.taskPool = tp;
    s.numIOThreads = 4;
  });
}