/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * A non-blocking server implementation that operates a set of I/O threads (by
 * default only one) and either does processing »in-line« or off-loads it to a
 * task pool.
 *
 * It *requires* TFramedTransport to be used on the client side, as it expects
 * a 4 byte length indicator and writes out responses using the same framing.
 *
 * Because I/O is done asynchronously (event-based), TServerTransport
 * unfortunately can't be used.
 *
 * This implementation is based on the C++ one, with the exception that request
 * timeouts and the »drain task queue« overload handling strategy are not
 * implemented yet.
 */
// This really should use a D non-blocking I/O library, once one becomes
// available.
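
// A minimal usage sketch (not part of the original sources): it assumes a
// Thrift-generated service interface named `Calculator` and a matching handler
// class `CalculatorHandler`; both names are purely illustrative.
//
// ---
// import std.parallelism : TaskPool;
// import thrift.codegen.processor;
// import thrift.protocol.binary;
// import thrift.server.nonblocking;
// import thrift.transport.base;
//
// void main() {
//   auto processor = new TServiceProcessor!Calculator(new CalculatorHandler);
//
//   // Clients have to connect via TFramedTransport; the server handles the
//   // framing itself, so a plain TTransportFactory is enough here.
//   auto server = new TNonblockingServer(processor, 9090,
//     new TTransportFactory, new TBinaryProtocolFactory!());
//
//   server.numIOThreads = 2;           // optional, defaults to one I/O thread
//   server.taskPool = new TaskPool(4); // optional, process requests off-thread
//
//   server.serve();                    // blocks until cancelled
// }
// ---
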
module thrift.server.nonblocking;

import core.atomic : atomicLoad, atomicStore, atomicOp;
import core.exception : onOutOfMemoryError;
import core.memory : GC;
import core.sync.mutex;
import core.stdc.stdlib : free, realloc;
import core.time : Duration, dur;
import core.thread : Thread, ThreadGroup;
import deimos.event2.event;
import std.array : empty;
import std.conv : emplace, to;
import std.exception : enforce;
import std.parallelism : TaskPool, task;
import std.socket : Socket, socketPair, SocketAcceptException,
  SocketException, TcpSocket;
import std.variant : Variant;
import thrift.base;
import thrift.internal.endian;
import thrift.internal.socket;
import thrift.internal.traits;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.protocol.processor;
import thrift.server.base;
import thrift.server.transport.socket;
import thrift.transport.base;
import thrift.transport.memory;
import thrift.transport.range;
import thrift.transport.socket;
import thrift.util.cancellation;

/**
 * Possible actions taken on new incoming connections when the server is
 * overloaded.
 */
enum TOverloadAction {
  /// Do not take any special actions while the server is overloaded, just
  /// continue accepting connections.
  NONE,

  /// Immediately drop new connections after they have been accepted if the
  /// server is overloaded.
  CLOSE_ON_ACCEPT
}

///
class TNonblockingServer : TServer {
  ///
  this(TProcessor processor, ushort port, TTransportFactory transportFactory,
    TProtocolFactory protocolFactory, TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), port, transportFactory,
      transportFactory, protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(TProcessorFactory processorFactory, ushort port,
    TTransportFactory transportFactory, TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processorFactory, port, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessor processor,
    ushort port,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), port,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    ushort port,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    super(processorFactory, null, inputTransportFactory,
      outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
    port_ = port;

    this.taskPool = taskPool;

    connectionMutex_ = new Mutex;

    connectionStackLimit = DEFAULT_CONNECTION_STACK_LIMIT;
    maxActiveProcessors = DEFAULT_MAX_ACTIVE_PROCESSORS;
    maxConnections = DEFAULT_MAX_CONNECTIONS;
    overloadHysteresis = DEFAULT_OVERLOAD_HYSTERESIS;
    overloadAction = DEFAULT_OVERLOAD_ACTION;
    writeBufferDefaultSize = DEFAULT_WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit = DEFAULT_IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit = DEFAULT_IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN = DEFAULT_RESIZE_BUFFER_EVERY_N;
    maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
    numIOThreads_ = DEFAULT_NUM_IO_THREADS;
  }

  override void serve(TCancellation cancellation = null) {
    if (cancellation && cancellation.triggered) return;

    // Initialize the listening socket.
    // TODO: SO_KEEPALIVE, TCP_LOW_MIN_RTO, etc.
    listenSocket_ = makeSocketAndListen(port_, TServerSocket.ACCEPT_BACKLOG,
      BIND_RETRY_LIMIT, BIND_RETRY_DELAY, 0, 0, ipv6Only_);
    listenSocket_.blocking = false;

    logInfo("Using %s I/O thread(s).", numIOThreads_);
    if (taskPool_) {
      logInfo("Using task pool with size: %s.", taskPool_.size);
    }

    assert(numIOThreads_ > 0);
    assert(ioLoops_.empty);
    foreach (id; 0 .. numIOThreads_) {
      // The IO loop on the first IO thread (this thread, i.e. the one serve()
      // is called from) also accepts new connections.
      auto listenSocket = (id == 0 ? listenSocket_ : null);
      ioLoops_ ~= new IOLoop(this, listenSocket);
    }

    if (cancellation) {
      cancellation.triggering.addCallback({
        foreach (i, loop; ioLoops_) loop.stop();

        // Stop accepting new connections right away.
        listenSocket_.close();
        listenSocket_ = null;
      });
    }

    // Start the IO helper threads for all but the first loop, which we will run
    // ourselves. Note that the threads run forever, only terminating if stop()
    // is called.
    auto threads = new ThreadGroup();
    foreach (loop; ioLoops_[1 .. $]) {
      auto t = new Thread(&loop.run);
      threads.add(t);
      t.start();
    }

    if (eventHandler) eventHandler.preServe();

    // Run the primary (listener) IO thread loop in our main thread; this will
    // block until the server is shutting down.
    ioLoops_[0].run();

    // Ensure all threads are finished before leaving serve().
    threads.joinAll();

    ioLoops_ = null;
  }

  /**
   * Returns the number of currently active connections, i.e. open sockets.
   */
  size_t numConnections() const @property {
    return numConnections_;
  }

  /**
   * Returns the number of connection objects allocated, but not in use.
   */
  size_t numIdleConnections() const @property {
    return connectionStack_.length;
  }

  /**
   * Returns the number of connections which are currently processing.
   *
   * This is defined as a connection where all data has been received, and the
   * processor was invoked but has not yet completed.
   */
  size_t numActiveProcessors() const @property {
    return numActiveProcessors_;
  }

  /// Number of bind() retries.
  enum BIND_RETRY_LIMIT = 0;

  /// Duration between bind() retries.
  enum BIND_RETRY_DELAY = dur!"hnsecs"(0);

  /// Whether to listen on IPv6 only, if IPv6 support is detected
  /// (default: false).
  void ipv6Only(bool value) @property {
    ipv6Only_ = value;
  }

  /**
   * The task pool to use for processing requests. If null, no additional
   * threads are used and requests are processed »inline«.
   *
   * Can safely be set even when the server is already running.
   */
  TaskPool taskPool() @property {
    return taskPool_;
  }

  /// ditto
  void taskPool(TaskPool pool) @property {
    taskPool_ = pool;
  }
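
  // A small sketch (not from the original sources) of off-loading request
  // processing to a task pool at runtime; the pool size of four worker threads
  // is just an example value:
  //
  // ---
  // import std.parallelism : TaskPool;
  //
  // auto pool = new TaskPool(4);
  // pool.isDaemon = true;    // don't let the worker threads keep the process alive
  // server.taskPool = pool;  // safe to set even while serve() is running
  // ---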

  /**
   * Hysteresis for overload state.
   *
   * This is the fraction of the overload value that needs to be reached
   * before the overload state is cleared. It must be between 0 and 1,
   * practical choices probably lie between 0.5 and 0.9.
   */
  double overloadHysteresis() const @property {
    return overloadHysteresis_;
  }

  /// Ditto
  void overloadHysteresis(double value) @property {
    enforce(0 < value && value <= 1,
      "Invalid value for overload hysteresis: " ~ to!string(value));
    overloadHysteresis_ = value;
  }

  /// Ditto
  enum DEFAULT_OVERLOAD_HYSTERESIS = 0.8;

  /**
   * The action which will be taken on overload.
   */
  TOverloadAction overloadAction;

  /// Ditto
  enum DEFAULT_OVERLOAD_ACTION = TOverloadAction.NONE;
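
  // A configuration sketch (not from the original sources) for overload
  // handling: with the illustrative limits below, freshly accepted connections
  // are dropped once more than 1000 are open, and with the default hysteresis
  // of 0.8 the overloaded state is only left again after the number of open
  // connections has fallen to 1000 * 0.8 = 800 or fewer.
  //
  // ---
  // server.maxConnections = 1000;
  // server.overloadAction = TOverloadAction.CLOSE_ON_ACCEPT;
  // // server.overloadHysteresis = 0.8; // the default
  // ---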

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize;

  /// Ditto
  enum size_t DEFAULT_WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /**
   * Max read buffer size for an idle Connection. When we place an idle
   * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit. 0 disables this check.
   */
  size_t idleReadBufferLimit;

  /// Ditto
  enum size_t DEFAULT_IDLE_READ_BUFFER_LIMIT = 1024;

  /**
   * Max write buffer size for an idle connection. When we place an idle
   * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is no larger than this size; otherwise we
   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
   * idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit;

  /// Ditto
  enum size_t DEFAULT_IDLE_WRITE_BUFFER_LIMIT = 1024;

  /**
   * Every N calls we check the buffer size limits on a connected Connection.
   * 0 disables these checks (i.e. they are only done when a connection closes).
   */
  uint resizeBufferEveryN;

  /// Ditto
  enum uint DEFAULT_RESIZE_BUFFER_EVERY_N = 512;

  /// Limit for how many Connection objects to cache.
  size_t connectionStackLimit;

  /// Ditto
  enum size_t DEFAULT_CONNECTION_STACK_LIMIT = 1024;

  /// Limit for the number of open connections before the server goes into
  /// overload state.
  size_t maxConnections;

  /// Ditto
  enum size_t DEFAULT_MAX_CONNECTIONS = int.max;

  /// Limit for the number of connections processing or waiting to process.
  size_t maxActiveProcessors;

  /// Ditto
  enum size_t DEFAULT_MAX_ACTIVE_PROCESSORS = int.max;

  /// Maximum frame size, in bytes.
  ///
  /// If a client tries to send a message larger than this limit, its
  /// connection will be closed. This helps to avoid allocating huge buffers
  /// on bogus input.
  uint maxFrameSize;

  /// Ditto
  enum uint DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// The number of I/O threads to use (must be at least one).
  size_t numIOThreads() @property {
    return numIOThreads_;
  }

  /// Ditto
  void numIOThreads(size_t value) @property {
    enforce(value >= 1, new TException("Must use at least one I/O thread."));
    numIOThreads_ = value;
  }

  /// Ditto
  enum DEFAULT_NUM_IO_THREADS = 1;

private:
  /**
   * C callback wrapper around acceptConnections(). Expects the custom argument
   * to be the this pointer of the associated server instance.
   */
  extern(C) static void acceptConnectionsCallback(int fd, short which,
    void* serverThis
  ) {
    (cast(TNonblockingServer)serverThis).acceptConnections(fd, which);
  }

  /**
   * Called by libevent (IO loop 0/serve() thread only) when something
   * happened on the listening socket.
   */
  void acceptConnections(int fd, short eventFlags) {
    if (atomicLoad(ioLoops_[0].shuttingDown_)) return;

    assert(!!listenSocket_,
      "Server should be shutting down if listen socket is null.");
    assert(fd == listenSocket_.handle);
    assert(eventFlags & EV_READ);

    // Accept as many new clients as possible, even though libevent signaled
    // only one. This helps reduce the number of calls into libevent space.
    while (true) {
      // It is lame to use exceptions for regular control flow (failing is
      // expected due to the non-blocking mode of operation), but that's the
      // interface std.socket offers…
      Socket clientSocket;
      try {
        clientSocket = listenSocket_.accept();
      } catch (SocketAcceptException e) {
        if (e.errorCode != WOULD_BLOCK_ERRNO) {
          logError("Error accepting connection: %s", e);
        }
        break;
      }

      // If the server is overloaded, this is the point to take the specified
      // action.
      if (overloadAction != TOverloadAction.NONE && checkOverloaded()) {
        nConnectionsDropped_++;
        nTotalConnectionsDropped_++;
        if (overloadAction == TOverloadAction.CLOSE_ON_ACCEPT) {
          clientSocket.close();
          return;
        }
      }

      try {
        clientSocket.blocking = false;
      } catch (SocketException e) {
        logError("Couldn't set client socket to non-blocking mode: %s", e);
        clientSocket.close();
        return;
      }

      // Create a new Connection for this client socket.
      Connection conn = void;
      IOLoop loop = void;
      bool thisThread = void;
      synchronized (connectionMutex_) {
        // Assign an I/O loop to the connection (round-robin).
        assert(nextIOLoop_ >= 0);
        assert(nextIOLoop_ < ioLoops_.length);
        auto selectedThreadIdx = nextIOLoop_;
        nextIOLoop_ = (nextIOLoop_ + 1) % ioLoops_.length;

        loop = ioLoops_[selectedThreadIdx];
        thisThread = (selectedThreadIdx == 0);

        // Check the connection stack to see if we can re-use an existing one.
        if (connectionStack_.empty) {
          ++numConnections_;
          conn = new Connection(clientSocket, loop);

          // Make sure the connection does not get collected while it is
          // active, i.e. hooked up with libevent.
          GC.addRoot(cast(void*)conn);
        } else {
          conn = connectionStack_[$ - 1];
          connectionStack_ = connectionStack_[0 .. $ - 1];
          connectionStack_.assumeSafeAppend();
          conn.init(clientSocket, loop);
        }
      }

      loop.addConnection();

      // Either notify the I/O thread that is assigned this connection to
      // start processing, or, if it is us, just ask this connection to do its
      // initial state change here.
      //
      // (We need to avoid writing to our own notification pipe, to
      // avoid possible deadlocks if the pipe is full.)
      if (thisThread) {
        conn.transition();
      } else {
        loop.notifyCompleted(conn);
      }
    }
  }

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    atomicOp!"+="(numActiveProcessors_, 1);
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    assert(numActiveProcessors_ > 0);
    atomicOp!"-="(numActiveProcessors_, 1);
  }

  /**
   * Determines if the server is currently overloaded.
   *
   * If the number of open connections or »processing« connections is over the
   * respective limit, the server will enter overload handling mode and a
   * warning will be logged. If both values drop back below the hysteresis
   * thresholds, the server exits the overloaded state again.
   *
   * Returns: Whether the server is currently overloaded.
   */
  bool checkOverloaded() {
    auto activeConnections = numConnections_ - connectionStack_.length;
    if (numActiveProcessors_ > maxActiveProcessors ||
      activeConnections > maxConnections) {
      if (!overloaded_) {
        logInfo("Entering overloaded state.");
        overloaded_ = true;
      }
    } else {
      if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors) &&
        (activeConnections <= overloadHysteresis_ * maxConnections))
      {
        logInfo("Exiting overloaded state, %s connection(s) dropped (%s total).",
          nConnectionsDropped_, nTotalConnectionsDropped_);
        nConnectionsDropped_ = 0;
        overloaded_ = false;
      }
    }

    return overloaded_;
  }

  /**
   * Marks a connection as inactive and either puts it back into the
   * connection pool or leaves it for garbage collection.
   */
  void disposeConnection(Connection connection) {
    synchronized (connectionMutex_) {
      if (!connectionStackLimit ||
        (connectionStack_.length < connectionStackLimit))
      {
        connection.checkIdleBufferLimit(idleReadBufferLimit,
          idleWriteBufferLimit);
        connectionStack_ ~= connection;
      } else {
        assert(numConnections_ > 0);
        --numConnections_;

        // Leave the connection object for collection now.
        GC.removeRoot(cast(void*)connection);
      }
    }
  }

  /// Socket used to listen for and accept connections.
  Socket listenSocket_;

  /// Port to listen on.
  ushort port_;

  /// Whether to listen on IPv6 only.
  bool ipv6Only_;

  /// The total number of connections existing, both active and idle.
  size_t numConnections_;

  /// The number of connections which are currently waiting for the processor
  /// to return.
  shared size_t numActiveProcessors_;

  /// Hysteresis for leaving overload state.
  double overloadHysteresis_;

  /// Whether the server is currently overloaded.
  bool overloaded_;

  /// Number of connections dropped since the server entered the current
  /// overloaded state.
  uint nConnectionsDropped_;

  /// Number of connections dropped due to overload since the server started.
  ulong nTotalConnectionsDropped_;

  /// The task pool used for processing requests.
  TaskPool taskPool_;

  /// Number of IO threads this server will use (>= 1).
  size_t numIOThreads_;

  /// The IOLoops among which socket handling work is distributed.
  IOLoop[] ioLoops_;

  /// The index of the loop in ioLoops_ which will handle the next accepted
  /// connection.
  size_t nextIOLoop_;

  /// All the connection objects which have been created but are not currently
  /// in use. When a connection is closed, it is placed here to enable object
  /// (resp. buffer) reuse.
  Connection[] connectionStack_;

  /// This mutex protects the connection stack.
  Mutex connectionMutex_;
}

private {
  /*
   * Encapsulates a libevent event loop.
   *
   * The design is a bit of a mess, since the first loop is actually run on the
   * server thread itself and is special because it is the only instance for
   * which listenSocket_ is not null.
   */
  final class IOLoop {
    /**
     * Creates a new instance and sets up the event base.
     *
     * If listenSocket is not null, the thread will also accept new
     * connections itself.
     */
    this(TNonblockingServer server, Socket listenSocket) {
      server_ = server;
      listenSocket_ = listenSocket;
      initMutex_ = new Mutex;
    }

    /**
     * Runs the event loop; only returns after a call to stop().
     */
    void run() {
      assert(!atomicLoad(initialized_), "IOLoop already running?!");

      synchronized (initMutex_) {
        if (atomicLoad(shuttingDown_)) return;
        atomicStore(initialized_, true);

        assert(!eventBase_);
        eventBase_ = event_base_new();

        if (listenSocket_) {
          // Log the libevent version and backend.
          logInfo("libevent version %s, using method %s.",
            to!string(event_get_version()),
            to!string(event_base_get_method(eventBase_)));

          // Register the event for the listening socket.
          listenEvent_ = event_new(eventBase_, listenSocket_.handle,
            EV_READ | EV_PERSIST | EV_ET,
            assumeNothrow(&TNonblockingServer.acceptConnectionsCallback),
            cast(void*)server_);
          if (event_add(listenEvent_, null) == -1) {
            throw new TException("event_add for the listening socket event failed.");
          }
        }

        auto pair = socketPair();
        foreach (s; pair) s.blocking = false;
        completionSendSocket_ = pair[0];
        completionReceiveSocket_ = pair[1];

        // Register an event for the task completion notification socket.
        completionEvent_ = event_new(eventBase_, completionReceiveSocket_.handle,
          EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&completedCallback),
          cast(void*)this);

        if (event_add(completionEvent_, null) == -1) {
          throw new TException("event_add for the notification socket failed.");
        }
      }

      // Run libevent engine, returns only after stop().
      event_base_dispatch(eventBase_);

      if (listenEvent_) {
        event_free(listenEvent_);
        listenEvent_ = null;
      }

      event_free(completionEvent_);
      completionEvent_ = null;

      completionSendSocket_.close();
      completionSendSocket_ = null;

      completionReceiveSocket_.close();
      completionReceiveSocket_ = null;

      event_base_free(eventBase_);
      eventBase_ = null;

      atomicStore(shuttingDown_, false);

      atomicStore(initialized_, false);
    }

    /**
     * Adds a new connection handled by this loop.
     */
    void addConnection() {
      ++numActiveConnections_;
    }

    /**
     * Disposes a connection object (typically after it has been closed).
     */
    void disposeConnection(Connection conn) {
      server_.disposeConnection(conn);
      assert(numActiveConnections_ > 0);
      --numActiveConnections_;
      if (numActiveConnections_ == 0) {
        if (atomicLoad(shuttingDown_)) {
          event_base_loopbreak(eventBase_);
        }
      }
    }

    /**
     * Notifies the event loop that the current step (initialization,
     * processing of a request) on a certain connection has been completed.
     *
     * This function is thread-safe, but should never be called from the
     * thread running the loop itself.
     */
    void notifyCompleted(Connection conn) {
      assert(!!completionSendSocket_);
      auto bytesSent = completionSendSocket_.send(cast(ubyte[])((&conn)[0 .. 1]));

      if (bytesSent != Connection.sizeof) {
        logError("Sending completion notification failed, connection will " ~
          "not be properly terminated.");
      }
    }

    /**
     * Exits the event loop after all currently active connections have been
     * closed.
     *
     * This function is thread-safe.
     */
    void stop() {
      // There is a bug in either libevent or its documentation: having no
      // events registered doesn't actually terminate the loop, because
      // event_base_new() registers some internal one by calling
      // evthread_make_base_notifiable().
      // Due to this, we can't simply remove all events and expect the event
      // loop to terminate. Instead, we ping the event loop using a null
      // completion message. This way, we make sure to wake up the libevent
      // thread if it is not currently processing any connections. It will
      // break out of the loop in disposeConnection() after the last active
      // connection has been closed.
      synchronized (initMutex_) {
        atomicStore(shuttingDown_, true);
        if (atomicLoad(initialized_)) notifyCompleted(null);
      }
    }

  private:
    /**
     * C callback to call completed() from libevent.
     *
     * Expects the custom argument to be the this pointer of the associated
     * IOLoop instance.
     */
    extern(C) static void completedCallback(int fd, short what, void* loopThis) {
      assert(what & EV_READ);
      auto loop = cast(IOLoop)loopThis;
      assert(fd == loop.completionReceiveSocket_.handle);
      loop.completed();
    }

    /**
     * Reads from the completion receive socket and appropriately transitions
     * the connections and shuts down the loop if requested.
     */
    void completed() {
      Connection connection;
      ptrdiff_t bytesRead;
      while (true) {
        bytesRead = completionReceiveSocket_.receive(
          cast(ubyte[])((&connection)[0 .. 1]));
        if (bytesRead < 0) {
          auto errno = getSocketErrno();

          if (errno != WOULD_BLOCK_ERRNO) {
            logError("Reading from completion socket failed, some connection " ~
              "will never be properly terminated: %s", socketErrnoString(errno));
          }
        }

        if (bytesRead != Connection.sizeof) break;

        if (!connection) {
          assert(atomicLoad(shuttingDown_));
          if (numActiveConnections_ == 0) {
            event_base_loopbreak(eventBase_);
          }
          continue;
        }

        connection.transition();
      }

      if (bytesRead > 0) {
        logError("Unexpected partial read from completion socket " ~
          "(%s bytes instead of %s).", bytesRead, Connection.sizeof);
      }
    }

    /// The server this loop belongs to.
    TNonblockingServer server_;

    /// The managed listening socket, if any.
    Socket listenSocket_;

    /// The libevent event base for the loop.
    event_base* eventBase_;

    /// Triggered on listen socket events.
    event* listenEvent_;

    /// Triggered on completion receive socket events.
    event* completionEvent_;

    /// Socket used to send completion notification messages. Paired with
    /// completionReceiveSocket_.
    Socket completionSendSocket_;

    /// Socket used to receive completion notification messages. Paired with
    /// completionSendSocket_.
    Socket completionReceiveSocket_;

    /// Whether the server is currently shutting down (i.e. the cancellation has
    /// been triggered, but not all client connections have been closed yet).
    shared bool shuttingDown_;

    /// The number of currently active client connections.
    size_t numActiveConnections_;

    /// Guards loop startup so that the loop can be reliably shut down even if
    /// another thread has just started to execute run(). Locked during
    /// initialization in run(). When unlocked, the completion mechanism is
    /// expected to be fully set up.
    Mutex initMutex_;
    shared bool initialized_; /// Ditto
  }

  /*
   * I/O states a socket can be in.
   */
  enum SocketState {
    RECV_FRAME_SIZE, /// The frame size is being received.
    RECV, /// The payload is being received.
    SEND /// The response is written back out.
  }

  /*
   * States a connection can be in.
   */
  enum ConnectionState {
    INIT, /// The connection will be initialized.
    READ_FRAME_SIZE, /// The four frame size bytes are being read.
    READ_REQUEST, /// The request payload itself is being read.
    WAIT_PROCESSOR, /// The connection waits for the processor to finish.
    SEND_RESULT /// The result is written back out.
  }

  /*
   * A connection that is handled via libevent.
   *
   * Data received is buffered until the request is complete (returning back to
   * libevent if not), at which point the processor is invoked.
   */
  final class Connection {
    /**
     * Constructs a new instance.
     *
     * To reuse a connection object later on, the init() function can be used
     * to reset the internal state to the same effect.
     */
    this(Socket socket, IOLoop loop) {
      // The input and output transport objects are reused between client
      // connections, so initialize them here rather than in init().
      inputTransport_ = new TInputRangeTransport!(ubyte[])([]);
      outputTransport_ = new TMemoryBuffer(loop.server_.writeBufferDefaultSize);

      init(socket, loop);
    }

    /**
     * Initializes the connection.
     *
     * Params:
     *   socket = The socket to work on.
     *   loop = The I/O loop this connection is assigned to.
     */
    void init(Socket socket, IOLoop loop) {
      // TODO: This allocation could be avoided.
      socket_ = new TSocket(socket);

      loop_ = loop;
      server_ = loop_.server_;
      connState_ = ConnectionState.INIT;
      eventFlags_ = 0;

      readBufferPos_ = 0;
      readWant_ = 0;

      writeBuffer_ = null;
      writeBufferPos_ = 0;
      largestWriteBufferSize_ = 0;

      socketState_ = SocketState.RECV_FRAME_SIZE;
      callsSinceResize_ = 0;

      factoryInputTransport_ =
        server_.inputTransportFactory_.getTransport(inputTransport_);
      factoryOutputTransport_ =
        server_.outputTransportFactory_.getTransport(outputTransport_);

      inputProtocol_ =
        server_.inputProtocolFactory_.getProtocol(factoryInputTransport_);
      outputProtocol_ =
        server_.outputProtocolFactory_.getProtocol(factoryOutputTransport_);

      if (server_.eventHandler) {
        connectionContext_ =
          server_.eventHandler.createContext(inputProtocol_, outputProtocol_);
      }

      auto info = TConnectionInfo(inputProtocol_, outputProtocol_, socket_);
      processor_ = server_.processorFactory_.getProcessor(info);
    }

    ~this() {
      free(readBuffer_);
      if (event_) {
        event_free(event_);
        event_ = null;
      }
    }

    /**
     * Check buffers against the size limits and shrink them if exceeded.
     *
     * Params:
     *   readLimit = Read buffer size limit (in bytes, 0 to ignore).
     *   writeLimit = Write buffer size limit (in bytes, 0 to ignore).
     */
    void checkIdleBufferLimit(size_t readLimit, size_t writeLimit) {
      if (readLimit > 0 && readBufferSize_ > readLimit) {
        free(readBuffer_);
        readBuffer_ = null;
        readBufferSize_ = 0;
      }

      if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
        // just start over
        outputTransport_.reset(server_.writeBufferDefaultSize);
        largestWriteBufferSize_ = 0;
      }
    }

    /**
     * Transitions the connection to the next state.
     *
     * This is called e.g. when the request has been read completely or all
     * the data has been written back.
     */
    void transition() {
      assert(!!loop_);
      assert(!!server_);

      // Switch upon the state that we are currently in and move to a new state
      final switch (connState_) {
      case ConnectionState.READ_REQUEST:
        // We are done reading the request, package the read buffer into
        // transport and get back some data from the dispatch function.
        inputTransport_.reset(readBuffer_[0 .. readBufferPos_]);
        outputTransport_.reset();

        // Prepend four bytes of blank space to the buffer so we can
        // write the frame size there later.
        // Strictly speaking, we wouldn't have to write anything, just
        // increment the TMemoryBuffer writeOffset_. This would yield a tiny
        // performance gain.
        ubyte[4] space = void;
        outputTransport_.write(space);

        server_.incrementActiveProcessors();

        taskPool_ = server_.taskPool;
        if (taskPool_) {
          // Create a new task and add it to the task pool queue.
          auto processingTask = task!processRequest(this);
          connState_ = ConnectionState.WAIT_PROCESSOR;
          taskPool_.put(processingTask);

          // We don't want to process any more data while the task is active.
          unregisterEvent();
          return;
        }

        // Just process it right now if there is no task pool set.
        processRequest(this);
        goto case;
      case ConnectionState.WAIT_PROCESSOR:
        // We have now finished processing the request, set the frame size
        // for the outputTransport_ contents and set everything up to write
        // it out via libevent.
        server_.decrementActiveProcessors();

        // Acquire the data written to the transport.
        // KLUDGE: To avoid copying, we simply cast the const away and
        // modify the internal buffer of the TMemoryBuffer – works with the
        // current implementation, but isn't exactly beautiful.
        writeBuffer_ = cast(ubyte[])outputTransport_.getContents();

        assert(writeBuffer_.length >= 4, "The write buffer should have at " ~
          "least the initially added dummy length bytes.");
        if (writeBuffer_.length == 4) {
          // The request was one-way, no response to write.
          goto case ConnectionState.INIT;
        }

        // Write the frame size into the four bytes reserved for it.
        auto size = hostToNet(cast(uint)(writeBuffer_.length - 4));
        writeBuffer_[0 .. 4] = cast(ubyte[])((&size)[0 .. 1]);

        writeBufferPos_ = 0;
        socketState_ = SocketState.SEND;
        connState_ = ConnectionState.SEND_RESULT;
        registerEvent(EV_WRITE | EV_PERSIST);

        return;
      case ConnectionState.SEND_RESULT:
        // The result has been sent back to the client, we don't need the
        // buffers anymore.
        if (writeBuffer_.length > largestWriteBufferSize_) {
          largestWriteBufferSize_ = writeBuffer_.length;
        }

        if (server_.resizeBufferEveryN > 0 &&
          ++callsSinceResize_ >= server_.resizeBufferEveryN
        ) {
          checkIdleBufferLimit(server_.idleReadBufferLimit,
            server_.idleWriteBufferLimit);
          callsSinceResize_ = 0;
        }

        goto case;
      case ConnectionState.INIT:
        writeBuffer_ = null;
        writeBufferPos_ = 0;
        socketState_ = SocketState.RECV_FRAME_SIZE;
        connState_ = ConnectionState.READ_FRAME_SIZE;
        readBufferPos_ = 0;
        registerEvent(EV_READ | EV_PERSIST);

        return;
      case ConnectionState.READ_FRAME_SIZE:
        // We just read the request length, set up the buffers for reading
        // the payload.
        if (readWant_ > readBufferSize_) {
          // The current buffer is too small, exponentially grow the buffer
          // until it is big enough.

          if (readBufferSize_ == 0) {
            readBufferSize_ = 1;
          }

          auto newSize = readBufferSize_;
          while (readWant_ > newSize) {
            newSize *= 2;
          }

          auto newBuffer = cast(ubyte*)realloc(readBuffer_, newSize);
          if (!newBuffer) onOutOfMemoryError();

          readBuffer_ = newBuffer;
          readBufferSize_ = newSize;
        }

        readBufferPos_ = 0;

        socketState_ = SocketState.RECV;
        connState_ = ConnectionState.READ_REQUEST;

        return;
      }
    }

  private:
    /**
     * C callback to call workSocket() from libevent.
     *
     * Expects the custom argument to be the this pointer of the associated
     * connection.
     */
    extern(C) static void workSocketCallback(int fd, short flags, void* connThis) {
      auto conn = cast(Connection)connThis;
      assert(fd == conn.socket_.socketHandle);
      conn.workSocket();
    }

    /**
     * Invoked by libevent when something happens on the socket.
     */
    void workSocket() {
      final switch (socketState_) {
      case SocketState.RECV_FRAME_SIZE:
        // If some bytes have already been read, they have been kept in
        // readWant_.
        auto frameSize = readWant_;

        try {
          // Read from the socket.
          auto bytesRead = socket_.read(
            (cast(ubyte[])((&frameSize)[0 .. 1]))[readBufferPos_ .. $]);
          if (bytesRead == 0) {
            // Couldn't read anything, but we have been notified – client
            // has disconnected.
            close();
            return;
          }

          readBufferPos_ += bytesRead;
        } catch (TTransportException te) {
          logError("Failed to read frame size from client connection: %s", te);
          close();
          return;
        }

        if (readBufferPos_ < frameSize.sizeof) {
          // Frame size not complete yet, save the current buffer in
          // readWant_ so that the remaining bytes can be read later.
          readWant_ = frameSize;
          return;
        }

        auto size = netToHost(frameSize);
        if (size > server_.maxFrameSize) {
          logError("Frame size too large (%s > %s), client %s not using " ~
            "TFramedTransport?", size, server_.maxFrameSize,
            socket_.getPeerAddress().toHostNameString());
          close();
          return;
        }
        readWant_ = size;

        // Now we know the frame size, set everything up for reading the
        // payload.
        transition();
        return;

      case SocketState.RECV:
        // If we already got all the data, we should be in the SEND state.
        assert(readBufferPos_ < readWant_);

        size_t bytesRead;
        try {
          // Read as much as possible from the socket.
          bytesRead = socket_.read(readBuffer_[readBufferPos_ .. readWant_]);
        } catch (TTransportException te) {
          logError("Failed to read from client socket: %s", te);
          close();
          return;
        }

        if (bytesRead == 0) {
          // We were notified, but no bytes could be read -> the client
          // disconnected.
          close();
          return;
        }

        readBufferPos_ += bytesRead;
        assert(readBufferPos_ <= readWant_);

        if (readBufferPos_ == readWant_) {
          // The payload has been read completely, move on.
          transition();
        }

        return;
      case SocketState.SEND:
        assert(writeBufferPos_ <= writeBuffer_.length);

        if (writeBufferPos_ == writeBuffer_.length) {
          // Nothing left to send – this shouldn't happen, just move on.
          logInfo("WARNING: In send state, but no data to send.\n");
          transition();
          return;
        }

        size_t bytesSent;
        try {
          bytesSent = socket_.writeSome(writeBuffer_[writeBufferPos_ .. $]);
        } catch (TTransportException te) {
          logError("Failed to write to client socket: %s", te);
          close();
          return;
        }

        writeBufferPos_ += bytesSent;
        assert(writeBufferPos_ <= writeBuffer_.length);

        if (writeBufferPos_ == writeBuffer_.length) {
          // The whole response has been written out, we are done.
          transition();
        }

        return;
      }
    }

    /**
     * Registers a libevent event for workSocket() with the passed flags,
     * unregistering the previous one (if any).
     */
    void registerEvent(short eventFlags) {
      if (eventFlags_ == eventFlags) {
        // Nothing to do if flags are the same.
        return;
      }

      // Delete the previously existing event.
      unregisterEvent();

      eventFlags_ = eventFlags;

      if (eventFlags == 0) return;

      if (!event_) {
        // If the event was not already allocated, do it now.
        event_ = event_new(loop_.eventBase_, socket_.socketHandle,
          eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
      } else {
        event_assign(event_, loop_.eventBase_, socket_.socketHandle,
          eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
      }

      // Add the event.
      if (event_add(event_, null) == -1) {
        logError("event_add() for client socket failed.");
      }
    }

    /**
     * Unregisters the current libevent event, if any.
     */
    void unregisterEvent() {
      if (event_ && eventFlags_ != 0) {
        eventFlags_ = 0;
        if (event_del(event_) == -1) {
          logError("event_del() for client socket failed.");
          return;
        }
      }
    }

    /**
     * Closes this connection and returns it back to the server.
     */
    void close() {
      unregisterEvent();

      if (server_.eventHandler) {
        server_.eventHandler.deleteContext(
          connectionContext_, inputProtocol_, outputProtocol_);
      }

      // Close the socket.
      socket_.close();

      // Close any factory-produced transports.
      factoryInputTransport_.close();
      factoryOutputTransport_.close();

      // This connection object can now be reused.
      loop_.disposeConnection(this);
    }

    /// The server this connection belongs to.
    TNonblockingServer server_;

    /// The task pool used for this connection. This is cached instead of
    /// directly using server_.taskPool to avoid confusion if it is changed in
    /// another thread while the request is processed.
    TaskPool taskPool_;

    /// The I/O thread handling this connection.
    IOLoop loop_;

    /// The socket managed by this connection.
    TSocket socket_;

    /// The libevent object used for registering the workSocketCallback.
    event* event_;

    /// The libevent flags currently registered for the socket event.
    short eventFlags_;

    /// The current socket I/O state.
    SocketState socketState_;

    /// The current application-level connection state.
    ConnectionState connState_;

    /// The size of the frame to read. If still in READ_FRAME_SIZE state, some
    /// of the bytes might not have been received yet, and the value might
    /// still be in network byte order. An uint (not a size_t) because the
    /// frame size on the wire is specified as one.
    uint readWant_;

    /// The position in the read buffer, i.e. the number of payload bytes
    /// already received from the socket in READ_REQUEST state, resp. the
    /// number of size bytes in READ_FRAME_SIZE state.
    uint readBufferPos_;

    /// Read buffer.
    ubyte* readBuffer_;

    /// Read buffer size.
    size_t readBufferSize_;

    /// Write buffer.
    ubyte[] writeBuffer_;

    /// How far through writing the buffer we currently are.
    size_t writeBufferPos_;

    /// Largest size of the write buffer seen since the buffer was constructed.
    size_t largestWriteBufferSize_;

    /// Number of calls since the last time checkIdleBufferLimit has been
    /// invoked (see TNonblockingServer.resizeBufferEveryN).
    uint callsSinceResize_;

    /// Base transports the processor reads from/writes to.
    TInputRangeTransport!(ubyte[]) inputTransport_;
    TMemoryBuffer outputTransport_; /// Ditto

    /// The actual transports passed to the processor obtained via the
    /// transport factory.
    TTransport factoryInputTransport_;
    TTransport factoryOutputTransport_; /// Ditto

    /// Input/output protocols, connected to factory{Input, Output}Transport.
    TProtocol inputProtocol_;
    TProtocol outputProtocol_; /// Ditto

    /// Connection context optionally created by the server event handler.
    Variant connectionContext_;

    /// The processor used for this connection.
    TProcessor processor_;
  }
}

/*
 * The request processing function, which invokes the processor for the server
 * for all the RPC messages received over a connection.
 *
 * Must be public because it is passed as alias to std.parallelism.task().
 */
void processRequest(Connection connection) {
  try {
    while (true) {
      with (connection) {
        if (server_.eventHandler) {
          server_.eventHandler.preProcess(connectionContext_, socket_);
        }

        if (!processor_.process(inputProtocol_, outputProtocol_,
          connectionContext_) || !inputProtocol_.transport.peek()
        ) {
          // Something went fundamentally wrong or there is nothing more to
          // process, close the connection.
          break;
        }
      }
    }
  } catch (TTransportException ttx) {
    logError("Client died: %s", ttx);
  } catch (Exception e) {
    logError("Uncaught exception: %s", e);
  }

  if (connection.taskPool_) connection.loop_.notifyCompleted(connection);
}

unittest {
  import thrift.internal.test.server;

  // Temporarily disable info log output in order not to spam the test results
  // with startup info messages.
  auto oldInfoLogSink = g_infoLogSink;
  g_infoLogSink = null;
  scope (exit) g_infoLogSink = oldInfoLogSink;

  // Test in-line processing shutdown with one as well as several I/O threads.
  testServeCancel!(TNonblockingServer)();
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.numIOThreads = 4;
  });

  // Test task pool processing shutdown with one as well as several I/O threads.
  auto tp = new TaskPool(4);
  tp.isDaemon = true;
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.taskPool = tp;
  });
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.taskPool = tp;
    s.numIOThreads = 4;
  });
}