blob: a26fcc5eabe9fcbda95b0875c3a4dd23fd5eba64 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000033#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000034#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000035#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000036#include <event.h>
37
T Jake Lucianib5e62212009-01-31 22:36:20 +000038namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000039
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000041using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000042using apache::thrift::protocol::TProtocol;
43using apache::thrift::concurrency::Runnable;
44using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000045
Roger Meier30aae0c2011-07-08 12:23:31 +000046#ifdef LIBEVENT_VERSION_NUMBER
47#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
48#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
49#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
50#else
51// assume latest version 1 series
52#define LIBEVENT_VERSION_MAJOR 1
53#define LIBEVENT_VERSION_MINOR 14
54#define LIBEVENT_VERSION_REL 13
55#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
56#endif
57
58#if LIBEVENT_VERSION_NUMBER < 0x02000000
59 typedef int evutil_socket_t;
60#endif
61
62#ifndef SOCKOPT_CAST_T
63#define SOCKOPT_CAST_T void
64#endif
65
66template<class T>
67inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
68 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
69}
70
71template<class T>
72inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
73 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
74}
75
Mark Slee2f6404d2006-10-10 01:37:40 +000076/**
77 * This is a non-blocking server in C++ for high performance that operates a
78 * single IO thread. It assumes that all incoming requests are framed with a
79 * 4 byte length indicator and writes out responses using the same framing.
80 *
81 * It does not use the TServerTransport framework, but rather has socket
82 * operations hardcoded for use with select.
83 *
Mark Slee2f6404d2006-10-10 01:37:40 +000084 */
David Reiss01fe1532010-03-09 05:19:25 +000085
86
87/// Overload condition actions.
88enum TOverloadAction {
89 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
90 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
91 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
92};
93
Mark Slee2f6404d2006-10-10 01:37:40 +000094class TNonblockingServer : public TServer {
95 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000096 class TConnection;
97
David Reiss01fe1532010-03-09 05:19:25 +000098 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000099 static const int LISTEN_BACKLOG = 1024;
100
David Reiss01fe1532010-03-09 05:19:25 +0000101 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000102 static const size_t CONNECTION_STACK_LIMIT = 1024;
103
David Reiss01fe1532010-03-09 05:19:25 +0000104 /// Default limit on total number of connected sockets
105 static const int MAX_CONNECTIONS = INT_MAX;
106
107 /// Default limit on connections in handler/task processing
108 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
109
David Reiss89a12942010-10-06 17:10:52 +0000110 /// Default size of write buffer
111 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
112
David Reiss54bec5d2010-10-06 17:10:45 +0000113 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
114 static const int IDLE_READ_BUFFER_LIMIT = 1024;
115
116 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
117 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
118
119 /// # of calls before resizing oversized buffers (0 = check only on close)
120 static const int RESIZE_BUFFER_EVERY_N = 512;
121
David Reiss01fe1532010-03-09 05:19:25 +0000122 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 int serverSocket_;
124
David Reiss01fe1532010-03-09 05:19:25 +0000125 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 int port_;
127
David Reiss01fe1532010-03-09 05:19:25 +0000128 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000129 boost::shared_ptr<ThreadManager> threadManager_;
130
David Reiss01fe1532010-03-09 05:19:25 +0000131 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000132 bool threadPoolProcessing_;
133
David Reiss01fe1532010-03-09 05:19:25 +0000134 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000135 event_base* eventBase_;
Roger Meierc1905582011-08-02 23:37:36 +0000136 bool ownEventBase_;
Mark Slee79b16942007-11-26 19:05:29 +0000137
David Reiss01fe1532010-03-09 05:19:25 +0000138 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000139 struct event serverEvent_;
140
David Reiss01fe1532010-03-09 05:19:25 +0000141 /// Event struct, used with eventBase_ for task completion notification
142 struct event notificationEvent_;
143
144 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000145 size_t numTConnections_;
146
David Reiss9e8073c2010-03-09 05:19:39 +0000147 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000148 size_t numActiveProcessors_;
149
150 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000151 size_t connectionStackLimit_;
152
David Reiss01fe1532010-03-09 05:19:25 +0000153 /// Limit for number of connections processing or waiting to process
154 size_t maxActiveProcessors_;
155
156 /// Limit for number of open connections
157 size_t maxConnections_;
158
David Reiss068f4162010-03-09 05:19:45 +0000159 /// Time in milliseconds before an unperformed task expires (0 == infinite).
160 int64_t taskExpireTime_;
161
David Reiss01fe1532010-03-09 05:19:25 +0000162 /**
163 * Hysteresis for overload state. This is the fraction of the overload
164 * value that needs to be reached before the overload state is cleared;
165 * must be <= 1.0.
166 */
167 double overloadHysteresis_;
168
169 /// Action to take when we're overloaded.
170 TOverloadAction overloadAction_;
171
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000172 /**
David Reiss89a12942010-10-06 17:10:52 +0000173 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
174 * and found to be exceeded, reinitialized) to this size.
175 */
176 size_t writeBufferDefaultSize_;
177
178 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000179 * Max read buffer size for an idle TConnection. When we place an idle
180 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000181 * we will free the buffer (such that it will be reinitialized by the next
182 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000183 */
David Reiss54bec5d2010-10-06 17:10:45 +0000184 size_t idleReadBufferLimit_;
185
186 /**
187 * Max write buffer size for an idle connection. When we place an idle
188 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
189 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000190 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
191 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000192 */
193 size_t idleWriteBufferLimit_;
194
195 /**
196 * Every N calls we check the buffer size limits on a connected TConnection.
197 * 0 disables (i.e. the checks are only done when a connection closes).
198 */
199 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000200
201 /// Set if we are currently in an overloaded state.
202 bool overloaded_;
203
204 /// Count of connections dropped since overload started
205 uint32_t nConnectionsDropped_;
206
207 /// Count of connections dropped on overload since server started
208 uint64_t nTotalConnectionsDropped_;
209
210 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000211 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000212
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 /**
214 * This is a stack of all the objects that have been created but that
215 * are NOT currently in use. When we close a connection, we place it on this
216 * stack so that the object can be reused later, rather than freeing the
217 * memory and reallocating a new object later.
218 */
219 std::stack<TConnection*> connectionStack_;
220
David Reiss01fe1532010-03-09 05:19:25 +0000221 /**
222 * Called when server socket had something happen. We accept all waiting
223 * client connections on listen socket fd and assign TConnection objects
224 * to handle those requests.
225 *
226 * @param fd the listen socket.
227 * @param which the event flag that triggered the handler.
228 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000229 void handleEvent(int fd, short which);
230
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000231 void init(int port) {
232 serverSocket_ = -1;
233 port_ = port;
234 threadPoolProcessing_ = false;
235 eventBase_ = NULL;
236 ownEventBase_ = false;
237 numTConnections_ = 0;
238 numActiveProcessors_ = 0;
239 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
240 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
241 maxConnections_ = MAX_CONNECTIONS;
242 taskExpireTime_ = 0;
243 overloadHysteresis_ = 0.8;
244 overloadAction_ = T_OVERLOAD_NO_ACTION;
245 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
246 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
247 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
248 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
249 overloaded_ = false;
250 nConnectionsDropped_ = 0;
251 nTotalConnectionsDropped_ = 0;
252 }
Mark Sleef9373392007-01-24 19:41:57 +0000253
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000254 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000255 template<typename ProcessorFactory>
256 TNonblockingServer(
257 const boost::shared_ptr<ProcessorFactory>& processorFactory,
258 int port,
259 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
260 TServer(processorFactory) {
261 init(port);
262 }
263
264 template<typename Processor>
265 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
266 int port,
267 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000268 TServer(processor) {
269 init(port);
270 }
271
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000272 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000273 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000274 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000275 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
276 int port,
277 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000278 boost::shared_ptr<ThreadManager>(),
279 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
280 TServer(processorFactory) {
281
282 init(port);
283
284 setInputProtocolFactory(protocolFactory);
285 setOutputProtocolFactory(protocolFactory);
286 setThreadManager(threadManager);
287 }
288
289 template<typename Processor>
290 TNonblockingServer(
291 const boost::shared_ptr<Processor>& processor,
292 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
293 int port,
294 const boost::shared_ptr<ThreadManager>& threadManager =
295 boost::shared_ptr<ThreadManager>(),
296 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000297 TServer(processor) {
298
299 init(port);
300
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000301 setInputProtocolFactory(protocolFactory);
302 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000303 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000304 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000305
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000306 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000307 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000308 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000309 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
310 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
311 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
312 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
313 int port,
314 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000315 boost::shared_ptr<ThreadManager>(),
316 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
317 TServer(processorFactory) {
318
319 init(port);
320
321 setInputTransportFactory(inputTransportFactory);
322 setOutputTransportFactory(outputTransportFactory);
323 setInputProtocolFactory(inputProtocolFactory);
324 setOutputProtocolFactory(outputProtocolFactory);
325 setThreadManager(threadManager);
326 }
327
328 template<typename Processor>
329 TNonblockingServer(
330 const boost::shared_ptr<Processor>& processor,
331 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
332 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
333 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
334 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
335 int port,
336 const boost::shared_ptr<ThreadManager>& threadManager =
337 boost::shared_ptr<ThreadManager>(),
338 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000339 TServer(processor) {
340
341 init(port);
342
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000343 setInputTransportFactory(inputTransportFactory);
344 setOutputTransportFactory(outputTransportFactory);
345 setInputProtocolFactory(inputProtocolFactory);
346 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000347 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000348 }
Mark Slee79b16942007-11-26 19:05:29 +0000349
David Reiss8ede8182010-09-02 15:26:28 +0000350 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000351
David Reiss068f4162010-03-09 05:19:45 +0000352 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000353
David Reiss1997f102008-04-29 00:29:41 +0000354 boost::shared_ptr<ThreadManager> getThreadManager() {
355 return threadManager_;
356 }
357
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000358 /**
359 * Get the maximum number of unused TConnection we will hold in reserve.
360 *
361 * @return the current limit on TConnection pool size.
362 */
David Reiss260fa932009-04-02 23:51:39 +0000363 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000364 return connectionStackLimit_;
365 }
366
367 /**
368 * Set the maximum number of unused TConnection we will hold in reserve.
369 *
370 * @param sz the new limit for TConnection pool size.
371 */
David Reiss260fa932009-04-02 23:51:39 +0000372 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000373 connectionStackLimit_ = sz;
374 }
375
Mark Slee79b16942007-11-26 19:05:29 +0000376 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000377 return threadPoolProcessing_;
378 }
379
380 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000381 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000382 }
383
Mark Slee79b16942007-11-26 19:05:29 +0000384 event_base* getEventBase() const {
385 return eventBase_;
386 }
387
David Reiss01fe1532010-03-09 05:19:25 +0000388 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000389 void incrementNumConnections() {
390 ++numTConnections_;
391 }
392
David Reiss01fe1532010-03-09 05:19:25 +0000393 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000394 void decrementNumConnections() {
395 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000396 }
397
David Reiss01fe1532010-03-09 05:19:25 +0000398 /**
399 * Return the count of sockets currently connected to.
400 *
401 * @return count of connected sockets.
402 */
403 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000404 return numTConnections_;
405 }
406
David Reiss01fe1532010-03-09 05:19:25 +0000407 /**
408 * Return the count of connection objects allocated but not in use.
409 *
410 * @return count of idle connection objects.
411 */
412 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000413 return connectionStack_.size();
414 }
415
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000416 /**
David Reiss01fe1532010-03-09 05:19:25 +0000417 * Return count of number of connections which are currently processing.
418 * This is defined as a connection where all data has been received and
419 * either assigned a task (when threading) or passed to a handler (when
420 * not threading), and where the handler has not yet returned.
421 *
422 * @return # of connections currently processing.
423 */
424 size_t getNumActiveProcessors() const {
425 return numActiveProcessors_;
426 }
427
428 /// Increment the count of connections currently processing.
429 void incrementActiveProcessors() {
430 ++numActiveProcessors_;
431 }
432
433 /// Decrement the count of connections currently processing.
434 void decrementActiveProcessors() {
435 if (numActiveProcessors_ > 0) {
436 --numActiveProcessors_;
437 }
438 }
439
440 /**
441 * Get the maximum # of connections allowed before overload.
442 *
443 * @return current setting.
444 */
445 size_t getMaxConnections() const {
446 return maxConnections_;
447 }
448
449 /**
450 * Set the maximum # of connections allowed before overload.
451 *
452 * @param maxConnections new setting for maximum # of connections.
453 */
454 void setMaxConnections(size_t maxConnections) {
455 maxConnections_ = maxConnections;
456 }
457
458 /**
459 * Get the maximum # of connections waiting in handler/task before overload.
460 *
461 * @return current setting.
462 */
463 size_t getMaxActiveProcessors() const {
464 return maxActiveProcessors_;
465 }
466
467 /**
468 * Set the maximum # of connections waiting in handler/task before overload.
469 *
470 * @param maxActiveProcessors new setting for maximum # of active processes.
471 */
472 void setMaxActiveProcessors(size_t maxActiveProcessors) {
473 maxActiveProcessors_ = maxActiveProcessors;
474 }
475
476 /**
477 * Get fraction of maximum limits before an overload condition is cleared.
478 *
479 * @return hysteresis fraction
480 */
481 double getOverloadHysteresis() const {
482 return overloadHysteresis_;
483 }
484
485 /**
486 * Set fraction of maximum limits before an overload condition is cleared.
487 * A good value would probably be between 0.5 and 0.9.
488 *
489 * @param hysteresisFraction fraction <= 1.0.
490 */
491 void setOverloadHysteresis(double hysteresisFraction) {
492 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
493 overloadHysteresis_ = hysteresisFraction;
494 }
495 }
496
497 /**
498 * Get the action the server will take on overload.
499 *
500 * @return a TOverloadAction enum value for the currently set action.
501 */
502 TOverloadAction getOverloadAction() const {
503 return overloadAction_;
504 }
505
506 /**
507 * Set the action the server is to take on overload.
508 *
509 * @param overloadAction a TOverloadAction enum value for the action.
510 */
511 void setOverloadAction(TOverloadAction overloadAction) {
512 overloadAction_ = overloadAction;
513 }
514
515 /**
David Reiss068f4162010-03-09 05:19:45 +0000516 * Get the time in milliseconds after which a task expires (0 == infinite).
517 *
518 * @return a 64-bit time in milliseconds.
519 */
520 int64_t getTaskExpireTime() const {
521 return taskExpireTime_;
522 }
523
524 /**
525 * Set the time in milliseconds after which a task expires (0 == infinite).
526 *
527 * @param taskExpireTime a 64-bit time in milliseconds.
528 */
529 void setTaskExpireTime(int64_t taskExpireTime) {
530 taskExpireTime_ = taskExpireTime;
531 }
532
533 /**
David Reiss01fe1532010-03-09 05:19:25 +0000534 * Determine if the server is currently overloaded.
535 * This function checks the maximums for open connections and connections
536 * currently in processing, and sets an overload condition if they are
537 * exceeded. The overload will persist until both values are below the
538 * current hysteresis fraction of their maximums.
539 *
540 * @return true if an overload condition exists, false if not.
541 */
542 bool serverOverloaded();
543
544 /** Pop and discard next task on threadpool wait queue.
545 *
546 * @return true if a task was discarded, false if the wait queue was empty.
547 */
548 bool drainPendingTask();
549
550 /**
David Reiss89a12942010-10-06 17:10:52 +0000551 * Get the starting size of a TConnection object's write buffer.
552 *
553 * @return # bytes we initialize a TConnection object's write buffer to.
554 */
555 size_t getWriteBufferDefaultSize() const {
556 return writeBufferDefaultSize_;
557 }
558
559 /**
560 * Set the starting size of a TConnection object's write buffer.
561 *
562 * @param size # bytes we initialize a TConnection object's write buffer to.
563 */
564 void setWriteBufferDefaultSize(size_t size) {
565 writeBufferDefaultSize_ = size;
566 }
567
568 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000569 * Get the maximum size of read buffer allocated to idle TConnection objects.
570 *
David Reiss89a12942010-10-06 17:10:52 +0000571 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000572 */
573 size_t getIdleReadBufferLimit() const {
574 return idleReadBufferLimit_;
575 }
576
577 /**
578 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
579 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000580 *
David Reiss89a12942010-10-06 17:10:52 +0000581 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000582 */
583 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000584 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000585 }
586
587 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000588 * Set the maximum size read buffer allocated to idle TConnection objects.
589 * If a TConnection object is found (either on connection close or between
590 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000591 * allocated to its read buffer, we free it and allow it to be reinitialized
592 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000593 *
594 * @param limit of bytes beyond which we will shrink buffers when checked.
595 */
596 void setIdleReadBufferLimit(size_t limit) {
597 idleReadBufferLimit_ = limit;
598 }
599
600 /**
601 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
602 * Set the maximum size read buffer allocated to idle TConnection objects.
603 * If a TConnection object is found (either on connection close or between
604 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000605 * allocated to its read buffer, we free it and allow it to be reinitialized
606 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000607 *
608 * @param limit of bytes beyond which we will shrink buffers when checked.
609 */
610 void setIdleBufferMemLimit(size_t limit) {
611 idleReadBufferLimit_ = limit;
612 }
613
614
615
616 /**
617 * Get the maximum size of write buffer allocated to idle TConnection objects.
618 *
619 * @return # bytes beyond which we will reallocate buffers when checked.
620 */
621 size_t getIdleWriteBufferLimit() const {
622 return idleWriteBufferLimit_;
623 }
624
625 /**
626 * Set the maximum size write buffer allocated to idle TConnection objects.
627 * If a TConnection object is found (either on connection close or between
628 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000629 * allocated to its write buffer, we destroy and construct that buffer with
630 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000631 *
632 * @param limit of bytes beyond which we will shrink buffers when idle.
633 */
David Reiss54bec5d2010-10-06 17:10:45 +0000634 void setIdleWriteBufferLimit(size_t limit) {
635 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000636 }
637
David Reiss01fe1532010-03-09 05:19:25 +0000638 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000639 * Get # of calls made between buffer size checks. 0 means disabled.
640 *
641 * @return # of calls between buffer size checks.
642 */
643 int32_t getResizeBufferEveryN() const {
644 return resizeBufferEveryN_;
645 }
646
647 /**
648 * Check buffer sizes every "count" calls. This allows buffer limits
649 * to be enforced for persistant connections with a controllable degree
650 * of overhead. 0 disables checks except at connection close.
651 *
652 * @param count the number of calls between checks, or 0 to disable
653 */
654 void setResizeBufferEveryN(int32_t count) {
655 resizeBufferEveryN_ = count;
656 }
657
658
659
660 /**
David Reiss01fe1532010-03-09 05:19:25 +0000661 * Return an initialized connection object. Creates or recovers from
662 * pool a TConnection and initializes it with the provided socket FD
663 * and flags.
664 *
665 * @param socket FD of socket associated with this connection.
666 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000667 * @param addr the sockaddr of the client
668 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000669 * @return pointer to initialized TConnection object.
670 */
David Reiss105961d2010-10-06 17:10:17 +0000671 TConnection* createConnection(int socket, short flags,
672 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000673
David Reiss01fe1532010-03-09 05:19:25 +0000674 /**
675 * Returns a connection to pool or deletion. If the connection pool
676 * (a stack) isn't full, place the connection object on it, otherwise
677 * just delete it.
678 *
679 * @param connection the TConection being returned.
680 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000681 void returnConnection(TConnection* connection);
682
David Reiss01fe1532010-03-09 05:19:25 +0000683 /**
David Reiss068f4162010-03-09 05:19:45 +0000684 * Callback function that the threadmanager calls when a task reaches
685 * its expiration time. It is needed to clean up the expired connection.
686 *
687 * @param task the runnable associated with the expired task.
688 */
689 void expireClose(boost::shared_ptr<Runnable> task);
690
691 /**
David Reiss01fe1532010-03-09 05:19:25 +0000692 * C-callable event handler for listener events. Provides a callback
693 * that libevent can understand which invokes server->handleEvent().
694 *
695 * @param fd the descriptor the event occured on.
696 * @param which the flags associated with the event.
697 * @param v void* callback arg where we placed TNonblockingServer's "this".
698 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000699 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000700 ((TNonblockingServer*)v)->handleEvent(fd, which);
701 }
702
David Reiss01fe1532010-03-09 05:19:25 +0000703 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000704 void listenSocket();
705
David Reiss01fe1532010-03-09 05:19:25 +0000706 /**
707 * Takes a socket created by listenSocket() and sets various options on it
708 * to prepare for use in the server.
709 *
710 * @param fd descriptor of socket to be initialized/
711 */
Mark Slee79b16942007-11-26 19:05:29 +0000712 void listenSocket(int fd);
713
David Reiss01fe1532010-03-09 05:19:25 +0000714 /// Create the pipe used to notify I/O process of task completion.
715 void createNotificationPipe();
716
717 /**
718 * Get notification pipe send descriptor.
719 *
720 * @return write fd for pipe.
721 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000722 evutil_socket_t getNotificationSendFD() const {
David Reiss01fe1532010-03-09 05:19:25 +0000723 return notificationPipeFDs_[1];
724 }
725
726 /**
727 * Get notification pipe receive descriptor.
728 *
729 * @return read fd of pipe.
730 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000731 evutil_socket_t getNotificationRecvFD() const {
David Reiss01fe1532010-03-09 05:19:25 +0000732 return notificationPipeFDs_[0];
733 }
734
735 /**
736 * Register the core libevent events onto the proper base.
737 *
738 * @param base pointer to the event base to be initialized.
Roger Meierc1905582011-08-02 23:37:36 +0000739 * @param ownEventBase if true, this server is responsible for
740 * freeing the event base memory.
David Reiss01fe1532010-03-09 05:19:25 +0000741 */
Roger Meierc1905582011-08-02 23:37:36 +0000742 void registerEvents(event_base* base, bool ownEventBase = true);
Mark Slee79b16942007-11-26 19:05:29 +0000743
David Reiss01fe1532010-03-09 05:19:25 +0000744 /**
745 * Main workhorse function, starts up the server listening on a port and
746 * loops over the libevent handler.
747 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000748 void serve();
Bryan Duxbury76c43682011-08-24 21:26:48 +0000749
750 /**
751 * May be called from a separate thread to cause serve() to return.
752 */
753 void stop();
Mark Slee2f6404d2006-10-10 01:37:40 +0000754};
755
T Jake Lucianib5e62212009-01-31 22:36:20 +0000756}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000757
758#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_