blob: 5cda2c53e634b2d03615bc3ea13b90bd6d7a1552 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000033#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000034#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000035#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000036#include <event.h>
37
T Jake Lucianib5e62212009-01-31 22:36:20 +000038namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000039
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000041using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000042using apache::thrift::protocol::TProtocol;
43using apache::thrift::concurrency::Runnable;
44using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000045
// Split the libevent version into major/minor/release components.
// When the libevent headers provide LIBEVENT_VERSION_NUMBER, the components
// are extracted from it (major in bits 24-31, minor in bits 16-23, release in
// bits 8-15). Otherwise a 1.x release is assumed and LIBEVENT_VERSION_NUMBER
// is synthesized using the same bit layout.
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

// Versions before 2.0 get a local evutil_socket_t so the rest of this header
// can use a single socket-handle type name.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef int evutil_socket_t;
#endif
61
// Pointee type used when passing option buffers to socket-option calls.
// Defaults to void; a build may predefine SOCKOPT_CAST_T to override it.
#ifndef SOCKOPT_CAST_T
#define SOCKOPT_CAST_T void
#endif

/**
 * Reinterpret a pointer to a read-only option value as a
 * const SOCKOPT_CAST_T pointer.
 *
 * @param v pointer to the option value.
 * @return the same address, typed as const SOCKOPT_CAST_T*.
 */
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

