blob: 5f5ea11877a41c7a3d094471e874f6113f675a2e [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Jake Farrellb0d95602011-12-06 01:17:26 +000029#include <concurrency/Thread.h>
Roger Meier12d70532011-12-14 23:35:28 +000030#include <concurrency/PlatformThreadFactory.h>
Jake Farrellb0d95602011-12-06 01:17:26 +000031#include <concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000032#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000033#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000034#include <string>
35#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000036#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000037#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000038#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000039#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <event.h>
41
Jake Farrellb0d95602011-12-06 01:17:26 +000042
43
T Jake Lucianib5e62212009-01-31 22:36:20 +000044namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000045
T Jake Lucianib5e62212009-01-31 22:36:20 +000046using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000047using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000048using apache::thrift::protocol::TProtocol;
49using apache::thrift::concurrency::Runnable;
50using apache::thrift::concurrency::ThreadManager;
Roger Meier12d70532011-12-14 23:35:28 +000051using apache::thrift::concurrency::PlatformThreadFactory;
Jake Farrellb0d95602011-12-06 01:17:26 +000052using apache::thrift::concurrency::ThreadFactory;
53using apache::thrift::concurrency::Thread;
54using apache::thrift::concurrency::Mutex;
55using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000056
Roger Meier30aae0c2011-07-08 12:23:31 +000057#ifdef LIBEVENT_VERSION_NUMBER
58#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
59#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
60#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
61#else
62// assume latest version 1 series
63#define LIBEVENT_VERSION_MAJOR 1
64#define LIBEVENT_VERSION_MINOR 14
65#define LIBEVENT_VERSION_REL 13
66#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
67#endif
68
69#if LIBEVENT_VERSION_NUMBER < 0x02000000
70 typedef int evutil_socket_t;
71#endif
72
73#ifndef SOCKOPT_CAST_T
Roger Meier84e4a3c2011-09-16 20:58:44 +000074# ifndef _WIN32
75# define SOCKOPT_CAST_T void
76# else
77# define SOCKOPT_CAST_T char
78# endif // _WIN32
Roger Meier30aae0c2011-07-08 12:23:31 +000079#endif
80
81template<class T>
82inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
83 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
84}
85
86template<class T>
87inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
88 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
89}
90
Mark Slee2f6404d2006-10-10 01:37:40 +000091/**
Jake Farrellb0d95602011-12-06 01:17:26 +000092 * This is a non-blocking server in C++ for high performance that
93 * operates a set of IO threads (by default only one). It assumes that
94 * all incoming requests are framed with a 4 byte length indicator and
95 * writes out responses using the same framing.
Mark Slee2f6404d2006-10-10 01:37:40 +000096 *
97 * It does not use the TServerTransport framework, but rather has socket
98 * operations hardcoded for use with select.
99 *
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 */
David Reiss01fe1532010-03-09 05:19:25 +0000101
102
103/// Overload condition actions.
104enum TOverloadAction {
105 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
106 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
107 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
108};
109
Jake Farrellb0d95602011-12-06 01:17:26 +0000110class TNonblockingIOThread;
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112class TNonblockingServer : public TServer {
113 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000114 class TConnection;
115
Jake Farrellb0d95602011-12-06 01:17:26 +0000116 friend class TNonblockingIOThread;
117 private:
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 static const int LISTEN_BACKLOG = 1024;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000122 static const size_t CONNECTION_STACK_LIMIT = 1024;
123
Roger Meier3781c242011-12-11 20:07:21 +0000124 /// Default limit on frame size
125 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// Default limit on total number of connected sockets
128 static const int MAX_CONNECTIONS = INT_MAX;
129
130 /// Default limit on connections in handler/task processing
131 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
132
David Reiss89a12942010-10-06 17:10:52 +0000133 /// Default size of write buffer
134 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
135
David Reiss54bec5d2010-10-06 17:10:45 +0000136 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
137 static const int IDLE_READ_BUFFER_LIMIT = 1024;
138
139 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
140 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
141
142 /// # of calls before resizing oversized buffers (0 = check only on close)
143 static const int RESIZE_BUFFER_EVERY_N = 512;
144
Jake Farrellb0d95602011-12-06 01:17:26 +0000145 /// # of IO threads to use by default
146 static const int DEFAULT_IO_THREADS = 1;
147
148 /// File descriptor of an invalid socket
Roger Meier12d70532011-12-14 23:35:28 +0000149 static const int INVALID_SOCKET_VALUE = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000150
151 /// # of IO threads this server will use
152 size_t numIOThreads_;
153
154 /// Whether to set high scheduling priority for IO threads
155 bool useHighPriorityIOThreads_;
156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 int serverSocket_;
159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 int port_;
162
David Reiss01fe1532010-03-09 05:19:25 +0000163 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000164 boost::shared_ptr<ThreadManager> threadManager_;
165
David Reiss01fe1532010-03-09 05:19:25 +0000166 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000167 bool threadPoolProcessing_;
168
Jake Farrellb0d95602011-12-06 01:17:26 +0000169 // Factory to create the IO threads
Roger Meier12d70532011-12-14 23:35:28 +0000170 boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000171
Jake Farrellb0d95602011-12-06 01:17:26 +0000172 // Vector of IOThread objects that will handle our IO
173 std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000174
Jake Farrellb0d95602011-12-06 01:17:26 +0000175 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000176 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000177
178 // Synchronizes access to connection stack and similar data
179 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000180
181 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000182 size_t numTConnections_;
183
David Reiss9e8073c2010-03-09 05:19:39 +0000184 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000185 size_t numActiveProcessors_;
186
187 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000188 size_t connectionStackLimit_;
189
David Reiss01fe1532010-03-09 05:19:25 +0000190 /// Limit for number of connections processing or waiting to process
191 size_t maxActiveProcessors_;
192
193 /// Limit for number of open connections
194 size_t maxConnections_;
195
Roger Meier3781c242011-12-11 20:07:21 +0000196 /// Limit for frame size
197 size_t maxFrameSize_;
198
David Reiss068f4162010-03-09 05:19:45 +0000199 /// Time in milliseconds before an unperformed task expires (0 == infinite).
200 int64_t taskExpireTime_;
201
David Reiss01fe1532010-03-09 05:19:25 +0000202 /**
203 * Hysteresis for overload state. This is the fraction of the overload
204 * value that needs to be reached before the overload state is cleared;
205 * must be <= 1.0.
206 */
207 double overloadHysteresis_;
208
209 /// Action to take when we're overloaded.
210 TOverloadAction overloadAction_;
211
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000212 /**
David Reiss89a12942010-10-06 17:10:52 +0000213 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
214 * and found to be exceeded, reinitialized) to this size.
215 */
216 size_t writeBufferDefaultSize_;
217
218 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000219 * Max read buffer size for an idle TConnection. When we place an idle
220 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000221 * we will free the buffer (such that it will be reinitialized by the next
222 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000223 */
David Reiss54bec5d2010-10-06 17:10:45 +0000224 size_t idleReadBufferLimit_;
225
226 /**
227 * Max write buffer size for an idle connection. When we place an idle
228 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
229 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000230 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
231 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000232 */
233 size_t idleWriteBufferLimit_;
234
235 /**
236 * Every N calls we check the buffer size limits on a connected TConnection.
237 * 0 disables (i.e. the checks are only done when a connection closes).
238 */
239 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000240
241 /// Set if we are currently in an overloaded state.
242 bool overloaded_;
243
244 /// Count of connections dropped since overload started
245 uint32_t nConnectionsDropped_;
246
247 /// Count of connections dropped on overload since server started
248 uint64_t nTotalConnectionsDropped_;
249
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 /**
251 * This is a stack of all the objects that have been created but that
252 * are NOT currently in use. When we close a connection, we place it on this
253 * stack so that the object can be reused later, rather than freeing the
254 * memory and reallocating a new object later.
255 */
256 std::stack<TConnection*> connectionStack_;
257
David Reiss01fe1532010-03-09 05:19:25 +0000258 /**
259 * Called when server socket had something happen. We accept all waiting
260 * client connections on listen socket fd and assign TConnection objects
261 * to handle those requests.
262 *
263 * @param fd the listen socket.
264 * @param which the event flag that triggered the handler.
265 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000266 void handleEvent(int fd, short which);
267
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000268 void init(int port) {
269 serverSocket_ = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000270 numIOThreads_ = DEFAULT_IO_THREADS;
271 nextIOThread_ = 0;
272 useHighPriorityIOThreads_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000273 port_ = port;
274 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000275 numTConnections_ = 0;
276 numActiveProcessors_ = 0;
277 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
278 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
279 maxConnections_ = MAX_CONNECTIONS;
Roger Meier3781c242011-12-11 20:07:21 +0000280 maxFrameSize_ = MAX_FRAME_SIZE;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000281 taskExpireTime_ = 0;
282 overloadHysteresis_ = 0.8;
283 overloadAction_ = T_OVERLOAD_NO_ACTION;
284 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
285 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
286 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
287 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
288 overloaded_ = false;
289 nConnectionsDropped_ = 0;
290 nTotalConnectionsDropped_ = 0;
291 }
Mark Sleef9373392007-01-24 19:41:57 +0000292
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000293 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000294 template<typename ProcessorFactory>
295 TNonblockingServer(
296 const boost::shared_ptr<ProcessorFactory>& processorFactory,
297 int port,
298 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
299 TServer(processorFactory) {
300 init(port);
301 }
302
303 template<typename Processor>
304 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
305 int port,
306 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000307 TServer(processor) {
308 init(port);
309 }
310
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000311 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000312 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000313 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000314 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
315 int port,
316 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000317 boost::shared_ptr<ThreadManager>(),
318 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
319 TServer(processorFactory) {
320
321 init(port);
322
323 setInputProtocolFactory(protocolFactory);
324 setOutputProtocolFactory(protocolFactory);
325 setThreadManager(threadManager);
326 }
327
328 template<typename Processor>
329 TNonblockingServer(
330 const boost::shared_ptr<Processor>& processor,
331 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
332 int port,
333 const boost::shared_ptr<ThreadManager>& threadManager =
334 boost::shared_ptr<ThreadManager>(),
335 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000336 TServer(processor) {
337
338 init(port);
339
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000340 setInputProtocolFactory(protocolFactory);
341 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000342 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000343 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000344
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000345 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000346 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000347 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000348 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
349 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
350 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
351 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
352 int port,
353 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000354 boost::shared_ptr<ThreadManager>(),
355 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
356 TServer(processorFactory) {
357
358 init(port);
359
360 setInputTransportFactory(inputTransportFactory);
361 setOutputTransportFactory(outputTransportFactory);
362 setInputProtocolFactory(inputProtocolFactory);
363 setOutputProtocolFactory(outputProtocolFactory);
364 setThreadManager(threadManager);
365 }
366
367 template<typename Processor>
368 TNonblockingServer(
369 const boost::shared_ptr<Processor>& processor,
370 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
371 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
372 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
373 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
374 int port,
375 const boost::shared_ptr<ThreadManager>& threadManager =
376 boost::shared_ptr<ThreadManager>(),
377 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000378 TServer(processor) {
379
380 init(port);
381
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000382 setInputTransportFactory(inputTransportFactory);
383 setOutputTransportFactory(outputTransportFactory);
384 setInputProtocolFactory(inputProtocolFactory);
385 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000386 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000387 }
Mark Slee79b16942007-11-26 19:05:29 +0000388
David Reiss8ede8182010-09-02 15:26:28 +0000389 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000390
David Reiss068f4162010-03-09 05:19:45 +0000391 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000392
David Reiss1997f102008-04-29 00:29:41 +0000393 boost::shared_ptr<ThreadManager> getThreadManager() {
394 return threadManager_;
395 }
396
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000397 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000398 * Sets the number of IO threads used by this server. Can only be used before
399 * the call to serve() and has no effect afterwards. We always use a
400 * PosixThreadFactory for the IO worker threads, because they must joinable
401 * for clean shutdown.
402 */
403 void setNumIOThreads(size_t numThreads) {
404 numIOThreads_ = numThreads;
405 }
406
407 /** Return whether the IO threads will get high scheduling priority */
408 bool useHighPriorityIOThreads() const {
409 return useHighPriorityIOThreads_;
410 }
411
412 /** Set whether the IO threads will get high scheduling priority. */
413 void setUseHighPriorityIOThreads(bool val) {
414 useHighPriorityIOThreads_ = val;
415 }
416
417 /** Return the number of IO threads used by this server. */
418 size_t getNumIOThreads() const {
419 return numIOThreads_;
420 }
421
422 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000423 * Get the maximum number of unused TConnection we will hold in reserve.
424 *
425 * @return the current limit on TConnection pool size.
426 */
David Reiss260fa932009-04-02 23:51:39 +0000427 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000428 return connectionStackLimit_;
429 }
430
431 /**
432 * Set the maximum number of unused TConnection we will hold in reserve.
433 *
434 * @param sz the new limit for TConnection pool size.
435 */
David Reiss260fa932009-04-02 23:51:39 +0000436 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000437 connectionStackLimit_ = sz;
438 }
439
Mark Slee79b16942007-11-26 19:05:29 +0000440 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000441 return threadPoolProcessing_;
442 }
443
444 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000445 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000446 }
447
David Reiss01fe1532010-03-09 05:19:25 +0000448 /**
449 * Return the count of sockets currently connected to.
450 *
451 * @return count of connected sockets.
452 */
453 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000454 return numTConnections_;
455 }
456
David Reiss01fe1532010-03-09 05:19:25 +0000457 /**
458 * Return the count of connection objects allocated but not in use.
459 *
460 * @return count of idle connection objects.
461 */
462 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000463 return connectionStack_.size();
464 }
465
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000466 /**
David Reiss01fe1532010-03-09 05:19:25 +0000467 * Return count of number of connections which are currently processing.
468 * This is defined as a connection where all data has been received and
469 * either assigned a task (when threading) or passed to a handler (when
470 * not threading), and where the handler has not yet returned.
471 *
472 * @return # of connections currently processing.
473 */
474 size_t getNumActiveProcessors() const {
475 return numActiveProcessors_;
476 }
477
478 /// Increment the count of connections currently processing.
479 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000480 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000481 ++numActiveProcessors_;
482 }
483
484 /// Decrement the count of connections currently processing.
485 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000486 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000487 if (numActiveProcessors_ > 0) {
488 --numActiveProcessors_;
489 }
490 }
491
492 /**
493 * Get the maximum # of connections allowed before overload.
494 *
495 * @return current setting.
496 */
497 size_t getMaxConnections() const {
498 return maxConnections_;
499 }
500
501 /**
502 * Set the maximum # of connections allowed before overload.
503 *
504 * @param maxConnections new setting for maximum # of connections.
505 */
506 void setMaxConnections(size_t maxConnections) {
507 maxConnections_ = maxConnections;
508 }
509
510 /**
511 * Get the maximum # of connections waiting in handler/task before overload.
512 *
513 * @return current setting.
514 */
515 size_t getMaxActiveProcessors() const {
516 return maxActiveProcessors_;
517 }
518
519 /**
520 * Set the maximum # of connections waiting in handler/task before overload.
521 *
522 * @param maxActiveProcessors new setting for maximum # of active processes.
523 */
524 void setMaxActiveProcessors(size_t maxActiveProcessors) {
525 maxActiveProcessors_ = maxActiveProcessors;
526 }
527
528 /**
Roger Meier3781c242011-12-11 20:07:21 +0000529 * Get the maximum allowed frame size.
530 *
531 * If a client tries to send a message larger than this limit,
532 * its connection will be closed.
533 *
534 * @return Maxium frame size, in bytes.
535 */
536 size_t getMaxFrameSize() const {
537 return maxFrameSize_;
538 }
539
540 /**
541 * Set the maximum allowed frame size.
542 *
543 * @param maxFrameSize The new maximum frame size.
544 */
545 void setMaxFrameSize(size_t maxFrameSize) {
546 maxFrameSize_ = maxFrameSize;
547 }
548
549 /**
David Reiss01fe1532010-03-09 05:19:25 +0000550 * Get fraction of maximum limits before an overload condition is cleared.
551 *
552 * @return hysteresis fraction
553 */
554 double getOverloadHysteresis() const {
555 return overloadHysteresis_;
556 }
557
558 /**
559 * Set fraction of maximum limits before an overload condition is cleared.
560 * A good value would probably be between 0.5 and 0.9.
561 *
562 * @param hysteresisFraction fraction <= 1.0.
563 */
564 void setOverloadHysteresis(double hysteresisFraction) {
565 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
566 overloadHysteresis_ = hysteresisFraction;
567 }
568 }
569
570 /**
571 * Get the action the server will take on overload.
572 *
573 * @return a TOverloadAction enum value for the currently set action.
574 */
575 TOverloadAction getOverloadAction() const {
576 return overloadAction_;
577 }
578
579 /**
580 * Set the action the server is to take on overload.
581 *
582 * @param overloadAction a TOverloadAction enum value for the action.
583 */
584 void setOverloadAction(TOverloadAction overloadAction) {
585 overloadAction_ = overloadAction;
586 }
587
588 /**
David Reiss068f4162010-03-09 05:19:45 +0000589 * Get the time in milliseconds after which a task expires (0 == infinite).
590 *
591 * @return a 64-bit time in milliseconds.
592 */
593 int64_t getTaskExpireTime() const {
594 return taskExpireTime_;
595 }
596
597 /**
598 * Set the time in milliseconds after which a task expires (0 == infinite).
599 *
600 * @param taskExpireTime a 64-bit time in milliseconds.
601 */
602 void setTaskExpireTime(int64_t taskExpireTime) {
603 taskExpireTime_ = taskExpireTime;
604 }
605
606 /**
David Reiss01fe1532010-03-09 05:19:25 +0000607 * Determine if the server is currently overloaded.
608 * This function checks the maximums for open connections and connections
609 * currently in processing, and sets an overload condition if they are
610 * exceeded. The overload will persist until both values are below the
611 * current hysteresis fraction of their maximums.
612 *
613 * @return true if an overload condition exists, false if not.
614 */
615 bool serverOverloaded();
616
617 /** Pop and discard next task on threadpool wait queue.
618 *
619 * @return true if a task was discarded, false if the wait queue was empty.
620 */
621 bool drainPendingTask();
622
623 /**
David Reiss89a12942010-10-06 17:10:52 +0000624 * Get the starting size of a TConnection object's write buffer.
625 *
626 * @return # bytes we initialize a TConnection object's write buffer to.
627 */
628 size_t getWriteBufferDefaultSize() const {
629 return writeBufferDefaultSize_;
630 }
631
632 /**
633 * Set the starting size of a TConnection object's write buffer.
634 *
635 * @param size # bytes we initialize a TConnection object's write buffer to.
636 */
637 void setWriteBufferDefaultSize(size_t size) {
638 writeBufferDefaultSize_ = size;
639 }
640
641 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000642 * Get the maximum size of read buffer allocated to idle TConnection objects.
643 *
David Reiss89a12942010-10-06 17:10:52 +0000644 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000645 */
646 size_t getIdleReadBufferLimit() const {
647 return idleReadBufferLimit_;
648 }
649
650 /**
651 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
652 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000653 *
David Reiss89a12942010-10-06 17:10:52 +0000654 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000655 */
656 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000657 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000658 }
659
660 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000661 * Set the maximum size read buffer allocated to idle TConnection objects.
662 * If a TConnection object is found (either on connection close or between
663 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000664 * allocated to its read buffer, we free it and allow it to be reinitialized
665 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000666 *
667 * @param limit of bytes beyond which we will shrink buffers when checked.
668 */
669 void setIdleReadBufferLimit(size_t limit) {
670 idleReadBufferLimit_ = limit;
671 }
672
673 /**
674 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
675 * Set the maximum size read buffer allocated to idle TConnection objects.
676 * If a TConnection object is found (either on connection close or between
677 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000678 * allocated to its read buffer, we free it and allow it to be reinitialized
679 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000680 *
681 * @param limit of bytes beyond which we will shrink buffers when checked.
682 */
683 void setIdleBufferMemLimit(size_t limit) {
684 idleReadBufferLimit_ = limit;
685 }
686
Jake Farrellb0d95602011-12-06 01:17:26 +0000687
David Reiss54bec5d2010-10-06 17:10:45 +0000688
689 /**
690 * Get the maximum size of write buffer allocated to idle TConnection objects.
691 *
692 * @return # bytes beyond which we will reallocate buffers when checked.
693 */
694 size_t getIdleWriteBufferLimit() const {
695 return idleWriteBufferLimit_;
696 }
697
698 /**
699 * Set the maximum size write buffer allocated to idle TConnection objects.
700 * If a TConnection object is found (either on connection close or between
701 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000702 * allocated to its write buffer, we destroy and construct that buffer with
703 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000704 *
705 * @param limit of bytes beyond which we will shrink buffers when idle.
706 */
David Reiss54bec5d2010-10-06 17:10:45 +0000707 void setIdleWriteBufferLimit(size_t limit) {
708 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000709 }
710
David Reiss01fe1532010-03-09 05:19:25 +0000711 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000712 * Get # of calls made between buffer size checks. 0 means disabled.
713 *
714 * @return # of calls between buffer size checks.
715 */
716 int32_t getResizeBufferEveryN() const {
717 return resizeBufferEveryN_;
718 }
719
720 /**
721 * Check buffer sizes every "count" calls. This allows buffer limits
722 * to be enforced for persistant connections with a controllable degree
723 * of overhead. 0 disables checks except at connection close.
724 *
725 * @param count the number of calls between checks, or 0 to disable
726 */
727 void setResizeBufferEveryN(int32_t count) {
728 resizeBufferEveryN_ = count;
729 }
730
Jake Farrellb0d95602011-12-06 01:17:26 +0000731 /**
732 * Main workhorse function, starts up the server listening on a port and
733 * loops over the libevent handler.
734 */
735 void serve();
David Reiss54bec5d2010-10-06 17:10:45 +0000736
Jake Farrellb0d95602011-12-06 01:17:26 +0000737 /**
738 * Causes the server to terminate gracefully (can be called from any thread).
739 */
740 void stop();
David Reiss54bec5d2010-10-06 17:10:45 +0000741
Jake Farrellb0d95602011-12-06 01:17:26 +0000742 private:
743 /**
744 * Callback function that the threadmanager calls when a task reaches
745 * its expiration time. It is needed to clean up the expired connection.
746 *
747 * @param task the runnable associated with the expired task.
748 */
749 void expireClose(boost::shared_ptr<Runnable> task);
750
751 /// Creates a socket to listen on and binds it to the local port.
752 void createAndListenOnSocket();
753
754 /**
755 * Takes a socket created by createAndListenOnSocket() and sets various
756 * options on it to prepare for use in the server.
757 *
758 * @param fd descriptor of socket to be initialized/
759 */
760 void listenSocket(int fd);
David Reiss54bec5d2010-10-06 17:10:45 +0000761 /**
David Reiss01fe1532010-03-09 05:19:25 +0000762 * Return an initialized connection object. Creates or recovers from
763 * pool a TConnection and initializes it with the provided socket FD
764 * and flags.
765 *
766 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000767 * @param addr the sockaddr of the client
768 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000769 * @return pointer to initialized TConnection object.
770 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000771 TConnection* createConnection(int socket, const sockaddr* addr,
772 socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000773
David Reiss01fe1532010-03-09 05:19:25 +0000774 /**
775 * Returns a connection to pool or deletion. If the connection pool
776 * (a stack) isn't full, place the connection object on it, otherwise
777 * just delete it.
778 *
779 * @param connection the TConection being returned.
780 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000781 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000782};
Mark Slee2f6404d2006-10-10 01:37:40 +0000783
Jake Farrellb0d95602011-12-06 01:17:26 +0000784class TNonblockingIOThread : public Runnable {
785 public:
786 // Creates an IO thread and sets up the event base. The listenSocket should
787 // be a valid FD on which listen() has already been called. If the
788 // listenSocket is < 0, accepting will not be done.
789 TNonblockingIOThread(TNonblockingServer* server,
790 int number,
791 int listenSocket,
792 bool useHighPriority);
793
794 ~TNonblockingIOThread();
795
796 // Returns the event-base for this thread.
797 event_base* getEventBase() const { return eventBase_; }
798
799 // Returns the server for this thread.
800 TNonblockingServer* getServer() const { return server_; }
801
802 // Returns the number of this IO thread.
803 int getThreadNumber() const { return number_; }
804
805 // Returns the thread id associated with this object. This should
806 // only be called after the thread has been started.
Roger Meier12d70532011-12-14 23:35:28 +0000807 Thread::id_t getThreadId() const { return threadId_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000808
809 // Returns the send-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000810 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000811
812 // Returns the read-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000813 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000814
815 // Returns the actual thread object associated with this IO thread.
816 boost::shared_ptr<Thread> getThread() const { return thread_; }
817
818 // Sets the actual thread object associated with this IO thread.
819 void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
820
821 // Used by TConnection objects to indicate processing has finished.
822 bool notify(TNonblockingServer::TConnection* conn);
823
824 // Enters the event loop and does not return until a call to stop().
825 virtual void run();
826
827 // Exits the event loop as soon as possible.
828 void stop();
829
830 // Ensures that the event-loop thread is fully finished and shut down.
831 void join();
832
833 private:
David Reiss01fe1532010-03-09 05:19:25 +0000834 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000835 * C-callable event handler for signaling task completion. Provides a
836 * callback that libevent can understand that will read a connection
837 * object's address from a pipe and call connection->transition() for
838 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000839 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000840 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000841 */
Roger Meier12d70532011-12-14 23:35:28 +0000842 static void notifyHandler(evutil_socket_t fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000843
844 /**
David Reiss01fe1532010-03-09 05:19:25 +0000845 * C-callable event handler for listener events. Provides a callback
846 * that libevent can understand which invokes server->handleEvent().
847 *
848 * @param fd the descriptor the event occured on.
849 * @param which the flags associated with the event.
850 * @param v void* callback arg where we placed TNonblockingServer's "this".
851 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000852 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000853 ((TNonblockingServer*)v)->handleEvent(fd, which);
854 }
855
Jake Farrellb0d95602011-12-06 01:17:26 +0000856 /// Exits the loop ASAP in case of shutdown or error.
857 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000858
Jake Farrellb0d95602011-12-06 01:17:26 +0000859 /// Registers the events for the notification & listen sockets
860 void registerEvents();
Mark Slee79b16942007-11-26 19:05:29 +0000861
David Reiss01fe1532010-03-09 05:19:25 +0000862 /// Create the pipe used to notify I/O process of task completion.
863 void createNotificationPipe();
864
Jake Farrellb0d95602011-12-06 01:17:26 +0000865 /// Unregisters our events for notification and listen sockets.
866 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000867
Jake Farrellb0d95602011-12-06 01:17:26 +0000868 /// Sets (or clears) high priority scheduling status for the current thread.
869 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000870
Jake Farrellb0d95602011-12-06 01:17:26 +0000871 private:
872 /// associated server
873 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000874
Jake Farrellb0d95602011-12-06 01:17:26 +0000875 /// thread number (for debugging).
876 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000877
Jake Farrellb0d95602011-12-06 01:17:26 +0000878 /// The actual physical thread id.
Roger Meier12d70532011-12-14 23:35:28 +0000879 Thread::id_t threadId_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000880
881 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
882 int listenSocket_;
883
884 /// Sets a high scheduling priority when running
885 bool useHighPriority_;
886
887 /// pointer to eventbase to be used for looping
888 event_base* eventBase_;
889
890 /// Used with eventBase_ for connection events (only in listener thread)
891 struct event serverEvent_;
892
893 /// Used with eventBase_ for task completion notification
894 struct event notificationEvent_;
895
896 /// File descriptors for pipe used for task completion notification.
Roger Meier12d70532011-12-14 23:35:28 +0000897 evutil_socket_t notificationPipeFDs_[2];
Jake Farrellb0d95602011-12-06 01:17:26 +0000898
899 /// Actual IO Thread
900 boost::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000901};
902
T Jake Lucianib5e62212009-01-31 22:36:20 +0000903}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000904
Jake Farrellb0d95602011-12-06 01:17:26 +0000905#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_