/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/Thrift.h>
24#include <thrift/server/TServer.h>
25#include <thrift/transport/TBufferTransports.h>
26#include <thrift/transport/TSocket.h>
27#include <thrift/concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Roger Meier49ff8b12012-04-13 09:12:31 +000029#include <thrift/concurrency/Thread.h>
30#include <thrift/concurrency/PlatformThreadFactory.h>
31#include <thrift/concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000032#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000033#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000034#include <string>
35#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000036#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000037#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000038#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000039#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <event.h>
41
Jake Farrellb0d95602011-12-06 01:17:26 +000042
43
T Jake Lucianib5e62212009-01-31 22:36:20 +000044namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000045
T Jake Lucianib5e62212009-01-31 22:36:20 +000046using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000047using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000048using apache::thrift::protocol::TProtocol;
49using apache::thrift::concurrency::Runnable;
50using apache::thrift::concurrency::ThreadManager;
Roger Meier12d70532011-12-14 23:35:28 +000051using apache::thrift::concurrency::PlatformThreadFactory;
Jake Farrellb0d95602011-12-06 01:17:26 +000052using apache::thrift::concurrency::ThreadFactory;
53using apache::thrift::concurrency::Thread;
54using apache::thrift::concurrency::Mutex;
55using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000056
// Derive the libevent major/minor/release components from the packed
// LIBEVENT_VERSION_NUMBER when the libevent headers provide it; otherwise
// fall back to fixed version-1 values and synthesize the packed number.
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

// For libevent versions below 2.0 this header supplies evutil_socket_t
// itself, as a plain int.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
  typedef int evutil_socket_t;
#endif
72
// Pointee type used when passing socket-option buffers: void everywhere
// except _WIN32, where char is used. May be predefined by the includer.
#ifndef SOCKOPT_CAST_T
#  ifndef _WIN32
#    define SOCKOPT_CAST_T void
#  else
#    define SOCKOPT_CAST_T char
#  endif // _WIN32
#endif

/**
 * Reinterpret a pointer to a const option value as the platform's
 * const socket-option buffer pointer.
 *
 * @param v pointer to the option value.
 * @return the same address typed as const SOCKOPT_CAST_T*.
 */
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