/**
 * Reinterpret a pointer to a writable option value as a
 * SOCKOPT_CAST_T pointer.
 *
 * @param v pointer to the option value.
 * @return the same address, typed as SOCKOPT_CAST_T*.
 */
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
75
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4 byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +000085
86
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
93
Mark Slee2f6404d2006-10-10 01:37:40 +000094class TNonblockingServer : public TServer {
95 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000096 class TConnection;
97
David Reiss01fe1532010-03-09 05:19:25 +000098 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000099 static const int LISTEN_BACKLOG = 1024;
100
David Reiss01fe1532010-03-09 05:19:25 +0000101 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000102 static const size_t CONNECTION_STACK_LIMIT = 1024;
103
David Reiss01fe1532010-03-09 05:19:25 +0000104 /// Default limit on total number of connected sockets
105 static const int MAX_CONNECTIONS = INT_MAX;
106
107 /// Default limit on connections in handler/task processing
108 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
109
David Reiss89a12942010-10-06 17:10:52 +0000110 /// Default size of write buffer
111 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
112
David Reiss54bec5d2010-10-06 17:10:45 +0000113 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
114 static const int IDLE_READ_BUFFER_LIMIT = 1024;
115
116 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
117 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
118
119 /// # of calls before resizing oversized buffers (0 = check only on close)
120 static const int RESIZE_BUFFER_EVERY_N = 512;
121
David Reiss01fe1532010-03-09 05:19:25 +0000122 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 int serverSocket_;
124
David Reiss01fe1532010-03-09 05:19:25 +0000125 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 int port_;
127
David Reiss01fe1532010-03-09 05:19:25 +0000128 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000129 boost::shared_ptr<ThreadManager> threadManager_;
130
David Reiss01fe1532010-03-09 05:19:25 +0000131 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000132 bool threadPoolProcessing_;
133
David Reiss01fe1532010-03-09 05:19:25 +0000134 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000135 event_base* eventBase_;
Roger Meierc1905582011-08-02 23:37:36 +0000136 bool ownEventBase_;
Mark Slee79b16942007-11-26 19:05:29 +0000137
David Reiss01fe1532010-03-09 05:19:25 +0000138 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000139 struct event serverEvent_;
140
David Reiss01fe1532010-03-09 05:19:25 +0000141 /// Event struct, used with eventBase_ for task completion notification
142 struct event notificationEvent_;
143
144 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000145 size_t numTConnections_;
146
David Reiss9e8073c2010-03-09 05:19:39 +0000147 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000148 size_t numActiveProcessors_;
149
150 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000151 size_t connectionStackLimit_;
152
David Reiss01fe1532010-03-09 05:19:25 +0000153 /// Limit for number of connections processing or waiting to process
154 size_t maxActiveProcessors_;
155
156 /// Limit for number of open connections
157 size_t maxConnections_;
158
David Reiss068f4162010-03-09 05:19:45 +0000159 /// Time in milliseconds before an unperformed task expires (0 == infinite).
160 int64_t taskExpireTime_;
161
David Reiss01fe1532010-03-09 05:19:25 +0000162 /**
163 * Hysteresis for overload state. This is the fraction of the overload
164 * value that needs to be reached before the overload state is cleared;
165 * must be <= 1.0.
166 */
167 double overloadHysteresis_;
168
169 /// Action to take when we're overloaded.
170 TOverloadAction overloadAction_;
171
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000172 /**
David Reiss89a12942010-10-06 17:10:52 +0000173 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
174 * and found to be exceeded, reinitialized) to this size.
175 */
176 size_t writeBufferDefaultSize_;
177
178 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000179 * Max read buffer size for an idle TConnection. When we place an idle
180 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000181 * we will free the buffer (such that it will be reinitialized by the next
182 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000183 */
David Reiss54bec5d2010-10-06 17:10:45 +0000184 size_t idleReadBufferLimit_;
185
186 /**
187 * Max write buffer size for an idle connection. When we place an idle
188 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
189 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000190 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
191 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000192 */
193 size_t idleWriteBufferLimit_;
194
195 /**
196 * Every N calls we check the buffer size limits on a connected TConnection.
197 * 0 disables (i.e. the checks are only done when a connection closes).
198 */
199 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000200
201 /// Set if we are currently in an overloaded state.
202 bool overloaded_;
203
204 /// Count of connections dropped since overload started
205 uint32_t nConnectionsDropped_;
206
207 /// Count of connections dropped on overload since server started
208 uint64_t nTotalConnectionsDropped_;
209
210 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000211 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000212
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 /**
214 * This is a stack of all the objects that have been created but that
215 * are NOT currently in use. When we close a connection, we place it on this
216 * stack so that the object can be reused later, rather than freeing the
217 * memory and reallocating a new object later.
218 */
219 std::stack<TConnection*> connectionStack_;
220
David Reiss01fe1532010-03-09 05:19:25 +0000221 /**
222 * Called when server socket had something happen. We accept all waiting
223 * client connections on listen socket fd and assign TConnection objects
224 * to handle those requests.
225 *
226 * @param fd the listen socket.
227 * @param which the event flag that triggered the handler.
228 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000229 void handleEvent(int fd, short which);
230
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000231 void init(int port) {
232 serverSocket_ = -1;
233 port_ = port;
234 threadPoolProcessing_ = false;
235 eventBase_ = NULL;
236 ownEventBase_ = false;
237 numTConnections_ = 0;
238 numActiveProcessors_ = 0;
239 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
240 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
241 maxConnections_ = MAX_CONNECTIONS;
242 taskExpireTime_ = 0;
243 overloadHysteresis_ = 0.8;
244 overloadAction_ = T_OVERLOAD_NO_ACTION;
245 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
246 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
247 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
248 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
249 overloaded_ = false;
250 nConnectionsDropped_ = 0;
251 nTotalConnectionsDropped_ = 0;
252 }
Mark Sleef9373392007-01-24 19:41:57 +0000253
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000254 public:
255 TNonblockingServer(const boost::shared_ptr<TProcessor>& processor,
256 int port) :
257 TServer(processor) {
258 init(port);
259 }
260
261 TNonblockingServer(
262 const boost::shared_ptr<TProcessor>& processor,
263 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
264 int port,
265 const boost::shared_ptr<ThreadManager>& threadManager =
266 boost::shared_ptr<ThreadManager>()) :
267 TServer(processor) {
268
269 init(port);
270
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000271 setInputProtocolFactory(protocolFactory);
272 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000273 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000274 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000275
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000276 TNonblockingServer(
277 const boost::shared_ptr<TProcessor>& processor,
278 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
279 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
280 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
281 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
282 int port,
283 const boost::shared_ptr<ThreadManager>& threadManager =
284 boost::shared_ptr<ThreadManager>()) :
285 TServer(processor) {
286
287 init(port);
288
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000289 setInputTransportFactory(inputTransportFactory);
290 setOutputTransportFactory(outputTransportFactory);
291 setInputProtocolFactory(inputProtocolFactory);
292 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000293 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000294 }
Mark Slee79b16942007-11-26 19:05:29 +0000295
David Reiss8ede8182010-09-02 15:26:28 +0000296 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000297
David Reiss068f4162010-03-09 05:19:45 +0000298 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000299
David Reiss1997f102008-04-29 00:29:41 +0000300 boost::shared_ptr<ThreadManager> getThreadManager() {
301 return threadManager_;
302 }
303
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000304 /**
305 * Get the maximum number of unused TConnection we will hold in reserve.
306 *
307 * @return the current limit on TConnection pool size.
308 */
David Reiss260fa932009-04-02 23:51:39 +0000309 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000310 return connectionStackLimit_;
311 }
312
313 /**
314 * Set the maximum number of unused TConnection we will hold in reserve.
315 *
316 * @param sz the new limit for TConnection pool size.
317 */
David Reiss260fa932009-04-02 23:51:39 +0000318 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000319 connectionStackLimit_ = sz;
320 }
321
Mark Slee79b16942007-11-26 19:05:29 +0000322 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000323 return threadPoolProcessing_;
324 }
325
326 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000327 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000328 }
329
Mark Slee79b16942007-11-26 19:05:29 +0000330 event_base* getEventBase() const {
331 return eventBase_;
332 }
333
David Reiss01fe1532010-03-09 05:19:25 +0000334 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000335 void incrementNumConnections() {
336 ++numTConnections_;
337 }
338
David Reiss01fe1532010-03-09 05:19:25 +0000339 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000340 void decrementNumConnections() {
341 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000342 }
343
David Reiss01fe1532010-03-09 05:19:25 +0000344 /**
345 * Return the count of sockets currently connected to.
346 *
347 * @return count of connected sockets.
348 */
349 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000350 return numTConnections_;
351 }
352
David Reiss01fe1532010-03-09 05:19:25 +0000353 /**
354 * Return the count of connection objects allocated but not in use.
355 *
356 * @return count of idle connection objects.
357 */
358 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000359 return connectionStack_.size();
360 }
361
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000362 /**
David Reiss01fe1532010-03-09 05:19:25 +0000363 * Return count of number of connections which are currently processing.
364 * This is defined as a connection where all data has been received and
365 * either assigned a task (when threading) or passed to a handler (when
366 * not threading), and where the handler has not yet returned.
367 *
368 * @return # of connections currently processing.
369 */
370 size_t getNumActiveProcessors() const {
371 return numActiveProcessors_;
372 }
373
374 /// Increment the count of connections currently processing.
375 void incrementActiveProcessors() {
376 ++numActiveProcessors_;
377 }
378
379 /// Decrement the count of connections currently processing.
380 void decrementActiveProcessors() {
381 if (numActiveProcessors_ > 0) {
382 --numActiveProcessors_;
383 }
384 }
385
386 /**
387 * Get the maximum # of connections allowed before overload.
388 *
389 * @return current setting.
390 */
391 size_t getMaxConnections() const {
392 return maxConnections_;
393 }
394
395 /**
396 * Set the maximum # of connections allowed before overload.
397 *
398 * @param maxConnections new setting for maximum # of connections.
399 */
400 void setMaxConnections(size_t maxConnections) {
401 maxConnections_ = maxConnections;
402 }
403
404 /**
405 * Get the maximum # of connections waiting in handler/task before overload.
406 *
407 * @return current setting.
408 */
409 size_t getMaxActiveProcessors() const {
410 return maxActiveProcessors_;
411 }
412
413 /**
414 * Set the maximum # of connections waiting in handler/task before overload.
415 *
416 * @param maxActiveProcessors new setting for maximum # of active processes.
417 */
418 void setMaxActiveProcessors(size_t maxActiveProcessors) {
419 maxActiveProcessors_ = maxActiveProcessors;
420 }
421
422 /**
423 * Get fraction of maximum limits before an overload condition is cleared.
424 *
425 * @return hysteresis fraction
426 */
427 double getOverloadHysteresis() const {
428 return overloadHysteresis_;
429 }
430
431 /**
432 * Set fraction of maximum limits before an overload condition is cleared.
433 * A good value would probably be between 0.5 and 0.9.
434 *
435 * @param hysteresisFraction fraction <= 1.0.
436 */
437 void setOverloadHysteresis(double hysteresisFraction) {
438 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
439 overloadHysteresis_ = hysteresisFraction;
440 }
441 }
442
443 /**
444 * Get the action the server will take on overload.
445 *
446 * @return a TOverloadAction enum value for the currently set action.
447 */
448 TOverloadAction getOverloadAction() const {
449 return overloadAction_;
450 }
451
452 /**
453 * Set the action the server is to take on overload.
454 *
455 * @param overloadAction a TOverloadAction enum value for the action.
456 */
457 void setOverloadAction(TOverloadAction overloadAction) {
458 overloadAction_ = overloadAction;
459 }
460
461 /**
David Reiss068f4162010-03-09 05:19:45 +0000462 * Get the time in milliseconds after which a task expires (0 == infinite).
463 *
464 * @return a 64-bit time in milliseconds.
465 */
466 int64_t getTaskExpireTime() const {
467 return taskExpireTime_;
468 }
469
470 /**
471 * Set the time in milliseconds after which a task expires (0 == infinite).
472 *
473 * @param taskExpireTime a 64-bit time in milliseconds.
474 */
475 void setTaskExpireTime(int64_t taskExpireTime) {
476 taskExpireTime_ = taskExpireTime;
477 }
478
479 /**
David Reiss01fe1532010-03-09 05:19:25 +0000480 * Determine if the server is currently overloaded.
481 * This function checks the maximums for open connections and connections
482 * currently in processing, and sets an overload condition if they are
483 * exceeded. The overload will persist until both values are below the
484 * current hysteresis fraction of their maximums.
485 *
486 * @return true if an overload condition exists, false if not.
487 */
488 bool serverOverloaded();
489
490 /** Pop and discard next task on threadpool wait queue.
491 *
492 * @return true if a task was discarded, false if the wait queue was empty.
493 */
494 bool drainPendingTask();
495
496 /**
David Reiss89a12942010-10-06 17:10:52 +0000497 * Get the starting size of a TConnection object's write buffer.
498 *
499 * @return # bytes we initialize a TConnection object's write buffer to.
500 */
501 size_t getWriteBufferDefaultSize() const {
502 return writeBufferDefaultSize_;
503 }
504
505 /**
506 * Set the starting size of a TConnection object's write buffer.
507 *
508 * @param size # bytes we initialize a TConnection object's write buffer to.
509 */
510 void setWriteBufferDefaultSize(size_t size) {
511 writeBufferDefaultSize_ = size;
512 }
513
514 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000515 * Get the maximum size of read buffer allocated to idle TConnection objects.
516 *
David Reiss89a12942010-10-06 17:10:52 +0000517 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000518 */
519 size_t getIdleReadBufferLimit() const {
520 return idleReadBufferLimit_;
521 }
522
523 /**
524 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
525 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000526 *
David Reiss89a12942010-10-06 17:10:52 +0000527 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000528 */
529 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000530 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000531 }
532
533 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000534 * Set the maximum size read buffer allocated to idle TConnection objects.
535 * If a TConnection object is found (either on connection close or between
536 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000537 * allocated to its read buffer, we free it and allow it to be reinitialized
538 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000539 *
540 * @param limit of bytes beyond which we will shrink buffers when checked.
541 */
542 void setIdleReadBufferLimit(size_t limit) {
543 idleReadBufferLimit_ = limit;
544 }
545
546 /**
547 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
548 * Set the maximum size read buffer allocated to idle TConnection objects.
549 * If a TConnection object is found (either on connection close or between
550 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000551 * allocated to its read buffer, we free it and allow it to be reinitialized
552 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000553 *
554 * @param limit of bytes beyond which we will shrink buffers when checked.
555 */
556 void setIdleBufferMemLimit(size_t limit) {
557 idleReadBufferLimit_ = limit;
558 }
559
560
561
562 /**
563 * Get the maximum size of write buffer allocated to idle TConnection objects.
564 *
565 * @return # bytes beyond which we will reallocate buffers when checked.
566 */
567 size_t getIdleWriteBufferLimit() const {
568 return idleWriteBufferLimit_;
569 }
570
571 /**
572 * Set the maximum size write buffer allocated to idle TConnection objects.
573 * If a TConnection object is found (either on connection close or between
574 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000575 * allocated to its write buffer, we destroy and construct that buffer with
576 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000577 *
578 * @param limit of bytes beyond which we will shrink buffers when idle.
579 */
David Reiss54bec5d2010-10-06 17:10:45 +0000580 void setIdleWriteBufferLimit(size_t limit) {
581 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000582 }
583
David Reiss01fe1532010-03-09 05:19:25 +0000584 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000585 * Get # of calls made between buffer size checks. 0 means disabled.
586 *
587 * @return # of calls between buffer size checks.
588 */
589 int32_t getResizeBufferEveryN() const {
590 return resizeBufferEveryN_;
591 }
592
593 /**
594 * Check buffer sizes every "count" calls. This allows buffer limits
595 * to be enforced for persistant connections with a controllable degree
596 * of overhead. 0 disables checks except at connection close.
597 *
598 * @param count the number of calls between checks, or 0 to disable
599 */
600 void setResizeBufferEveryN(int32_t count) {
601 resizeBufferEveryN_ = count;
602 }
603
604
605
606 /**
David Reiss01fe1532010-03-09 05:19:25 +0000607 * Return an initialized connection object. Creates or recovers from
608 * pool a TConnection and initializes it with the provided socket FD
609 * and flags.
610 *
611 * @param socket FD of socket associated with this connection.
612 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000613 * @param addr the sockaddr of the client
614 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000615 * @return pointer to initialized TConnection object.
616 */
David Reiss105961d2010-10-06 17:10:17 +0000617 TConnection* createConnection(int socket, short flags,
618 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000619
David Reiss01fe1532010-03-09 05:19:25 +0000620 /**
621 * Returns a connection to pool or deletion. If the connection pool
622 * (a stack) isn't full, place the connection object on it, otherwise
623 * just delete it.
624 *
625 * @param connection the TConection being returned.
626 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000627 void returnConnection(TConnection* connection);
628
David Reiss01fe1532010-03-09 05:19:25 +0000629 /**
David Reiss068f4162010-03-09 05:19:45 +0000630 * Callback function that the threadmanager calls when a task reaches
631 * its expiration time. It is needed to clean up the expired connection.
632 *
633 * @param task the runnable associated with the expired task.
634 */
635 void expireClose(boost::shared_ptr<Runnable> task);
636
637 /**
David Reiss01fe1532010-03-09 05:19:25 +0000638 * C-callable event handler for listener events. Provides a callback
639 * that libevent can understand which invokes server->handleEvent().
640 *
641 * @param fd the descriptor the event occured on.
642 * @param which the flags associated with the event.
643 * @param v void* callback arg where we placed TNonblockingServer's "this".
644 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000645 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000646 ((TNonblockingServer*)v)->handleEvent(fd, which);
647 }
648
David Reiss01fe1532010-03-09 05:19:25 +0000649 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000650 void listenSocket();
651
David Reiss01fe1532010-03-09 05:19:25 +0000652 /**
653 * Takes a socket created by listenSocket() and sets various options on it
654 * to prepare for use in the server.
655 *
656 * @param fd descriptor of socket to be initialized/
657 */
Mark Slee79b16942007-11-26 19:05:29 +0000658 void listenSocket(int fd);
659
David Reiss01fe1532010-03-09 05:19:25 +0000660 /// Create the pipe used to notify I/O process of task completion.
661 void createNotificationPipe();
662
663 /**
664 * Get notification pipe send descriptor.
665 *
666 * @return write fd for pipe.
667 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000668 evutil_socket_t getNotificationSendFD() const {
David Reiss01fe1532010-03-09 05:19:25 +0000669 return notificationPipeFDs_[1];
670 }
671
672 /**
673 * Get notification pipe receive descriptor.
674 *
675 * @return read fd of pipe.
676 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000677 evutil_socket_t getNotificationRecvFD() const {
David Reiss01fe1532010-03-09 05:19:25 +0000678 return notificationPipeFDs_[0];
679 }
680
681 /**
682 * Register the core libevent events onto the proper base.
683 *
684 * @param base pointer to the event base to be initialized.
Roger Meierc1905582011-08-02 23:37:36 +0000685 * @param ownEventBase if true, this server is responsible for
686 * freeing the event base memory.
David Reiss01fe1532010-03-09 05:19:25 +0000687 */
Roger Meierc1905582011-08-02 23:37:36 +0000688 void registerEvents(event_base* base, bool ownEventBase = true);
Mark Slee79b16942007-11-26 19:05:29 +0000689
David Reiss01fe1532010-03-09 05:19:25 +0000690 /**
691 * Main workhorse function, starts up the server listening on a port and
692 * loops over the libevent handler.
693 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 void serve();
Bryan Duxbury76c43682011-08-24 21:26:48 +0000695
696 /**
697 * May be called from a separate thread to cause serve() to return.
698 */
699 void stop();
Mark Slee2f6404d2006-10-10 01:37:40 +0000700};
701
T Jake Lucianib5e62212009-01-31 22:36:20 +0000702}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000703
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_