blob: da36045da3560f97651c0eb1aaa58a78652cfd0e [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000033#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000034#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000035#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000036#include <event.h>
37
T Jake Lucianib5e62212009-01-31 22:36:20 +000038namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000039
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000041using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000042using apache::thrift::protocol::TProtocol;
43using apache::thrift::concurrency::Runnable;
44using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000045
Roger Meier30aae0c2011-07-08 12:23:31 +000046#ifdef LIBEVENT_VERSION_NUMBER
47#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
48#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
49#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
50#else
51// assume latest version 1 series
52#define LIBEVENT_VERSION_MAJOR 1
53#define LIBEVENT_VERSION_MINOR 14
54#define LIBEVENT_VERSION_REL 13
55#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
56#endif
57
58#if LIBEVENT_VERSION_NUMBER < 0x02000000
59 typedef int evutil_socket_t;
60#endif
61
62#ifndef SOCKOPT_CAST_T
Roger Meier84e4a3c2011-09-16 20:58:44 +000063# ifndef _WIN32
64# define SOCKOPT_CAST_T void
65# else
66# define SOCKOPT_CAST_T char
67# endif // _WIN32
Roger Meier30aae0c2011-07-08 12:23:31 +000068#endif
69
70template<class T>
71inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
72 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
73}
74
75template<class T>
76inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
77 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
78}
79
Mark Slee2f6404d2006-10-10 01:37:40 +000080/**
81 * This is a non-blocking server in C++ for high performance that operates a
82 * single IO thread. It assumes that all incoming requests are framed with a
83 * 4 byte length indicator and writes out responses using the same framing.
84 *
85 * It does not use the TServerTransport framework, but rather has socket
86 * operations hardcoded for use with select.
87 *
Mark Slee2f6404d2006-10-10 01:37:40 +000088 */
David Reiss01fe1532010-03-09 05:19:25 +000089
90
91/// Overload condition actions.
92enum TOverloadAction {
93 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
94 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
95 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
96};
97
Mark Slee2f6404d2006-10-10 01:37:40 +000098class TNonblockingServer : public TServer {
99 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000100 class TConnection;
101
David Reiss01fe1532010-03-09 05:19:25 +0000102 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000103 static const int LISTEN_BACKLOG = 1024;
104
David Reiss01fe1532010-03-09 05:19:25 +0000105 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000106 static const size_t CONNECTION_STACK_LIMIT = 1024;
107
David Reiss01fe1532010-03-09 05:19:25 +0000108 /// Default limit on total number of connected sockets
109 static const int MAX_CONNECTIONS = INT_MAX;
110
111 /// Default limit on connections in handler/task processing
112 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
113
David Reiss89a12942010-10-06 17:10:52 +0000114 /// Default size of write buffer
115 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
116
David Reiss54bec5d2010-10-06 17:10:45 +0000117 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
118 static const int IDLE_READ_BUFFER_LIMIT = 1024;
119
120 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
121 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
122
123 /// # of calls before resizing oversized buffers (0 = check only on close)
124 static const int RESIZE_BUFFER_EVERY_N = 512;
125
David Reiss01fe1532010-03-09 05:19:25 +0000126 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000127 int serverSocket_;
128
David Reiss01fe1532010-03-09 05:19:25 +0000129 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 int port_;
131
David Reiss01fe1532010-03-09 05:19:25 +0000132 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000133 boost::shared_ptr<ThreadManager> threadManager_;
134
David Reiss01fe1532010-03-09 05:19:25 +0000135 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000136 bool threadPoolProcessing_;
137
David Reiss01fe1532010-03-09 05:19:25 +0000138 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000139 event_base* eventBase_;
Roger Meierc1905582011-08-02 23:37:36 +0000140 bool ownEventBase_;
Mark Slee79b16942007-11-26 19:05:29 +0000141
David Reiss01fe1532010-03-09 05:19:25 +0000142 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000143 struct event serverEvent_;
144
David Reiss01fe1532010-03-09 05:19:25 +0000145 /// Event struct, used with eventBase_ for task completion notification
146 struct event notificationEvent_;
147
148 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000149 size_t numTConnections_;
150
David Reiss9e8073c2010-03-09 05:19:39 +0000151 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000152 size_t numActiveProcessors_;
153
154 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000155 size_t connectionStackLimit_;
156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Limit for number of connections processing or waiting to process
158 size_t maxActiveProcessors_;
159
160 /// Limit for number of open connections
161 size_t maxConnections_;
162
David Reiss068f4162010-03-09 05:19:45 +0000163 /// Time in milliseconds before an unperformed task expires (0 == infinite).
164 int64_t taskExpireTime_;
165
David Reiss01fe1532010-03-09 05:19:25 +0000166 /**
167 * Hysteresis for overload state. This is the fraction of the overload
168 * value that needs to be reached before the overload state is cleared;
169 * must be <= 1.0.
170 */
171 double overloadHysteresis_;
172
173 /// Action to take when we're overloaded.
174 TOverloadAction overloadAction_;
175
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000176 /**
David Reiss89a12942010-10-06 17:10:52 +0000177 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
178 * and found to be exceeded, reinitialized) to this size.
179 */
180 size_t writeBufferDefaultSize_;
181
182 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000183 * Max read buffer size for an idle TConnection. When we place an idle
184 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000185 * we will free the buffer (such that it will be reinitialized by the next
186 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000187 */
David Reiss54bec5d2010-10-06 17:10:45 +0000188 size_t idleReadBufferLimit_;
189
190 /**
191 * Max write buffer size for an idle connection. When we place an idle
192 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
193 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000194 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
195 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000196 */
197 size_t idleWriteBufferLimit_;
198
199 /**
200 * Every N calls we check the buffer size limits on a connected TConnection.
201 * 0 disables (i.e. the checks are only done when a connection closes).
202 */
203 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000204
205 /// Set if we are currently in an overloaded state.
206 bool overloaded_;
207
208 /// Count of connections dropped since overload started
209 uint32_t nConnectionsDropped_;
210
211 /// Count of connections dropped on overload since server started
212 uint64_t nTotalConnectionsDropped_;
213
214 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000215 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000216
Mark Slee2f6404d2006-10-10 01:37:40 +0000217 /**
218 * This is a stack of all the objects that have been created but that
219 * are NOT currently in use. When we close a connection, we place it on this
220 * stack so that the object can be reused later, rather than freeing the
221 * memory and reallocating a new object later.
222 */
223 std::stack<TConnection*> connectionStack_;
224
David Reiss01fe1532010-03-09 05:19:25 +0000225 /**
226 * Called when server socket had something happen. We accept all waiting
227 * client connections on listen socket fd and assign TConnection objects
228 * to handle those requests.
229 *
230 * @param fd the listen socket.
231 * @param which the event flag that triggered the handler.
232 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 void handleEvent(int fd, short which);
234
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000235 void init(int port) {
236 serverSocket_ = -1;
237 port_ = port;
238 threadPoolProcessing_ = false;
239 eventBase_ = NULL;
240 ownEventBase_ = false;
241 numTConnections_ = 0;
242 numActiveProcessors_ = 0;
243 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
244 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
245 maxConnections_ = MAX_CONNECTIONS;
246 taskExpireTime_ = 0;
247 overloadHysteresis_ = 0.8;
248 overloadAction_ = T_OVERLOAD_NO_ACTION;
249 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
250 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
251 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
252 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
253 overloaded_ = false;
254 nConnectionsDropped_ = 0;
255 nTotalConnectionsDropped_ = 0;
256 }
Mark Sleef9373392007-01-24 19:41:57 +0000257
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000258 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000259 template<typename ProcessorFactory>
260 TNonblockingServer(
261 const boost::shared_ptr<ProcessorFactory>& processorFactory,
262 int port,
263 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
264 TServer(processorFactory) {
265 init(port);
266 }
267
268 template<typename Processor>
269 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
270 int port,
271 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000272 TServer(processor) {
273 init(port);
274 }
275
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000276 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000277 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000278 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000279 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
280 int port,
281 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000282 boost::shared_ptr<ThreadManager>(),
283 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
284 TServer(processorFactory) {
285
286 init(port);
287
288 setInputProtocolFactory(protocolFactory);
289 setOutputProtocolFactory(protocolFactory);
290 setThreadManager(threadManager);
291 }
292
293 template<typename Processor>
294 TNonblockingServer(
295 const boost::shared_ptr<Processor>& processor,
296 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
297 int port,
298 const boost::shared_ptr<ThreadManager>& threadManager =
299 boost::shared_ptr<ThreadManager>(),
300 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000301 TServer(processor) {
302
303 init(port);
304
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000305 setInputProtocolFactory(protocolFactory);
306 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000307 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000308 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000309
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000310 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000311 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000312 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000313 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
314 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
315 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
316 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
317 int port,
318 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000319 boost::shared_ptr<ThreadManager>(),
320 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
321 TServer(processorFactory) {
322
323 init(port);
324
325 setInputTransportFactory(inputTransportFactory);
326 setOutputTransportFactory(outputTransportFactory);
327 setInputProtocolFactory(inputProtocolFactory);
328 setOutputProtocolFactory(outputProtocolFactory);
329 setThreadManager(threadManager);
330 }
331
332 template<typename Processor>
333 TNonblockingServer(
334 const boost::shared_ptr<Processor>& processor,
335 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
336 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
337 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
338 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
339 int port,
340 const boost::shared_ptr<ThreadManager>& threadManager =
341 boost::shared_ptr<ThreadManager>(),
342 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000343 TServer(processor) {
344
345 init(port);
346
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000347 setInputTransportFactory(inputTransportFactory);
348 setOutputTransportFactory(outputTransportFactory);
349 setInputProtocolFactory(inputProtocolFactory);
350 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000351 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000352 }
Mark Slee79b16942007-11-26 19:05:29 +0000353
David Reiss8ede8182010-09-02 15:26:28 +0000354 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000355
David Reiss068f4162010-03-09 05:19:45 +0000356 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000357
David Reiss1997f102008-04-29 00:29:41 +0000358 boost::shared_ptr<ThreadManager> getThreadManager() {
359 return threadManager_;
360 }
361
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000362 /**
363 * Get the maximum number of unused TConnection we will hold in reserve.
364 *
365 * @return the current limit on TConnection pool size.
366 */
David Reiss260fa932009-04-02 23:51:39 +0000367 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000368 return connectionStackLimit_;
369 }
370
371 /**
372 * Set the maximum number of unused TConnection we will hold in reserve.
373 *
374 * @param sz the new limit for TConnection pool size.
375 */
David Reiss260fa932009-04-02 23:51:39 +0000376 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000377 connectionStackLimit_ = sz;
378 }
379
Mark Slee79b16942007-11-26 19:05:29 +0000380 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000381 return threadPoolProcessing_;
382 }
383
384 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000385 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000386 }
387
Mark Slee79b16942007-11-26 19:05:29 +0000388 event_base* getEventBase() const {
389 return eventBase_;
390 }
391
David Reiss01fe1532010-03-09 05:19:25 +0000392 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000393 void incrementNumConnections() {
394 ++numTConnections_;
395 }
396
David Reiss01fe1532010-03-09 05:19:25 +0000397 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000398 void decrementNumConnections() {
399 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000400 }
401
David Reiss01fe1532010-03-09 05:19:25 +0000402 /**
403 * Return the count of sockets currently connected to.
404 *
405 * @return count of connected sockets.
406 */
407 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000408 return numTConnections_;
409 }
410
David Reiss01fe1532010-03-09 05:19:25 +0000411 /**
412 * Return the count of connection objects allocated but not in use.
413 *
414 * @return count of idle connection objects.
415 */
416 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000417 return connectionStack_.size();
418 }
419
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000420 /**
David Reiss01fe1532010-03-09 05:19:25 +0000421 * Return count of number of connections which are currently processing.
422 * This is defined as a connection where all data has been received and
423 * either assigned a task (when threading) or passed to a handler (when
424 * not threading), and where the handler has not yet returned.
425 *
426 * @return # of connections currently processing.
427 */
428 size_t getNumActiveProcessors() const {
429 return numActiveProcessors_;
430 }
431
432 /// Increment the count of connections currently processing.
433 void incrementActiveProcessors() {
434 ++numActiveProcessors_;
435 }
436
437 /// Decrement the count of connections currently processing.
438 void decrementActiveProcessors() {
439 if (numActiveProcessors_ > 0) {
440 --numActiveProcessors_;
441 }
442 }
443
444 /**
445 * Get the maximum # of connections allowed before overload.
446 *
447 * @return current setting.
448 */
449 size_t getMaxConnections() const {
450 return maxConnections_;
451 }
452
453 /**
454 * Set the maximum # of connections allowed before overload.
455 *
456 * @param maxConnections new setting for maximum # of connections.
457 */
458 void setMaxConnections(size_t maxConnections) {
459 maxConnections_ = maxConnections;
460 }
461
462 /**
463 * Get the maximum # of connections waiting in handler/task before overload.
464 *
465 * @return current setting.
466 */
467 size_t getMaxActiveProcessors() const {
468 return maxActiveProcessors_;
469 }
470
471 /**
472 * Set the maximum # of connections waiting in handler/task before overload.
473 *
474 * @param maxActiveProcessors new setting for maximum # of active processes.
475 */
476 void setMaxActiveProcessors(size_t maxActiveProcessors) {
477 maxActiveProcessors_ = maxActiveProcessors;
478 }
479
480 /**
481 * Get fraction of maximum limits before an overload condition is cleared.
482 *
483 * @return hysteresis fraction
484 */
485 double getOverloadHysteresis() const {
486 return overloadHysteresis_;
487 }
488
489 /**
490 * Set fraction of maximum limits before an overload condition is cleared.
491 * A good value would probably be between 0.5 and 0.9.
492 *
493 * @param hysteresisFraction fraction <= 1.0.
494 */
495 void setOverloadHysteresis(double hysteresisFraction) {
496 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
497 overloadHysteresis_ = hysteresisFraction;
498 }
499 }
500
501 /**
502 * Get the action the server will take on overload.
503 *
504 * @return a TOverloadAction enum value for the currently set action.
505 */
506 TOverloadAction getOverloadAction() const {
507 return overloadAction_;
508 }
509
510 /**
511 * Set the action the server is to take on overload.
512 *
513 * @param overloadAction a TOverloadAction enum value for the action.
514 */
515 void setOverloadAction(TOverloadAction overloadAction) {
516 overloadAction_ = overloadAction;
517 }
518
519 /**
David Reiss068f4162010-03-09 05:19:45 +0000520 * Get the time in milliseconds after which a task expires (0 == infinite).
521 *
522 * @return a 64-bit time in milliseconds.
523 */
524 int64_t getTaskExpireTime() const {
525 return taskExpireTime_;
526 }
527
528 /**
529 * Set the time in milliseconds after which a task expires (0 == infinite).
530 *
531 * @param taskExpireTime a 64-bit time in milliseconds.
532 */
533 void setTaskExpireTime(int64_t taskExpireTime) {
534 taskExpireTime_ = taskExpireTime;
535 }
536
537 /**
David Reiss01fe1532010-03-09 05:19:25 +0000538 * Determine if the server is currently overloaded.
539 * This function checks the maximums for open connections and connections
540 * currently in processing, and sets an overload condition if they are
541 * exceeded. The overload will persist until both values are below the
542 * current hysteresis fraction of their maximums.
543 *
544 * @return true if an overload condition exists, false if not.
545 */
546 bool serverOverloaded();
547
548 /** Pop and discard next task on threadpool wait queue.
549 *
550 * @return true if a task was discarded, false if the wait queue was empty.
551 */
552 bool drainPendingTask();
553
554 /**
David Reiss89a12942010-10-06 17:10:52 +0000555 * Get the starting size of a TConnection object's write buffer.
556 *
557 * @return # bytes we initialize a TConnection object's write buffer to.
558 */
559 size_t getWriteBufferDefaultSize() const {
560 return writeBufferDefaultSize_;
561 }
562
563 /**
564 * Set the starting size of a TConnection object's write buffer.
565 *
566 * @param size # bytes we initialize a TConnection object's write buffer to.
567 */
568 void setWriteBufferDefaultSize(size_t size) {
569 writeBufferDefaultSize_ = size;
570 }
571
572 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000573 * Get the maximum size of read buffer allocated to idle TConnection objects.
574 *
David Reiss89a12942010-10-06 17:10:52 +0000575 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000576 */
577 size_t getIdleReadBufferLimit() const {
578 return idleReadBufferLimit_;
579 }
580
581 /**
582 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
583 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000584 *
David Reiss89a12942010-10-06 17:10:52 +0000585 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000586 */
587 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000588 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000589 }
590
591 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000592 * Set the maximum size read buffer allocated to idle TConnection objects.
593 * If a TConnection object is found (either on connection close or between
594 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000595 * allocated to its read buffer, we free it and allow it to be reinitialized
596 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000597 *
598 * @param limit of bytes beyond which we will shrink buffers when checked.
599 */
600 void setIdleReadBufferLimit(size_t limit) {
601 idleReadBufferLimit_ = limit;
602 }
603
604 /**
605 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
606 * Set the maximum size read buffer allocated to idle TConnection objects.
607 * If a TConnection object is found (either on connection close or between
608 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000609 * allocated to its read buffer, we free it and allow it to be reinitialized
610 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000611 *
612 * @param limit of bytes beyond which we will shrink buffers when checked.
613 */
614 void setIdleBufferMemLimit(size_t limit) {
615 idleReadBufferLimit_ = limit;
616 }
617
618
619
620 /**
621 * Get the maximum size of write buffer allocated to idle TConnection objects.
622 *
623 * @return # bytes beyond which we will reallocate buffers when checked.
624 */
625 size_t getIdleWriteBufferLimit() const {
626 return idleWriteBufferLimit_;
627 }
628
629 /**
630 * Set the maximum size write buffer allocated to idle TConnection objects.
631 * If a TConnection object is found (either on connection close or between
632 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000633 * allocated to its write buffer, we destroy and construct that buffer with
634 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000635 *
636 * @param limit of bytes beyond which we will shrink buffers when idle.
637 */
David Reiss54bec5d2010-10-06 17:10:45 +0000638 void setIdleWriteBufferLimit(size_t limit) {
639 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000640 }
641
David Reiss01fe1532010-03-09 05:19:25 +0000642 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000643 * Get # of calls made between buffer size checks. 0 means disabled.
644 *
645 * @return # of calls between buffer size checks.
646 */
647 int32_t getResizeBufferEveryN() const {
648 return resizeBufferEveryN_;
649 }
650
651 /**
652 * Check buffer sizes every "count" calls. This allows buffer limits
653 * to be enforced for persistant connections with a controllable degree
654 * of overhead. 0 disables checks except at connection close.
655 *
656 * @param count the number of calls between checks, or 0 to disable
657 */
658 void setResizeBufferEveryN(int32_t count) {
659 resizeBufferEveryN_ = count;
660 }
661
662
663
664 /**
David Reiss01fe1532010-03-09 05:19:25 +0000665 * Return an initialized connection object. Creates or recovers from
666 * pool a TConnection and initializes it with the provided socket FD
667 * and flags.
668 *
669 * @param socket FD of socket associated with this connection.
670 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000671 * @param addr the sockaddr of the client
672 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000673 * @return pointer to initialized TConnection object.
674 */
David Reiss105961d2010-10-06 17:10:17 +0000675 TConnection* createConnection(int socket, short flags,
676 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000677
David Reiss01fe1532010-03-09 05:19:25 +0000678 /**
679 * Returns a connection to pool or deletion. If the connection pool
680 * (a stack) isn't full, place the connection object on it, otherwise
681 * just delete it.
682 *
683 * @param connection the TConection being returned.
684 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000685 void returnConnection(TConnection* connection);
686
David Reiss01fe1532010-03-09 05:19:25 +0000687 /**
David Reiss068f4162010-03-09 05:19:45 +0000688 * Callback function that the threadmanager calls when a task reaches
689 * its expiration time. It is needed to clean up the expired connection.
690 *
691 * @param task the runnable associated with the expired task.
692 */
693 void expireClose(boost::shared_ptr<Runnable> task);
694
695 /**
David Reiss01fe1532010-03-09 05:19:25 +0000696 * C-callable event handler for listener events. Provides a callback
697 * that libevent can understand which invokes server->handleEvent().
698 *
699 * @param fd the descriptor the event occured on.
700 * @param which the flags associated with the event.
701 * @param v void* callback arg where we placed TNonblockingServer's "this".
702 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000703 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000704 ((TNonblockingServer*)v)->handleEvent(fd, which);
705 }
706
David Reiss01fe1532010-03-09 05:19:25 +0000707 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000708 void listenSocket();
709
David Reiss01fe1532010-03-09 05:19:25 +0000710 /**
711 * Takes a socket created by listenSocket() and sets various options on it
712 * to prepare for use in the server.
713 *
714 * @param fd descriptor of socket to be initialized/
715 */
Mark Slee79b16942007-11-26 19:05:29 +0000716 void listenSocket(int fd);
717
David Reiss01fe1532010-03-09 05:19:25 +0000718 /// Create the pipe used to notify I/O process of task completion.
719 void createNotificationPipe();
720
721 /**
722 * Get notification pipe send descriptor.
723 *
724 * @return write fd for pipe.
725 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000726 evutil_socket_t getNotificationSendFD() const {
David Reiss01fe1532010-03-09 05:19:25 +0000727 return notificationPipeFDs_[1];
728 }
729
730 /**
731 * Get notification pipe receive descriptor.
732 *
733 * @return read fd of pipe.
734 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000735 evutil_socket_t getNotificationRecvFD() const {
David Reiss01fe1532010-03-09 05:19:25 +0000736 return notificationPipeFDs_[0];
737 }
738
739 /**
740 * Register the core libevent events onto the proper base.
741 *
742 * @param base pointer to the event base to be initialized.
Roger Meierc1905582011-08-02 23:37:36 +0000743 * @param ownEventBase if true, this server is responsible for
744 * freeing the event base memory.
David Reiss01fe1532010-03-09 05:19:25 +0000745 */
Roger Meierc1905582011-08-02 23:37:36 +0000746 void registerEvents(event_base* base, bool ownEventBase = true);
Mark Slee79b16942007-11-26 19:05:29 +0000747
David Reiss01fe1532010-03-09 05:19:25 +0000748 /**
749 * Main workhorse function, starts up the server listening on a port and
750 * loops over the libevent handler.
751 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000752 void serve();
Bryan Duxbury76c43682011-08-24 21:26:48 +0000753
754 /**
755 * May be called from a separate thread to cause serve() to return.
756 */
757 void stop();
Mark Slee2f6404d2006-10-10 01:37:40 +0000758};
759
T Jake Lucianib5e62212009-01-31 22:36:20 +0000760}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000761
762#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_