blob: 9f813ed055e3121e6dc14daf3c26e59f48756644 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/Thrift.h>
cyy316723a2019-01-05 16:35:14 +080024#include <memory>
Roger Meier49ff8b12012-04-13 09:12:31 +000025#include <thrift/server/TServer.h>
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040026#include <thrift/transport/PlatformSocket.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000027#include <thrift/transport/TBufferTransports.h>
28#include <thrift/transport/TSocket.h>
Divya Thaluru808d1432017-08-06 16:36:36 -070029#include <thrift/transport/TNonblockingServerTransport.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000030#include <thrift/concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000031#include <climits>
Roger Meier49ff8b12012-04-13 09:12:31 +000032#include <thrift/concurrency/Thread.h>
cyyca8af9b2019-01-11 22:13:12 +080033#include <thrift/concurrency/ThreadFactory.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000034#include <thrift/concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000035#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000036#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000037#include <string>
David Reissd7a16f42008-02-19 22:47:29 +000038#include <cstdlib>
uv7471252cf32024-05-11 11:14:25 +080039#include <unordered_set>
Bryan Duxbury266b1732011-09-01 16:50:28 +000040#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000041#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000042#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000043#include <event.h>
Ben Craig7207c222015-07-06 08:40:35 -050044#include <event2/event_compat.h>
45#include <event2/event_struct.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000046
Konrad Grochowski16a23a62014-11-13 15:33:38 +010047namespace apache {
48namespace thrift {
49namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000050
T Jake Lucianib5e62212009-01-31 22:36:20 +000051using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000052using apache::thrift::transport::TSocket;
Divya Thaluru808d1432017-08-06 16:36:36 -070053using apache::thrift::transport::TNonblockingServerTransport;
T Jake Lucianib5e62212009-01-31 22:36:20 +000054using apache::thrift::protocol::TProtocol;
55using apache::thrift::concurrency::Runnable;
56using apache::thrift::concurrency::ThreadManager;
cyyca8af9b2019-01-11 22:13:12 +080057using apache::thrift::concurrency::ThreadFactory;
Jake Farrellb0d95602011-12-06 01:17:26 +000058using apache::thrift::concurrency::Thread;
59using apache::thrift::concurrency::Mutex;
60using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000061
Roger Meier30aae0c2011-07-08 12:23:31 +000062#ifdef LIBEVENT_VERSION_NUMBER
63#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
64#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
65#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
66#else
67// assume latest version 1 series
68#define LIBEVENT_VERSION_MAJOR 1
69#define LIBEVENT_VERSION_MINOR 14
70#define LIBEVENT_VERSION_REL 13
Konrad Grochowski16a23a62014-11-13 15:33:38 +010071#define LIBEVENT_VERSION_NUMBER \
72 ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
Roger Meier30aae0c2011-07-08 12:23:31 +000073#endif
74
75#if LIBEVENT_VERSION_NUMBER < 0x02000000
Konrad Grochowski16a23a62014-11-13 15:33:38 +010076typedef THRIFT_SOCKET evutil_socket_t;
Roger Meier30aae0c2011-07-08 12:23:31 +000077#endif
78
79#ifndef SOCKOPT_CAST_T
Konrad Grochowski16a23a62014-11-13 15:33:38 +010080#ifndef _WIN32
81#define SOCKOPT_CAST_T void
82#else
83#define SOCKOPT_CAST_T char
84#endif // _WIN32
Roger Meier30aae0c2011-07-08 12:23:31 +000085#endif
86
Konrad Grochowski16a23a62014-11-13 15:33:38 +010087template <class T>
Roger Meier30aae0c2011-07-08 12:23:31 +000088inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
89 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
90}
91
Konrad Grochowski16a23a62014-11-13 15:33:38 +010092template <class T>
Roger Meier30aae0c2011-07-08 12:23:31 +000093inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
94 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
95}
96
Mark Slee2f6404d2006-10-10 01:37:40 +000097/**
Jake Farrellb0d95602011-12-06 01:17:26 +000098 * This is a non-blocking server in C++ for high performance that
99 * operates a set of IO threads (by default only one). It assumes that
100 * all incoming requests are framed with a 4 byte length indicator and
101 * writes out responses using the same framing.
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 */
David Reiss01fe1532010-03-09 05:19:25 +0000103
David Reiss01fe1532010-03-09 05:19:25 +0000104/// Overload condition actions.
105enum TOverloadAction {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100106 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
107 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
108 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
David Reiss01fe1532010-03-09 05:19:25 +0000109};
110
Jake Farrellb0d95602011-12-06 01:17:26 +0000111class TNonblockingIOThread;
112
Mark Slee2f6404d2006-10-10 01:37:40 +0000113class TNonblockingServer : public TServer {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100114private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000115 class TConnection;
116
Jake Farrellb0d95602011-12-06 01:17:26 +0000117 friend class TNonblockingIOThread;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100118
119private:
David Reiss01fe1532010-03-09 05:19:25 +0000120 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 static const int LISTEN_BACKLOG = 1024;
122
David Reiss01fe1532010-03-09 05:19:25 +0000123 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000124 static const size_t CONNECTION_STACK_LIMIT = 1024;
125
Roger Meier3781c242011-12-11 20:07:21 +0000126 /// Default limit on frame size
127 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
128
David Reiss01fe1532010-03-09 05:19:25 +0000129 /// Default limit on total number of connected sockets
130 static const int MAX_CONNECTIONS = INT_MAX;
131
132 /// Default limit on connections in handler/task processing
133 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
134
David Reiss89a12942010-10-06 17:10:52 +0000135 /// Default size of write buffer
136 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
137
David Reiss54bec5d2010-10-06 17:10:45 +0000138 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
139 static const int IDLE_READ_BUFFER_LIMIT = 1024;
140
141 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
142 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
143
144 /// # of calls before resizing oversized buffers (0 = check only on close)
145 static const int RESIZE_BUFFER_EVERY_N = 512;
146
Jake Farrellb0d95602011-12-06 01:17:26 +0000147 /// # of IO threads to use by default
148 static const int DEFAULT_IO_THREADS = 1;
149
Jake Farrellb0d95602011-12-06 01:17:26 +0000150 /// # of IO threads this server will use
151 size_t numIOThreads_;
152
153 /// Whether to set high scheduling priority for IO threads
154 bool useHighPriorityIOThreads_;
155
David Reiss01fe1532010-03-09 05:19:25 +0000156 /// Server socket file descriptor
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400157 THRIFT_SOCKET serverSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000158
Roger Meier6f2a5032013-07-08 23:35:25 +0200159 /// The optional user-provided event-base (for single-thread servers)
160 event_base* userEventBase_;
161
zeshuai00726681fb2020-06-03 17:24:38 +0800162 /// For processing via thread pool, may be nullptr
cyy316723a2019-01-05 16:35:14 +0800163 std::shared_ptr<ThreadManager> threadManager_;
Mark Sleee02385b2007-06-09 01:21:16 +0000164
David Reiss01fe1532010-03-09 05:19:25 +0000165 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000166 bool threadPoolProcessing_;
167
Jake Farrellb0d95602011-12-06 01:17:26 +0000168 // Factory to create the IO threads
cyyca8af9b2019-01-11 22:13:12 +0800169 std::shared_ptr<ThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000170
Jake Farrellb0d95602011-12-06 01:17:26 +0000171 // Vector of IOThread objects that will handle our IO
cyy316723a2019-01-05 16:35:14 +0800172 std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000173
Jake Farrellb0d95602011-12-06 01:17:26 +0000174 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000175 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000176
177 // Synchronizes access to connection stack and similar data
178 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000179
180 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000181 size_t numTConnections_;
182
David Reiss9e8073c2010-03-09 05:19:39 +0000183 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000184 size_t numActiveProcessors_;
185
186 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000187 size_t connectionStackLimit_;
188
David Reiss01fe1532010-03-09 05:19:25 +0000189 /// Limit for number of connections processing or waiting to process
190 size_t maxActiveProcessors_;
191
192 /// Limit for number of open connections
193 size_t maxConnections_;
194
Roger Meier3781c242011-12-11 20:07:21 +0000195 /// Limit for frame size
196 size_t maxFrameSize_;
197
David Reiss068f4162010-03-09 05:19:45 +0000198 /// Time in milliseconds before an unperformed task expires (0 == infinite).
199 int64_t taskExpireTime_;
200
David Reiss01fe1532010-03-09 05:19:25 +0000201 /**
202 * Hysteresis for overload state. This is the fraction of the overload
203 * value that needs to be reached before the overload state is cleared;
204 * must be <= 1.0.
205 */
206 double overloadHysteresis_;
207
208 /// Action to take when we're overloaded.
209 TOverloadAction overloadAction_;
210
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000211 /**
David Reiss89a12942010-10-06 17:10:52 +0000212 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
213 * and found to be exceeded, reinitialized) to this size.
214 */
215 size_t writeBufferDefaultSize_;
216
217 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000218 * Max read buffer size for an idle TConnection. When we place an idle
219 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000220 * we will free the buffer (such that it will be reinitialized by the next
221 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000222 */
David Reiss54bec5d2010-10-06 17:10:45 +0000223 size_t idleReadBufferLimit_;
224
225 /**
226 * Max write buffer size for an idle connection. When we place an idle
227 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
228 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000229 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
230 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000231 */
232 size_t idleWriteBufferLimit_;
233
234 /**
235 * Every N calls we check the buffer size limits on a connected TConnection.
236 * 0 disables (i.e. the checks are only done when a connection closes).
237 */
238 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000239
240 /// Set if we are currently in an overloaded state.
241 bool overloaded_;
242
243 /// Count of connections dropped since overload started
244 uint32_t nConnectionsDropped_;
245
246 /// Count of connections dropped on overload since server started
247 uint64_t nTotalConnectionsDropped_;
248
Mark Slee2f6404d2006-10-10 01:37:40 +0000249 /**
250 * This is a stack of all the objects that have been created but that
251 * are NOT currently in use. When we close a connection, we place it on this
252 * stack so that the object can be reused later, rather than freeing the
253 * memory and reallocating a new object later.
254 */
255 std::stack<TConnection*> connectionStack_;
256
David Reiss01fe1532010-03-09 05:19:25 +0000257 /**
Roger Meier0c04fcc2013-03-22 19:52:08 +0100258 * This container holds pointers to all active connections. This container
259 * allows the server to clean up unlcosed connection objects at destruction,
260 * which in turn allows their transports, protocols, processors and handlers
261 * to deallocate and clean up correctly.
262 */
uv7471252cf32024-05-11 11:14:25 +0800263 std::unordered_set<TConnection*> activeConnections_;
Roger Meier0c04fcc2013-03-22 19:52:08 +0100264
Divya Thaluru808d1432017-08-06 16:36:36 -0700265 /*
266 */
cyy316723a2019-01-05 16:35:14 +0800267 std::shared_ptr<TNonblockingServerTransport> serverTransport_;
Divya Thaluru808d1432017-08-06 16:36:36 -0700268
Roger Meier0c04fcc2013-03-22 19:52:08 +0100269 /**
David Reiss01fe1532010-03-09 05:19:25 +0000270 * Called when server socket had something happen. We accept all waiting
271 * client connections on listen socket fd and assign TConnection objects
272 * to handle those requests.
273 *
David Reiss01fe1532010-03-09 05:19:25 +0000274 * @param which the event flag that triggered the handler.
275 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400276 void handleEvent(THRIFT_SOCKET fd, short which);
Mark Slee2f6404d2006-10-10 01:37:40 +0000277
Divya Thaluru808d1432017-08-06 16:36:36 -0700278 void init() {
Roger Meier0be9ffa2013-07-19 21:10:01 +0200279 serverSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +0000280 numIOThreads_ = DEFAULT_IO_THREADS;
281 nextIOThread_ = 0;
282 useHighPriorityIOThreads_ = false;
Sebastian Zenker042580f2019-01-29 15:48:12 +0100283 userEventBase_ = nullptr;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000284 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000285 numTConnections_ = 0;
286 numActiveProcessors_ = 0;
287 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
288 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
289 maxConnections_ = MAX_CONNECTIONS;
Roger Meier3781c242011-12-11 20:07:21 +0000290 maxFrameSize_ = MAX_FRAME_SIZE;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000291 taskExpireTime_ = 0;
292 overloadHysteresis_ = 0.8;
293 overloadAction_ = T_OVERLOAD_NO_ACTION;
294 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
295 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
296 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
297 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
298 overloaded_ = false;
299 nConnectionsDropped_ = 0;
300 nTotalConnectionsDropped_ = 0;
301 }
Mark Sleef9373392007-01-24 19:41:57 +0000302
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100303public:
cyy316723a2019-01-05 16:35:14 +0800304 TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
305 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
Divya Thaluru808d1432017-08-06 16:36:36 -0700306 : TServer(processorFactory), serverTransport_(serverTransport) {
307 init();
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000308 }
309
cyy316723a2019-01-05 16:35:14 +0800310 TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
311 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
Divya Thaluru808d1432017-08-06 16:36:36 -0700312 : TServer(processor), serverTransport_(serverTransport) {
313 init();
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000314 }
315
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000316
cyy316723a2019-01-05 16:35:14 +0800317 TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
318 const std::shared_ptr<TProtocolFactory>& protocolFactory,
319 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
320 const std::shared_ptr<ThreadManager>& threadManager
321 = std::shared_ptr<ThreadManager>())
James E. King, III82ae9572017-08-05 12:23:54 -0400322 : TServer(processorFactory), serverTransport_(serverTransport) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700323 init();
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000324
325 setInputProtocolFactory(protocolFactory);
326 setOutputProtocolFactory(protocolFactory);
327 setThreadManager(threadManager);
328 }
329
cyy316723a2019-01-05 16:35:14 +0800330 TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
331 const std::shared_ptr<TProtocolFactory>& protocolFactory,
332 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
333 const std::shared_ptr<ThreadManager>& threadManager
334 = std::shared_ptr<ThreadManager>())
Divya Thaluru808d1432017-08-06 16:36:36 -0700335 : TServer(processor), serverTransport_(serverTransport) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700336 init();
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000337
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000338 setInputProtocolFactory(protocolFactory);
339 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000340 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000341 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000342
cyy316723a2019-01-05 16:35:14 +0800343 TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
344 const std::shared_ptr<TTransportFactory>& inputTransportFactory,
345 const std::shared_ptr<TTransportFactory>& outputTransportFactory,
346 const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
347 const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
348 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
349 const std::shared_ptr<ThreadManager>& threadManager
350 = std::shared_ptr<ThreadManager>())
Divya Thaluru808d1432017-08-06 16:36:36 -0700351 : TServer(processorFactory), serverTransport_(serverTransport) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700352 init();
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000353
354 setInputTransportFactory(inputTransportFactory);
355 setOutputTransportFactory(outputTransportFactory);
356 setInputProtocolFactory(inputProtocolFactory);
357 setOutputProtocolFactory(outputProtocolFactory);
358 setThreadManager(threadManager);
359 }
360
cyy316723a2019-01-05 16:35:14 +0800361 TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
362 const std::shared_ptr<TTransportFactory>& inputTransportFactory,
363 const std::shared_ptr<TTransportFactory>& outputTransportFactory,
364 const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
365 const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
366 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
367 const std::shared_ptr<ThreadManager>& threadManager
368 = std::shared_ptr<ThreadManager>())
Divya Thaluru808d1432017-08-06 16:36:36 -0700369 : TServer(processor), serverTransport_(serverTransport) {
Divya Thaluru808d1432017-08-06 16:36:36 -0700370 init();
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000371
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000372 setInputTransportFactory(inputTransportFactory);
373 setOutputTransportFactory(outputTransportFactory);
374 setInputProtocolFactory(inputProtocolFactory);
375 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000376 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000377 }
Mark Slee79b16942007-11-26 19:05:29 +0000378
Sebastian Zenker042580f2019-01-29 15:48:12 +0100379 ~TNonblockingServer() override;
Mark Slee2f6404d2006-10-10 01:37:40 +0000380
cyy316723a2019-01-05 16:35:14 +0800381 void setThreadManager(std::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000382
Divya Thaluru808d1432017-08-06 16:36:36 -0700383 int getListenPort() { return serverTransport_->getListenPort(); }
Nobuaki Sukegawad0d7a652014-12-07 21:36:51 +0900384
cyy316723a2019-01-05 16:35:14 +0800385 std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }
David Reiss1997f102008-04-29 00:29:41 +0000386
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000387 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000388 * Sets the number of IO threads used by this server. Can only be used before
cyyca8af9b2019-01-11 22:13:12 +0800389 * the call to serve() and has no effect afterwards.
Jake Farrellb0d95602011-12-06 01:17:26 +0000390 */
Nobuaki Sukegawa8016af82015-01-02 23:14:22 +0900391 void setNumIOThreads(size_t numThreads) {
392 numIOThreads_ = numThreads;
393 // User-provided event-base doesn't works for multi-threaded servers
394 assert(numIOThreads_ <= 1 || !userEventBase_);
395 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000396
397 /** Return whether the IO threads will get high scheduling priority */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100398 bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000399
400 /** Set whether the IO threads will get high scheduling priority. */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100401 void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000402
403 /** Return the number of IO threads used by this server. */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100404 size_t getNumIOThreads() const { return numIOThreads_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000405
406 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000407 * Get the maximum number of unused TConnection we will hold in reserve.
408 *
409 * @return the current limit on TConnection pool size.
410 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100411 size_t getConnectionStackLimit() const { return connectionStackLimit_; }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000412
413 /**
414 * Set the maximum number of unused TConnection we will hold in reserve.
415 *
416 * @param sz the new limit for TConnection pool size.
417 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100418 void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000419
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100420 bool isThreadPoolProcessing() const { return threadPoolProcessing_; }
Mark Sleee02385b2007-06-09 01:21:16 +0000421
cyy316723a2019-01-05 16:35:14 +0800422 void addTask(std::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000423 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000424 }
425
David Reiss01fe1532010-03-09 05:19:25 +0000426 /**
427 * Return the count of sockets currently connected to.
428 *
429 * @return count of connected sockets.
430 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100431 size_t getNumConnections() const { return numTConnections_; }
David Reiss1997f102008-04-29 00:29:41 +0000432
David Reiss01fe1532010-03-09 05:19:25 +0000433 /**
Roger Meierec8027f2012-04-11 21:43:25 +0000434 * Return the count of sockets currently connected to.
435 *
436 * @return count of connected sockets.
437 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100438 size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }
Roger Meierec8027f2012-04-11 21:43:25 +0000439
440 /**
David Reiss01fe1532010-03-09 05:19:25 +0000441 * Return the count of connection objects allocated but not in use.
442 *
443 * @return count of idle connection objects.
444 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100445 size_t getNumIdleConnections() const { return connectionStack_.size(); }
David Reiss1997f102008-04-29 00:29:41 +0000446
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000447 /**
David Reiss01fe1532010-03-09 05:19:25 +0000448 * Return count of number of connections which are currently processing.
449 * This is defined as a connection where all data has been received and
450 * either assigned a task (when threading) or passed to a handler (when
451 * not threading), and where the handler has not yet returned.
452 *
453 * @return # of connections currently processing.
454 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100455 size_t getNumActiveProcessors() const { return numActiveProcessors_; }
David Reiss01fe1532010-03-09 05:19:25 +0000456
457 /// Increment the count of connections currently processing.
458 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000459 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000460 ++numActiveProcessors_;
461 }
462
463 /// Decrement the count of connections currently processing.
464 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000465 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000466 if (numActiveProcessors_ > 0) {
467 --numActiveProcessors_;
468 }
469 }
470
471 /**
472 * Get the maximum # of connections allowed before overload.
473 *
474 * @return current setting.
475 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100476 size_t getMaxConnections() const { return maxConnections_; }
David Reiss01fe1532010-03-09 05:19:25 +0000477
478 /**
479 * Set the maximum # of connections allowed before overload.
480 *
481 * @param maxConnections new setting for maximum # of connections.
482 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100483 void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }
David Reiss01fe1532010-03-09 05:19:25 +0000484
485 /**
486 * Get the maximum # of connections waiting in handler/task before overload.
487 *
488 * @return current setting.
489 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100490 size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }
David Reiss01fe1532010-03-09 05:19:25 +0000491
492 /**
493 * Set the maximum # of connections waiting in handler/task before overload.
494 *
495 * @param maxActiveProcessors new setting for maximum # of active processes.
496 */
497 void setMaxActiveProcessors(size_t maxActiveProcessors) {
498 maxActiveProcessors_ = maxActiveProcessors;
499 }
500
501 /**
Roger Meier3781c242011-12-11 20:07:21 +0000502 * Get the maximum allowed frame size.
503 *
504 * If a client tries to send a message larger than this limit,
505 * its connection will be closed.
506 *
507 * @return Maxium frame size, in bytes.
508 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100509 size_t getMaxFrameSize() const { return maxFrameSize_; }
Roger Meier3781c242011-12-11 20:07:21 +0000510
511 /**
512 * Set the maximum allowed frame size.
513 *
514 * @param maxFrameSize The new maximum frame size.
515 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100516 void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
Roger Meier3781c242011-12-11 20:07:21 +0000517
518 /**
David Reiss01fe1532010-03-09 05:19:25 +0000519 * Get fraction of maximum limits before an overload condition is cleared.
520 *
521 * @return hysteresis fraction
522 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100523 double getOverloadHysteresis() const { return overloadHysteresis_; }
David Reiss01fe1532010-03-09 05:19:25 +0000524
525 /**
526 * Set fraction of maximum limits before an overload condition is cleared.
527 * A good value would probably be between 0.5 and 0.9.
528 *
529 * @param hysteresisFraction fraction <= 1.0.
530 */
531 void setOverloadHysteresis(double hysteresisFraction) {
532 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
533 overloadHysteresis_ = hysteresisFraction;
534 }
535 }
536
537 /**
538 * Get the action the server will take on overload.
539 *
540 * @return a TOverloadAction enum value for the currently set action.
541 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100542 TOverloadAction getOverloadAction() const { return overloadAction_; }
David Reiss01fe1532010-03-09 05:19:25 +0000543
544 /**
545 * Set the action the server is to take on overload.
546 *
547 * @param overloadAction a TOverloadAction enum value for the action.
548 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100549 void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
David Reiss01fe1532010-03-09 05:19:25 +0000550
551 /**
David Reiss068f4162010-03-09 05:19:45 +0000552 * Get the time in milliseconds after which a task expires (0 == infinite).
553 *
554 * @return a 64-bit time in milliseconds.
555 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100556 int64_t getTaskExpireTime() const { return taskExpireTime_; }
David Reiss068f4162010-03-09 05:19:45 +0000557
558 /**
559 * Set the time in milliseconds after which a task expires (0 == infinite).
560 *
561 * @param taskExpireTime a 64-bit time in milliseconds.
562 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100563 void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }
David Reiss068f4162010-03-09 05:19:45 +0000564
565 /**
David Reiss01fe1532010-03-09 05:19:25 +0000566 * Determine if the server is currently overloaded.
567 * This function checks the maximums for open connections and connections
568 * currently in processing, and sets an overload condition if they are
569 * exceeded. The overload will persist until both values are below the
570 * current hysteresis fraction of their maximums.
571 *
572 * @return true if an overload condition exists, false if not.
573 */
574 bool serverOverloaded();
575
576 /** Pop and discard next task on threadpool wait queue.
577 *
578 * @return true if a task was discarded, false if the wait queue was empty.
579 */
580 bool drainPendingTask();
581
582 /**
David Reiss89a12942010-10-06 17:10:52 +0000583 * Get the starting size of a TConnection object's write buffer.
584 *
585 * @return # bytes we initialize a TConnection object's write buffer to.
586 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100587 size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }
David Reiss89a12942010-10-06 17:10:52 +0000588
589 /**
590 * Set the starting size of a TConnection object's write buffer.
591 *
592 * @param size # bytes we initialize a TConnection object's write buffer to.
593 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100594 void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }
David Reiss89a12942010-10-06 17:10:52 +0000595
596 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000597 * Get the maximum size of read buffer allocated to idle TConnection objects.
598 *
David Reiss89a12942010-10-06 17:10:52 +0000599 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000600 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100601 size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }
David Reiss54bec5d2010-10-06 17:10:45 +0000602
603 /**
604 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
605 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000606 *
David Reiss89a12942010-10-06 17:10:52 +0000607 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000608 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100609 size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000610
611 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000612 * Set the maximum size read buffer allocated to idle TConnection objects.
613 * If a TConnection object is found (either on connection close or between
614 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000615 * allocated to its read buffer, we free it and allow it to be reinitialized
616 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000617 *
618 * @param limit of bytes beyond which we will shrink buffers when checked.
619 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100620 void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }
David Reiss54bec5d2010-10-06 17:10:45 +0000621
622 /**
623 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
624 * Set the maximum size read buffer allocated to idle TConnection objects.
625 * If a TConnection object is found (either on connection close or between
626 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000627 * allocated to its read buffer, we free it and allow it to be reinitialized
628 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000629 *
630 * @param limit of bytes beyond which we will shrink buffers when checked.
631 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100632 void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }
David Reiss54bec5d2010-10-06 17:10:45 +0000633
634 /**
635 * Get the maximum size of write buffer allocated to idle TConnection objects.
636 *
637 * @return # bytes beyond which we will reallocate buffers when checked.
638 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100639 size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }
David Reiss54bec5d2010-10-06 17:10:45 +0000640
641 /**
642 * Set the maximum size write buffer allocated to idle TConnection objects.
643 * If a TConnection object is found (either on connection close or between
644 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000645 * allocated to its write buffer, we destroy and construct that buffer with
646 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000647 *
648 * @param limit of bytes beyond which we will shrink buffers when idle.
649 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100650 void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000651
David Reiss01fe1532010-03-09 05:19:25 +0000652 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000653 * Get # of calls made between buffer size checks. 0 means disabled.
654 *
655 * @return # of calls between buffer size checks.
656 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100657 int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }
David Reiss54bec5d2010-10-06 17:10:45 +0000658
659 /**
660 * Check buffer sizes every "count" calls. This allows buffer limits
Konrad Grochowski3b5dacb2014-11-24 10:55:31 +0100661 * to be enforced for persistent connections with a controllable degree
David Reiss54bec5d2010-10-06 17:10:45 +0000662 * of overhead. 0 disables checks except at connection close.
663 *
664 * @param count the number of calls between checks, or 0 to disable
665 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100666 void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
David Reiss54bec5d2010-10-06 17:10:45 +0000667
Jake Farrellb0d95602011-12-06 01:17:26 +0000668 /**
669 * Main workhorse function, starts up the server listening on a port and
670 * loops over the libevent handler.
671 */
Sebastian Zenker042580f2019-01-29 15:48:12 +0100672 void serve() override;
David Reiss54bec5d2010-10-06 17:10:45 +0000673
Jake Farrellb0d95602011-12-06 01:17:26 +0000674 /**
675 * Causes the server to terminate gracefully (can be called from any thread).
676 */
Sebastian Zenker042580f2019-01-29 15:48:12 +0100677 void stop() override;
David Reiss54bec5d2010-10-06 17:10:45 +0000678
Jake Farrellb0d95602011-12-06 01:17:26 +0000679 /// Creates a socket to listen on and binds it to the local port.
680 void createAndListenOnSocket();
James E. King, III82ae9572017-08-05 12:23:54 -0400681
Roger Meier6f2a5032013-07-08 23:35:25 +0200682 /**
683 * Register the optional user-provided event-base (for single-thread servers)
684 *
685 * This method should be used when the server is running in a single-thread
686 * mode, and the event base is provided by the user (i.e., the caller).
687 *
688 * @param user_event_base the user-provided event-base. The user is
689 * responsible for freeing the event base memory.
690 */
691 void registerEvents(event_base* user_event_base);
692
693 /**
694 * Returns the optional user-provided event-base (for single-thread servers).
695 */
696 event_base* getUserEventBase() const { return userEventBase_; }
697
Dave Watson792db4e2015-01-16 11:22:01 -0800698 /** Some transports, like THeaderTransport, require passing through
699 * the framing size instead of stripping it.
700 */
701 bool getHeaderTransport();
702
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100703private:
Roger Meier6f2a5032013-07-08 23:35:25 +0200704 /**
705 * Callback function that the threadmanager calls when a task reaches
706 * its expiration time. It is needed to clean up the expired connection.
707 *
708 * @param task the runnable associated with the expired task.
709 */
cyy316723a2019-01-05 16:35:14 +0800710 void expireClose(std::shared_ptr<Runnable> task);
Roger Meier6f2a5032013-07-08 23:35:25 +0200711
David Reiss54bec5d2010-10-06 17:10:45 +0000712 /**
David Reiss01fe1532010-03-09 05:19:25 +0000713 * Return an initialized connection object. Creates or recovers from
714 * pool a TConnection and initializes it with the provided socket FD
715 * and flags.
716 *
717 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000718 * @param addr the sockaddr of the client
719 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000720 * @return pointer to initialized TConnection object.
721 */
cyy316723a2019-01-05 16:35:14 +0800722 TConnection* createConnection(std::shared_ptr<TSocket> socket);
Mark Slee2f6404d2006-10-10 01:37:40 +0000723
David Reiss01fe1532010-03-09 05:19:25 +0000724 /**
725 * Returns a connection to pool or deletion. If the connection pool
726 * (a stack) isn't full, place the connection object on it, otherwise
727 * just delete it.
728 *
729 * @param connection the TConection being returned.
730 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000731 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000732};
Mark Slee2f6404d2006-10-10 01:37:40 +0000733
Jake Farrellb0d95602011-12-06 01:17:26 +0000734class TNonblockingIOThread : public Runnable {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100735public:
Jake Farrellb0d95602011-12-06 01:17:26 +0000736 // Creates an IO thread and sets up the event base. The listenSocket should
737 // be a valid FD on which listen() has already been called. If the
738 // listenSocket is < 0, accepting will not be done.
739 TNonblockingIOThread(TNonblockingServer* server,
740 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400741 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +0000742 bool useHighPriority);
743
Sebastian Zenker042580f2019-01-29 15:48:12 +0100744 ~TNonblockingIOThread() override;
Jake Farrellb0d95602011-12-06 01:17:26 +0000745
746 // Returns the event-base for this thread.
747 event_base* getEventBase() const { return eventBase_; }
748
749 // Returns the server for this thread.
750 TNonblockingServer* getServer() const { return server_; }
751
752 // Returns the number of this IO thread.
753 int getThreadNumber() const { return number_; }
754
755 // Returns the thread id associated with this object. This should
756 // only be called after the thread has been started.
Roger Meier12d70532011-12-14 23:35:28 +0000757 Thread::id_t getThreadId() const { return threadId_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000758
759 // Returns the send-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000760 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000761
762 // Returns the read-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000763 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000764
765 // Returns the actual thread object associated with this IO thread.
cyy316723a2019-01-05 16:35:14 +0800766 std::shared_ptr<Thread> getThread() const { return thread_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000767
768 // Sets the actual thread object associated with this IO thread.
cyy316723a2019-01-05 16:35:14 +0800769 void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000770
771 // Used by TConnection objects to indicate processing has finished.
772 bool notify(TNonblockingServer::TConnection* conn);
773
774 // Enters the event loop and does not return until a call to stop().
Sebastian Zenker042580f2019-01-29 15:48:12 +0100775 void run() override;
Jake Farrellb0d95602011-12-06 01:17:26 +0000776
777 // Exits the event loop as soon as possible.
778 void stop();
779
780 // Ensures that the event-loop thread is fully finished and shut down.
781 void join();
782
Roger Meier6f2a5032013-07-08 23:35:25 +0200783 /// Registers the events for the notification & listen sockets
784 void registerEvents();
785
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100786private:
David Reiss01fe1532010-03-09 05:19:25 +0000787 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000788 * C-callable event handler for signaling task completion. Provides a
789 * callback that libevent can understand that will read a connection
790 * object's address from a pipe and call connection->transition() for
791 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000792 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000793 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000794 */
Roger Meier12d70532011-12-14 23:35:28 +0000795 static void notifyHandler(evutil_socket_t fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000796
797 /**
David Reiss01fe1532010-03-09 05:19:25 +0000798 * C-callable event handler for listener events. Provides a callback
799 * that libevent can understand which invokes server->handleEvent().
800 *
Konrad Grochowski3b5dacb2014-11-24 10:55:31 +0100801 * @param fd the descriptor the event occurred on.
David Reiss01fe1532010-03-09 05:19:25 +0000802 * @param which the flags associated with the event.
803 * @param v void* callback arg where we placed TNonblockingServer's "this".
804 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000805 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000806 ((TNonblockingServer*)v)->handleEvent(fd, which);
807 }
808
Jake Farrellb0d95602011-12-06 01:17:26 +0000809 /// Exits the loop ASAP in case of shutdown or error.
810 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000811
David Reiss01fe1532010-03-09 05:19:25 +0000812 /// Create the pipe used to notify I/O process of task completion.
813 void createNotificationPipe();
814
Jake Farrellb0d95602011-12-06 01:17:26 +0000815 /// Unregisters our events for notification and listen sockets.
816 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000817
Jake Farrellb0d95602011-12-06 01:17:26 +0000818 /// Sets (or clears) high priority scheduling status for the current thread.
819 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000820
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100821private:
Jake Farrellb0d95602011-12-06 01:17:26 +0000822 /// associated server
823 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000824
Jake Farrellb0d95602011-12-06 01:17:26 +0000825 /// thread number (for debugging).
826 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000827
Jake Farrellb0d95602011-12-06 01:17:26 +0000828 /// The actual physical thread id.
Roger Meier12d70532011-12-14 23:35:28 +0000829 Thread::id_t threadId_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000830
831 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400832 THRIFT_SOCKET listenSocket_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000833
834 /// Sets a high scheduling priority when running
835 bool useHighPriority_;
836
837 /// pointer to eventbase to be used for looping
838 event_base* eventBase_;
839
Roger Meier6f2a5032013-07-08 23:35:25 +0200840 /// Set to true if this class is responsible for freeing the event base
841 /// memory.
842 bool ownEventBase_;
843
Jake Farrellb0d95602011-12-06 01:17:26 +0000844 /// Used with eventBase_ for connection events (only in listener thread)
845 struct event serverEvent_;
846
847 /// Used with eventBase_ for task completion notification
848 struct event notificationEvent_;
849
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100850 /// File descriptors for pipe used for task completion notification.
Roger Meier12d70532011-12-14 23:35:28 +0000851 evutil_socket_t notificationPipeFDs_[2];
Jake Farrellb0d95602011-12-06 01:17:26 +0000852
853 /// Actual IO Thread
cyy316723a2019-01-05 16:35:14 +0800854 std::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000855};
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100856}
857}
858} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000859
Jake Farrellb0d95602011-12-06 01:17:26 +0000860#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_