/**
 * Reinterpret a pointer to a mutable option value as the platform's
 * socket-option buffer pointer.
 *
 * @param v pointer to the option value.
 * @return the same address typed as SOCKOPT_CAST_T*.
 */
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
90
/**
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +0000101
102
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
109
// Forward declaration: TNonblockingServer befriends this class and stores
// shared pointers to it.
class TNonblockingIOThread;
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112class TNonblockingServer : public TServer {
113 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000114 class TConnection;
115
Jake Farrellb0d95602011-12-06 01:17:26 +0000116 friend class TNonblockingIOThread;
117 private:
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 static const int LISTEN_BACKLOG = 1024;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000122 static const size_t CONNECTION_STACK_LIMIT = 1024;
123
Roger Meier3781c242011-12-11 20:07:21 +0000124 /// Default limit on frame size
125 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// Default limit on total number of connected sockets
128 static const int MAX_CONNECTIONS = INT_MAX;
129
130 /// Default limit on connections in handler/task processing
131 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
132
David Reiss89a12942010-10-06 17:10:52 +0000133 /// Default size of write buffer
134 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
135
David Reiss54bec5d2010-10-06 17:10:45 +0000136 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
137 static const int IDLE_READ_BUFFER_LIMIT = 1024;
138
139 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
140 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
141
142 /// # of calls before resizing oversized buffers (0 = check only on close)
143 static const int RESIZE_BUFFER_EVERY_N = 512;
144
Jake Farrellb0d95602011-12-06 01:17:26 +0000145 /// # of IO threads to use by default
146 static const int DEFAULT_IO_THREADS = 1;
147
148 /// File descriptor of an invalid socket
Roger Meier12d70532011-12-14 23:35:28 +0000149 static const int INVALID_SOCKET_VALUE = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000150
151 /// # of IO threads this server will use
152 size_t numIOThreads_;
153
154 /// Whether to set high scheduling priority for IO threads
155 bool useHighPriorityIOThreads_;
156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 int serverSocket_;
159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 int port_;
162
David Reiss01fe1532010-03-09 05:19:25 +0000163 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000164 boost::shared_ptr<ThreadManager> threadManager_;
165
David Reiss01fe1532010-03-09 05:19:25 +0000166 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000167 bool threadPoolProcessing_;
168
Jake Farrellb0d95602011-12-06 01:17:26 +0000169 // Factory to create the IO threads
Roger Meier12d70532011-12-14 23:35:28 +0000170 boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000171
Jake Farrellb0d95602011-12-06 01:17:26 +0000172 // Vector of IOThread objects that will handle our IO
173 std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000174
Jake Farrellb0d95602011-12-06 01:17:26 +0000175 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000176 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000177
178 // Synchronizes access to connection stack and similar data
179 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000180
181 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000182 size_t numTConnections_;
183
David Reiss9e8073c2010-03-09 05:19:39 +0000184 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000185 size_t numActiveProcessors_;
186
187 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000188 size_t connectionStackLimit_;
189
David Reiss01fe1532010-03-09 05:19:25 +0000190 /// Limit for number of connections processing or waiting to process
191 size_t maxActiveProcessors_;
192
193 /// Limit for number of open connections
194 size_t maxConnections_;
195
Roger Meier3781c242011-12-11 20:07:21 +0000196 /// Limit for frame size
197 size_t maxFrameSize_;
198
David Reiss068f4162010-03-09 05:19:45 +0000199 /// Time in milliseconds before an unperformed task expires (0 == infinite).
200 int64_t taskExpireTime_;
201
David Reiss01fe1532010-03-09 05:19:25 +0000202 /**
203 * Hysteresis for overload state. This is the fraction of the overload
204 * value that needs to be reached before the overload state is cleared;
205 * must be <= 1.0.
206 */
207 double overloadHysteresis_;
208
209 /// Action to take when we're overloaded.
210 TOverloadAction overloadAction_;
211
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000212 /**
David Reiss89a12942010-10-06 17:10:52 +0000213 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
214 * and found to be exceeded, reinitialized) to this size.
215 */
216 size_t writeBufferDefaultSize_;
217
218 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000219 * Max read buffer size for an idle TConnection. When we place an idle
220 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000221 * we will free the buffer (such that it will be reinitialized by the next
222 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000223 */
David Reiss54bec5d2010-10-06 17:10:45 +0000224 size_t idleReadBufferLimit_;
225
226 /**
227 * Max write buffer size for an idle connection. When we place an idle
228 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
229 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000230 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
231 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000232 */
233 size_t idleWriteBufferLimit_;
234
235 /**
236 * Every N calls we check the buffer size limits on a connected TConnection.
237 * 0 disables (i.e. the checks are only done when a connection closes).
238 */
239 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000240
241 /// Set if we are currently in an overloaded state.
242 bool overloaded_;
243
244 /// Count of connections dropped since overload started
245 uint32_t nConnectionsDropped_;
246
247 /// Count of connections dropped on overload since server started
248 uint64_t nTotalConnectionsDropped_;
249
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 /**
251 * This is a stack of all the objects that have been created but that
252 * are NOT currently in use. When we close a connection, we place it on this
253 * stack so that the object can be reused later, rather than freeing the
254 * memory and reallocating a new object later.
255 */
256 std::stack<TConnection*> connectionStack_;
257
David Reiss01fe1532010-03-09 05:19:25 +0000258 /**
Roger Meier0c04fcc2013-03-22 19:52:08 +0100259 * This container holds pointers to all active connections. This container
260 * allows the server to clean up unlcosed connection objects at destruction,
261 * which in turn allows their transports, protocols, processors and handlers
262 * to deallocate and clean up correctly.
263 */
264 std::vector<TConnection*> activeConnections_;
265
266 /**
David Reiss01fe1532010-03-09 05:19:25 +0000267 * Called when server socket had something happen. We accept all waiting
268 * client connections on listen socket fd and assign TConnection objects
269 * to handle those requests.
270 *
271 * @param fd the listen socket.
272 * @param which the event flag that triggered the handler.
273 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000274 void handleEvent(int fd, short which);
275
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000276 void init(int port) {
277 serverSocket_ = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000278 numIOThreads_ = DEFAULT_IO_THREADS;
279 nextIOThread_ = 0;
280 useHighPriorityIOThreads_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000281 port_ = port;
282 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000283 numTConnections_ = 0;
284 numActiveProcessors_ = 0;
285 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
286 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
287 maxConnections_ = MAX_CONNECTIONS;
Roger Meier3781c242011-12-11 20:07:21 +0000288 maxFrameSize_ = MAX_FRAME_SIZE;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000289 taskExpireTime_ = 0;
290 overloadHysteresis_ = 0.8;
291 overloadAction_ = T_OVERLOAD_NO_ACTION;
292 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
293 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
294 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
295 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
296 overloaded_ = false;
297 nConnectionsDropped_ = 0;
298 nTotalConnectionsDropped_ = 0;
299 }
Mark Sleef9373392007-01-24 19:41:57 +0000300
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000301 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000302 template<typename ProcessorFactory>
303 TNonblockingServer(
304 const boost::shared_ptr<ProcessorFactory>& processorFactory,
305 int port,
306 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
307 TServer(processorFactory) {
308 init(port);
309 }
310
311 template<typename Processor>
312 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
313 int port,
314 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000315 TServer(processor) {
316 init(port);
317 }
318
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000319 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000320 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000321 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000322 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
323 int port,
324 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000325 boost::shared_ptr<ThreadManager>(),
326 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
327 TServer(processorFactory) {
328
329 init(port);
330
331 setInputProtocolFactory(protocolFactory);
332 setOutputProtocolFactory(protocolFactory);
333 setThreadManager(threadManager);
334 }
335
336 template<typename Processor>
337 TNonblockingServer(
338 const boost::shared_ptr<Processor>& processor,
339 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
340 int port,
341 const boost::shared_ptr<ThreadManager>& threadManager =
342 boost::shared_ptr<ThreadManager>(),
343 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000344 TServer(processor) {
345
346 init(port);
347
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000348 setInputProtocolFactory(protocolFactory);
349 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000350 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000351 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000352
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000353 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000354 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000355 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000356 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
357 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
358 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
359 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
360 int port,
361 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000362 boost::shared_ptr<ThreadManager>(),
363 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
364 TServer(processorFactory) {
365
366 init(port);
367
368 setInputTransportFactory(inputTransportFactory);
369 setOutputTransportFactory(outputTransportFactory);
370 setInputProtocolFactory(inputProtocolFactory);
371 setOutputProtocolFactory(outputProtocolFactory);
372 setThreadManager(threadManager);
373 }
374
375 template<typename Processor>
376 TNonblockingServer(
377 const boost::shared_ptr<Processor>& processor,
378 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
379 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
380 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
381 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
382 int port,
383 const boost::shared_ptr<ThreadManager>& threadManager =
384 boost::shared_ptr<ThreadManager>(),
385 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000386 TServer(processor) {
387
388 init(port);
389
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000390 setInputTransportFactory(inputTransportFactory);
391 setOutputTransportFactory(outputTransportFactory);
392 setInputProtocolFactory(inputProtocolFactory);
393 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000394 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000395 }
Mark Slee79b16942007-11-26 19:05:29 +0000396
David Reiss8ede8182010-09-02 15:26:28 +0000397 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000398
David Reiss068f4162010-03-09 05:19:45 +0000399 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000400
David Reiss1997f102008-04-29 00:29:41 +0000401 boost::shared_ptr<ThreadManager> getThreadManager() {
402 return threadManager_;
403 }
404
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000405 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000406 * Sets the number of IO threads used by this server. Can only be used before
407 * the call to serve() and has no effect afterwards. We always use a
408 * PosixThreadFactory for the IO worker threads, because they must joinable
409 * for clean shutdown.
410 */
411 void setNumIOThreads(size_t numThreads) {
412 numIOThreads_ = numThreads;
413 }
414
415 /** Return whether the IO threads will get high scheduling priority */
416 bool useHighPriorityIOThreads() const {
417 return useHighPriorityIOThreads_;
418 }
419
420 /** Set whether the IO threads will get high scheduling priority. */
421 void setUseHighPriorityIOThreads(bool val) {
422 useHighPriorityIOThreads_ = val;
423 }
424
425 /** Return the number of IO threads used by this server. */
426 size_t getNumIOThreads() const {
427 return numIOThreads_;
428 }
429
430 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000431 * Get the maximum number of unused TConnection we will hold in reserve.
432 *
433 * @return the current limit on TConnection pool size.
434 */
David Reiss260fa932009-04-02 23:51:39 +0000435 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000436 return connectionStackLimit_;
437 }
438
439 /**
440 * Set the maximum number of unused TConnection we will hold in reserve.
441 *
442 * @param sz the new limit for TConnection pool size.
443 */
David Reiss260fa932009-04-02 23:51:39 +0000444 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000445 connectionStackLimit_ = sz;
446 }
447
Mark Slee79b16942007-11-26 19:05:29 +0000448 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000449 return threadPoolProcessing_;
450 }
451
452 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000453 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000454 }
455
David Reiss01fe1532010-03-09 05:19:25 +0000456 /**
457 * Return the count of sockets currently connected to.
458 *
459 * @return count of connected sockets.
460 */
461 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000462 return numTConnections_;
463 }
464
David Reiss01fe1532010-03-09 05:19:25 +0000465 /**
Roger Meierec8027f2012-04-11 21:43:25 +0000466 * Return the count of sockets currently connected to.
467 *
468 * @return count of connected sockets.
469 */
470 size_t getNumActiveConnections() const {
471 return getNumConnections() - getNumIdleConnections();
472 }
473
474 /**
David Reiss01fe1532010-03-09 05:19:25 +0000475 * Return the count of connection objects allocated but not in use.
476 *
477 * @return count of idle connection objects.
478 */
479 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000480 return connectionStack_.size();
481 }
482
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000483 /**
David Reiss01fe1532010-03-09 05:19:25 +0000484 * Return count of number of connections which are currently processing.
485 * This is defined as a connection where all data has been received and
486 * either assigned a task (when threading) or passed to a handler (when
487 * not threading), and where the handler has not yet returned.
488 *
489 * @return # of connections currently processing.
490 */
491 size_t getNumActiveProcessors() const {
492 return numActiveProcessors_;
493 }
494
495 /// Increment the count of connections currently processing.
496 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000497 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000498 ++numActiveProcessors_;
499 }
500
501 /// Decrement the count of connections currently processing.
502 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000503 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000504 if (numActiveProcessors_ > 0) {
505 --numActiveProcessors_;
506 }
507 }
508
509 /**
510 * Get the maximum # of connections allowed before overload.
511 *
512 * @return current setting.
513 */
514 size_t getMaxConnections() const {
515 return maxConnections_;
516 }
517
518 /**
519 * Set the maximum # of connections allowed before overload.
520 *
521 * @param maxConnections new setting for maximum # of connections.
522 */
523 void setMaxConnections(size_t maxConnections) {
524 maxConnections_ = maxConnections;
525 }
526
527 /**
528 * Get the maximum # of connections waiting in handler/task before overload.
529 *
530 * @return current setting.
531 */
532 size_t getMaxActiveProcessors() const {
533 return maxActiveProcessors_;
534 }
535
536 /**
537 * Set the maximum # of connections waiting in handler/task before overload.
538 *
539 * @param maxActiveProcessors new setting for maximum # of active processes.
540 */
541 void setMaxActiveProcessors(size_t maxActiveProcessors) {
542 maxActiveProcessors_ = maxActiveProcessors;
543 }
544
545 /**
Roger Meier3781c242011-12-11 20:07:21 +0000546 * Get the maximum allowed frame size.
547 *
548 * If a client tries to send a message larger than this limit,
549 * its connection will be closed.
550 *
551 * @return Maxium frame size, in bytes.
552 */
553 size_t getMaxFrameSize() const {
554 return maxFrameSize_;
555 }
556
557 /**
558 * Set the maximum allowed frame size.
559 *
560 * @param maxFrameSize The new maximum frame size.
561 */
562 void setMaxFrameSize(size_t maxFrameSize) {
563 maxFrameSize_ = maxFrameSize;
564 }
565
566 /**
David Reiss01fe1532010-03-09 05:19:25 +0000567 * Get fraction of maximum limits before an overload condition is cleared.
568 *
569 * @return hysteresis fraction
570 */
571 double getOverloadHysteresis() const {
572 return overloadHysteresis_;
573 }
574
575 /**
576 * Set fraction of maximum limits before an overload condition is cleared.
577 * A good value would probably be between 0.5 and 0.9.
578 *
579 * @param hysteresisFraction fraction <= 1.0.
580 */
581 void setOverloadHysteresis(double hysteresisFraction) {
582 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
583 overloadHysteresis_ = hysteresisFraction;
584 }
585 }
586
587 /**
588 * Get the action the server will take on overload.
589 *
590 * @return a TOverloadAction enum value for the currently set action.
591 */
592 TOverloadAction getOverloadAction() const {
593 return overloadAction_;
594 }
595
596 /**
597 * Set the action the server is to take on overload.
598 *
599 * @param overloadAction a TOverloadAction enum value for the action.
600 */
601 void setOverloadAction(TOverloadAction overloadAction) {
602 overloadAction_ = overloadAction;
603 }
604
605 /**
David Reiss068f4162010-03-09 05:19:45 +0000606 * Get the time in milliseconds after which a task expires (0 == infinite).
607 *
608 * @return a 64-bit time in milliseconds.
609 */
610 int64_t getTaskExpireTime() const {
611 return taskExpireTime_;
612 }
613
614 /**
615 * Set the time in milliseconds after which a task expires (0 == infinite).
616 *
617 * @param taskExpireTime a 64-bit time in milliseconds.
618 */
619 void setTaskExpireTime(int64_t taskExpireTime) {
620 taskExpireTime_ = taskExpireTime;
621 }
622
623 /**
David Reiss01fe1532010-03-09 05:19:25 +0000624 * Determine if the server is currently overloaded.
625 * This function checks the maximums for open connections and connections
626 * currently in processing, and sets an overload condition if they are
627 * exceeded. The overload will persist until both values are below the
628 * current hysteresis fraction of their maximums.
629 *
630 * @return true if an overload condition exists, false if not.
631 */
632 bool serverOverloaded();
633
634 /** Pop and discard next task on threadpool wait queue.
635 *
636 * @return true if a task was discarded, false if the wait queue was empty.
637 */
638 bool drainPendingTask();
639
640 /**
David Reiss89a12942010-10-06 17:10:52 +0000641 * Get the starting size of a TConnection object's write buffer.
642 *
643 * @return # bytes we initialize a TConnection object's write buffer to.
644 */
645 size_t getWriteBufferDefaultSize() const {
646 return writeBufferDefaultSize_;
647 }
648
649 /**
650 * Set the starting size of a TConnection object's write buffer.
651 *
652 * @param size # bytes we initialize a TConnection object's write buffer to.
653 */
654 void setWriteBufferDefaultSize(size_t size) {
655 writeBufferDefaultSize_ = size;
656 }
657
658 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000659 * Get the maximum size of read buffer allocated to idle TConnection objects.
660 *
David Reiss89a12942010-10-06 17:10:52 +0000661 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000662 */
663 size_t getIdleReadBufferLimit() const {
664 return idleReadBufferLimit_;
665 }
666
667 /**
668 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
669 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000670 *
David Reiss89a12942010-10-06 17:10:52 +0000671 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000672 */
673 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000674 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000675 }
676
677 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000678 * Set the maximum size read buffer allocated to idle TConnection objects.
679 * If a TConnection object is found (either on connection close or between
680 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000681 * allocated to its read buffer, we free it and allow it to be reinitialized
682 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000683 *
684 * @param limit of bytes beyond which we will shrink buffers when checked.
685 */
686 void setIdleReadBufferLimit(size_t limit) {
687 idleReadBufferLimit_ = limit;
688 }
689
690 /**
691 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
692 * Set the maximum size read buffer allocated to idle TConnection objects.
693 * If a TConnection object is found (either on connection close or between
694 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000695 * allocated to its read buffer, we free it and allow it to be reinitialized
696 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000697 *
698 * @param limit of bytes beyond which we will shrink buffers when checked.
699 */
700 void setIdleBufferMemLimit(size_t limit) {
701 idleReadBufferLimit_ = limit;
702 }
703
Jake Farrellb0d95602011-12-06 01:17:26 +0000704
David Reiss54bec5d2010-10-06 17:10:45 +0000705
706 /**
707 * Get the maximum size of write buffer allocated to idle TConnection objects.
708 *
709 * @return # bytes beyond which we will reallocate buffers when checked.
710 */
711 size_t getIdleWriteBufferLimit() const {
712 return idleWriteBufferLimit_;
713 }
714
715 /**
716 * Set the maximum size write buffer allocated to idle TConnection objects.
717 * If a TConnection object is found (either on connection close or between
718 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000719 * allocated to its write buffer, we destroy and construct that buffer with
720 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000721 *
722 * @param limit of bytes beyond which we will shrink buffers when idle.
723 */
David Reiss54bec5d2010-10-06 17:10:45 +0000724 void setIdleWriteBufferLimit(size_t limit) {
725 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000726 }
727
David Reiss01fe1532010-03-09 05:19:25 +0000728 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000729 * Get # of calls made between buffer size checks. 0 means disabled.
730 *
731 * @return # of calls between buffer size checks.
732 */
733 int32_t getResizeBufferEveryN() const {
734 return resizeBufferEveryN_;
735 }
736
737 /**
738 * Check buffer sizes every "count" calls. This allows buffer limits
739 * to be enforced for persistant connections with a controllable degree
740 * of overhead. 0 disables checks except at connection close.
741 *
742 * @param count the number of calls between checks, or 0 to disable
743 */
744 void setResizeBufferEveryN(int32_t count) {
745 resizeBufferEveryN_ = count;
746 }
747
Jake Farrellb0d95602011-12-06 01:17:26 +0000748 /**
749 * Main workhorse function, starts up the server listening on a port and
750 * loops over the libevent handler.
751 */
752 void serve();
David Reiss54bec5d2010-10-06 17:10:45 +0000753
/**
 * Makes the server terminate gracefully. May be called from any thread.
 */
void stop();
David Reiss54bec5d2010-10-06 17:10:45 +0000758
Jake Farrellb0d95602011-12-06 01:17:26 +0000759 private:
760 /**
761 * Callback function that the threadmanager calls when a task reaches
762 * its expiration time. It is needed to clean up the expired connection.
763 *
764 * @param task the runnable associated with the expired task.
765 */
766 void expireClose(boost::shared_ptr<Runnable> task);
767
/// Creates the socket to listen on and binds it to the local port.
void createAndListenOnSocket();
770
/**
 * Prepares a socket created by createAndListenOnSocket() for use in the
 * server by setting various options on it.
 *
 * @param fd descriptor of the socket to be initialized.
 */
void listenSocket(int fd);
David Reiss54bec5d2010-10-06 17:10:45 +0000778 /**
David Reiss01fe1532010-03-09 05:19:25 +0000779 * Return an initialized connection object. Creates or recovers from
780 * pool a TConnection and initializes it with the provided socket FD
781 * and flags.
782 *
783 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000784 * @param addr the sockaddr of the client
785 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000786 * @return pointer to initialized TConnection object.
787 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000788 TConnection* createConnection(int socket, const sockaddr* addr,
789 socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000790
David Reiss01fe1532010-03-09 05:19:25 +0000791 /**
792 * Returns a connection to pool or deletion. If the connection pool
793 * (a stack) isn't full, place the connection object on it, otherwise
794 * just delete it.
795 *
796 * @param connection the TConection being returned.
797 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000798 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000799};
Mark Slee2f6404d2006-10-10 01:37:40 +0000800
Jake Farrellb0d95602011-12-06 01:17:26 +0000801class TNonblockingIOThread : public Runnable {
802 public:
803 // Creates an IO thread and sets up the event base. The listenSocket should
804 // be a valid FD on which listen() has already been called. If the
805 // listenSocket is < 0, accepting will not be done.
806 TNonblockingIOThread(TNonblockingServer* server,
807 int number,
808 int listenSocket,
809 bool useHighPriority);
810
811 ~TNonblockingIOThread();
812
813 // Returns the event-base for this thread.
814 event_base* getEventBase() const { return eventBase_; }
815
816 // Returns the server for this thread.
817 TNonblockingServer* getServer() const { return server_; }
818
819 // Returns the number of this IO thread.
820 int getThreadNumber() const { return number_; }
821
822 // Returns the thread id associated with this object. This should
823 // only be called after the thread has been started.
Roger Meier12d70532011-12-14 23:35:28 +0000824 Thread::id_t getThreadId() const { return threadId_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000825
826 // Returns the send-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000827 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000828
829 // Returns the read-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000830 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000831
832 // Returns the actual thread object associated with this IO thread.
833 boost::shared_ptr<Thread> getThread() const { return thread_; }
834
835 // Sets the actual thread object associated with this IO thread.
836 void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
837
838 // Used by TConnection objects to indicate processing has finished.
839 bool notify(TNonblockingServer::TConnection* conn);
840
841 // Enters the event loop and does not return until a call to stop().
842 virtual void run();
843
844 // Exits the event loop as soon as possible.
845 void stop();
846
847 // Ensures that the event-loop thread is fully finished and shut down.
848 void join();
849
850 private:
David Reiss01fe1532010-03-09 05:19:25 +0000851 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000852 * C-callable event handler for signaling task completion. Provides a
853 * callback that libevent can understand that will read a connection
854 * object's address from a pipe and call connection->transition() for
855 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000856 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000857 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000858 */
Roger Meier12d70532011-12-14 23:35:28 +0000859 static void notifyHandler(evutil_socket_t fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000860
861 /**
David Reiss01fe1532010-03-09 05:19:25 +0000862 * C-callable event handler for listener events. Provides a callback
863 * that libevent can understand which invokes server->handleEvent().
864 *
865 * @param fd the descriptor the event occured on.
866 * @param which the flags associated with the event.
867 * @param v void* callback arg where we placed TNonblockingServer's "this".
868 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000869 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000870 ((TNonblockingServer*)v)->handleEvent(fd, which);
871 }
872
Jake Farrellb0d95602011-12-06 01:17:26 +0000873 /// Exits the loop ASAP in case of shutdown or error.
874 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000875
Jake Farrellb0d95602011-12-06 01:17:26 +0000876 /// Registers the events for the notification & listen sockets
877 void registerEvents();
Mark Slee79b16942007-11-26 19:05:29 +0000878
David Reiss01fe1532010-03-09 05:19:25 +0000879 /// Create the pipe used to notify I/O process of task completion.
880 void createNotificationPipe();
881
Jake Farrellb0d95602011-12-06 01:17:26 +0000882 /// Unregisters our events for notification and listen sockets.
883 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000884
Jake Farrellb0d95602011-12-06 01:17:26 +0000885 /// Sets (or clears) high priority scheduling status for the current thread.
886 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000887
Jake Farrellb0d95602011-12-06 01:17:26 +0000888 private:
889 /// associated server
890 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000891
Jake Farrellb0d95602011-12-06 01:17:26 +0000892 /// thread number (for debugging).
893 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000894
Jake Farrellb0d95602011-12-06 01:17:26 +0000895 /// The actual physical thread id.
Roger Meier12d70532011-12-14 23:35:28 +0000896 Thread::id_t threadId_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000897
898 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
899 int listenSocket_;
900
901 /// Sets a high scheduling priority when running
902 bool useHighPriority_;
903
904 /// pointer to eventbase to be used for looping
905 event_base* eventBase_;
906
907 /// Used with eventBase_ for connection events (only in listener thread)
908 struct event serverEvent_;
909
910 /// Used with eventBase_ for task completion notification
911 struct event notificationEvent_;
912
913 /// File descriptors for pipe used for task completion notification.
Roger Meier12d70532011-12-14 23:35:28 +0000914 evutil_socket_t notificationPipeFDs_[2];
Jake Farrellb0d95602011-12-06 01:17:26 +0000915
916 /// Actual IO Thread
917 boost::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000918};
919
T Jake Lucianib5e62212009-01-31 22:36:20 +0000920}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000921
Jake Farrellb0d95602011-12-06 01:17:26 +0000922#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_