blob: e4e0e64882b5d89c093dfd0b9a8df668bd7e6a6d [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000033#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <event.h>
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000039using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::protocol::TProtocol;
41using apache::thrift::concurrency::Runnable;
42using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
// Derive major/minor/release components from libevent's packed version
// number when the libevent headers provide one.
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
// Pack the assumed components using the same layout decoded above
// (major in bits 24+, minor in bits 16-23, release in bits 8-15).
#define LIBEVENT_VERSION_NUMBER \
  ((LIBEVENT_VERSION_MAJOR << 24) | \
   (LIBEVENT_VERSION_MINOR << 16) | \
   (LIBEVENT_VERSION_REL << 8))
#endif

// For libevent versions below 2.0 this header supplies its own
// evutil_socket_t, as a plain int.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
  typedef int evutil_socket_t;
#endif
59
// Pointee type used when casting socket-option value pointers. Defaults to
// void; it can be overridden by defining SOCKOPT_CAST_T before this point.
#ifndef SOCKOPT_CAST_T
#define SOCKOPT_CAST_T void
#endif

/**
 * Reinterpret a pointer to a const value as a pointer to const
 * SOCKOPT_CAST_T.
 *
 * @param v pointer to the value.
 * @return the same address, typed as const SOCKOPT_CAST_T*.
 */
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

/**
 * Reinterpret a pointer to a mutable value as a pointer to SOCKOPT_CAST_T.
 *
 * @param v pointer to the value.
 * @return the same address, typed as SOCKOPT_CAST_T*.
 */
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
73
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4 byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +000083
84
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
91
Mark Slee2f6404d2006-10-10 01:37:40 +000092class TNonblockingServer : public TServer {
93 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000094 class TConnection;
95
David Reiss01fe1532010-03-09 05:19:25 +000096 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000097 static const int LISTEN_BACKLOG = 1024;
98
David Reiss01fe1532010-03-09 05:19:25 +000099 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000100 static const size_t CONNECTION_STACK_LIMIT = 1024;
101
David Reiss01fe1532010-03-09 05:19:25 +0000102 /// Default limit on total number of connected sockets
103 static const int MAX_CONNECTIONS = INT_MAX;
104
105 /// Default limit on connections in handler/task processing
106 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
107
David Reiss89a12942010-10-06 17:10:52 +0000108 /// Default size of write buffer
109 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
110
David Reiss54bec5d2010-10-06 17:10:45 +0000111 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
112 static const int IDLE_READ_BUFFER_LIMIT = 1024;
113
114 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
115 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
116
117 /// # of calls before resizing oversized buffers (0 = check only on close)
118 static const int RESIZE_BUFFER_EVERY_N = 512;
119
David Reiss01fe1532010-03-09 05:19:25 +0000120 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 int serverSocket_;
122
David Reiss01fe1532010-03-09 05:19:25 +0000123 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 int port_;
125
David Reiss01fe1532010-03-09 05:19:25 +0000126 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000127 boost::shared_ptr<ThreadManager> threadManager_;
128
David Reiss01fe1532010-03-09 05:19:25 +0000129 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000130 bool threadPoolProcessing_;
131
David Reiss01fe1532010-03-09 05:19:25 +0000132 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000133 event_base* eventBase_;
Roger Meierc1905582011-08-02 23:37:36 +0000134 bool ownEventBase_;
Mark Slee79b16942007-11-26 19:05:29 +0000135
David Reiss01fe1532010-03-09 05:19:25 +0000136 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000137 struct event serverEvent_;
138
David Reiss01fe1532010-03-09 05:19:25 +0000139 /// Event struct, used with eventBase_ for task completion notification
140 struct event notificationEvent_;
141
142 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000143 size_t numTConnections_;
144
David Reiss9e8073c2010-03-09 05:19:39 +0000145 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000146 size_t numActiveProcessors_;
147
148 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000149 size_t connectionStackLimit_;
150
David Reiss01fe1532010-03-09 05:19:25 +0000151 /// Limit for number of connections processing or waiting to process
152 size_t maxActiveProcessors_;
153
154 /// Limit for number of open connections
155 size_t maxConnections_;
156
David Reiss068f4162010-03-09 05:19:45 +0000157 /// Time in milliseconds before an unperformed task expires (0 == infinite).
158 int64_t taskExpireTime_;
159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /**
161 * Hysteresis for overload state. This is the fraction of the overload
162 * value that needs to be reached before the overload state is cleared;
163 * must be <= 1.0.
164 */
165 double overloadHysteresis_;
166
167 /// Action to take when we're overloaded.
168 TOverloadAction overloadAction_;
169
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000170 /**
David Reiss89a12942010-10-06 17:10:52 +0000171 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
172 * and found to be exceeded, reinitialized) to this size.
173 */
174 size_t writeBufferDefaultSize_;
175
176 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000177 * Max read buffer size for an idle TConnection. When we place an idle
178 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000179 * we will free the buffer (such that it will be reinitialized by the next
180 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000181 */
David Reiss54bec5d2010-10-06 17:10:45 +0000182 size_t idleReadBufferLimit_;
183
184 /**
185 * Max write buffer size for an idle connection. When we place an idle
186 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
187 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000188 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
189 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000190 */
191 size_t idleWriteBufferLimit_;
192
193 /**
194 * Every N calls we check the buffer size limits on a connected TConnection.
195 * 0 disables (i.e. the checks are only done when a connection closes).
196 */
197 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000198
199 /// Set if we are currently in an overloaded state.
200 bool overloaded_;
201
202 /// Count of connections dropped since overload started
203 uint32_t nConnectionsDropped_;
204
205 /// Count of connections dropped on overload since server started
206 uint64_t nTotalConnectionsDropped_;
207
208 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000209 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000210
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 /**
212 * This is a stack of all the objects that have been created but that
213 * are NOT currently in use. When we close a connection, we place it on this
214 * stack so that the object can be reused later, rather than freeing the
215 * memory and reallocating a new object later.
216 */
217 std::stack<TConnection*> connectionStack_;
218
David Reiss01fe1532010-03-09 05:19:25 +0000219 /**
220 * Called when server socket had something happen. We accept all waiting
221 * client connections on listen socket fd and assign TConnection objects
222 * to handle those requests.
223 *
224 * @param fd the listen socket.
225 * @param which the event flag that triggered the handler.
226 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 void handleEvent(int fd, short which);
228
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000229 void init(int port) {
230 serverSocket_ = -1;
231 port_ = port;
232 threadPoolProcessing_ = false;
233 eventBase_ = NULL;
234 ownEventBase_ = false;
235 numTConnections_ = 0;
236 numActiveProcessors_ = 0;
237 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
238 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
239 maxConnections_ = MAX_CONNECTIONS;
240 taskExpireTime_ = 0;
241 overloadHysteresis_ = 0.8;
242 overloadAction_ = T_OVERLOAD_NO_ACTION;
243 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
244 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
245 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
246 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
247 overloaded_ = false;
248 nConnectionsDropped_ = 0;
249 nTotalConnectionsDropped_ = 0;
250 }
Mark Sleef9373392007-01-24 19:41:57 +0000251
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000252 public:
253 TNonblockingServer(const boost::shared_ptr<TProcessor>& processor,
254 int port) :
255 TServer(processor) {
256 init(port);
257 }
258
259 TNonblockingServer(
260 const boost::shared_ptr<TProcessor>& processor,
261 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
262 int port,
263 const boost::shared_ptr<ThreadManager>& threadManager =
264 boost::shared_ptr<ThreadManager>()) :
265 TServer(processor) {
266
267 init(port);
268
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000269 setInputProtocolFactory(protocolFactory);
270 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000271 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000272 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000273
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000274 TNonblockingServer(
275 const boost::shared_ptr<TProcessor>& processor,
276 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
277 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
278 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
279 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
280 int port,
281 const boost::shared_ptr<ThreadManager>& threadManager =
282 boost::shared_ptr<ThreadManager>()) :
283 TServer(processor) {
284
285 init(port);
286
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000287 setInputTransportFactory(inputTransportFactory);
288 setOutputTransportFactory(outputTransportFactory);
289 setInputProtocolFactory(inputProtocolFactory);
290 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000291 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000292 }
Mark Slee79b16942007-11-26 19:05:29 +0000293
David Reiss8ede8182010-09-02 15:26:28 +0000294 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000295
David Reiss068f4162010-03-09 05:19:45 +0000296 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000297
David Reiss1997f102008-04-29 00:29:41 +0000298 boost::shared_ptr<ThreadManager> getThreadManager() {
299 return threadManager_;
300 }
301
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000302 /**
303 * Get the maximum number of unused TConnection we will hold in reserve.
304 *
305 * @return the current limit on TConnection pool size.
306 */
David Reiss260fa932009-04-02 23:51:39 +0000307 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000308 return connectionStackLimit_;
309 }
310
311 /**
312 * Set the maximum number of unused TConnection we will hold in reserve.
313 *
314 * @param sz the new limit for TConnection pool size.
315 */
David Reiss260fa932009-04-02 23:51:39 +0000316 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000317 connectionStackLimit_ = sz;
318 }
319
Mark Slee79b16942007-11-26 19:05:29 +0000320 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000321 return threadPoolProcessing_;
322 }
323
324 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000325 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000326 }
327
Mark Slee79b16942007-11-26 19:05:29 +0000328 event_base* getEventBase() const {
329 return eventBase_;
330 }
331
David Reiss01fe1532010-03-09 05:19:25 +0000332 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000333 void incrementNumConnections() {
334 ++numTConnections_;
335 }
336
David Reiss01fe1532010-03-09 05:19:25 +0000337 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000338 void decrementNumConnections() {
339 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000340 }
341
David Reiss01fe1532010-03-09 05:19:25 +0000342 /**
343 * Return the count of sockets currently connected to.
344 *
345 * @return count of connected sockets.
346 */
347 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000348 return numTConnections_;
349 }
350
David Reiss01fe1532010-03-09 05:19:25 +0000351 /**
352 * Return the count of connection objects allocated but not in use.
353 *
354 * @return count of idle connection objects.
355 */
356 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000357 return connectionStack_.size();
358 }
359
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000360 /**
David Reiss01fe1532010-03-09 05:19:25 +0000361 * Return count of number of connections which are currently processing.
362 * This is defined as a connection where all data has been received and
363 * either assigned a task (when threading) or passed to a handler (when
364 * not threading), and where the handler has not yet returned.
365 *
366 * @return # of connections currently processing.
367 */
368 size_t getNumActiveProcessors() const {
369 return numActiveProcessors_;
370 }
371
372 /// Increment the count of connections currently processing.
373 void incrementActiveProcessors() {
374 ++numActiveProcessors_;
375 }
376
377 /// Decrement the count of connections currently processing.
378 void decrementActiveProcessors() {
379 if (numActiveProcessors_ > 0) {
380 --numActiveProcessors_;
381 }
382 }
383
384 /**
385 * Get the maximum # of connections allowed before overload.
386 *
387 * @return current setting.
388 */
389 size_t getMaxConnections() const {
390 return maxConnections_;
391 }
392
393 /**
394 * Set the maximum # of connections allowed before overload.
395 *
396 * @param maxConnections new setting for maximum # of connections.
397 */
398 void setMaxConnections(size_t maxConnections) {
399 maxConnections_ = maxConnections;
400 }
401
402 /**
403 * Get the maximum # of connections waiting in handler/task before overload.
404 *
405 * @return current setting.
406 */
407 size_t getMaxActiveProcessors() const {
408 return maxActiveProcessors_;
409 }
410
411 /**
412 * Set the maximum # of connections waiting in handler/task before overload.
413 *
414 * @param maxActiveProcessors new setting for maximum # of active processes.
415 */
416 void setMaxActiveProcessors(size_t maxActiveProcessors) {
417 maxActiveProcessors_ = maxActiveProcessors;
418 }
419
420 /**
421 * Get fraction of maximum limits before an overload condition is cleared.
422 *
423 * @return hysteresis fraction
424 */
425 double getOverloadHysteresis() const {
426 return overloadHysteresis_;
427 }
428
429 /**
430 * Set fraction of maximum limits before an overload condition is cleared.
431 * A good value would probably be between 0.5 and 0.9.
432 *
433 * @param hysteresisFraction fraction <= 1.0.
434 */
435 void setOverloadHysteresis(double hysteresisFraction) {
436 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
437 overloadHysteresis_ = hysteresisFraction;
438 }
439 }
440
441 /**
442 * Get the action the server will take on overload.
443 *
444 * @return a TOverloadAction enum value for the currently set action.
445 */
446 TOverloadAction getOverloadAction() const {
447 return overloadAction_;
448 }
449
450 /**
451 * Set the action the server is to take on overload.
452 *
453 * @param overloadAction a TOverloadAction enum value for the action.
454 */
455 void setOverloadAction(TOverloadAction overloadAction) {
456 overloadAction_ = overloadAction;
457 }
458
459 /**
David Reiss068f4162010-03-09 05:19:45 +0000460 * Get the time in milliseconds after which a task expires (0 == infinite).
461 *
462 * @return a 64-bit time in milliseconds.
463 */
464 int64_t getTaskExpireTime() const {
465 return taskExpireTime_;
466 }
467
468 /**
469 * Set the time in milliseconds after which a task expires (0 == infinite).
470 *
471 * @param taskExpireTime a 64-bit time in milliseconds.
472 */
473 void setTaskExpireTime(int64_t taskExpireTime) {
474 taskExpireTime_ = taskExpireTime;
475 }
476
477 /**
David Reiss01fe1532010-03-09 05:19:25 +0000478 * Determine if the server is currently overloaded.
479 * This function checks the maximums for open connections and connections
480 * currently in processing, and sets an overload condition if they are
481 * exceeded. The overload will persist until both values are below the
482 * current hysteresis fraction of their maximums.
483 *
484 * @return true if an overload condition exists, false if not.
485 */
486 bool serverOverloaded();
487
488 /** Pop and discard next task on threadpool wait queue.
489 *
490 * @return true if a task was discarded, false if the wait queue was empty.
491 */
492 bool drainPendingTask();
493
494 /**
David Reiss89a12942010-10-06 17:10:52 +0000495 * Get the starting size of a TConnection object's write buffer.
496 *
497 * @return # bytes we initialize a TConnection object's write buffer to.
498 */
499 size_t getWriteBufferDefaultSize() const {
500 return writeBufferDefaultSize_;
501 }
502
503 /**
504 * Set the starting size of a TConnection object's write buffer.
505 *
506 * @param size # bytes we initialize a TConnection object's write buffer to.
507 */
508 void setWriteBufferDefaultSize(size_t size) {
509 writeBufferDefaultSize_ = size;
510 }
511
512 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000513 * Get the maximum size of read buffer allocated to idle TConnection objects.
514 *
David Reiss89a12942010-10-06 17:10:52 +0000515 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000516 */
517 size_t getIdleReadBufferLimit() const {
518 return idleReadBufferLimit_;
519 }
520
521 /**
522 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
523 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000524 *
David Reiss89a12942010-10-06 17:10:52 +0000525 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000526 */
527 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000528 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000529 }
530
531 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000532 * Set the maximum size read buffer allocated to idle TConnection objects.
533 * If a TConnection object is found (either on connection close or between
534 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000535 * allocated to its read buffer, we free it and allow it to be reinitialized
536 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000537 *
538 * @param limit of bytes beyond which we will shrink buffers when checked.
539 */
540 void setIdleReadBufferLimit(size_t limit) {
541 idleReadBufferLimit_ = limit;
542 }
543
544 /**
545 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
546 * Set the maximum size read buffer allocated to idle TConnection objects.
547 * If a TConnection object is found (either on connection close or between
548 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000549 * allocated to its read buffer, we free it and allow it to be reinitialized
550 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000551 *
552 * @param limit of bytes beyond which we will shrink buffers when checked.
553 */
554 void setIdleBufferMemLimit(size_t limit) {
555 idleReadBufferLimit_ = limit;
556 }
557
558
559
560 /**
561 * Get the maximum size of write buffer allocated to idle TConnection objects.
562 *
563 * @return # bytes beyond which we will reallocate buffers when checked.
564 */
565 size_t getIdleWriteBufferLimit() const {
566 return idleWriteBufferLimit_;
567 }
568
569 /**
570 * Set the maximum size write buffer allocated to idle TConnection objects.
571 * If a TConnection object is found (either on connection close or between
572 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000573 * allocated to its write buffer, we destroy and construct that buffer with
574 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000575 *
576 * @param limit of bytes beyond which we will shrink buffers when idle.
577 */
David Reiss54bec5d2010-10-06 17:10:45 +0000578 void setIdleWriteBufferLimit(size_t limit) {
579 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000580 }
581
David Reiss01fe1532010-03-09 05:19:25 +0000582 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000583 * Get # of calls made between buffer size checks. 0 means disabled.
584 *
585 * @return # of calls between buffer size checks.
586 */
587 int32_t getResizeBufferEveryN() const {
588 return resizeBufferEveryN_;
589 }
590
591 /**
592 * Check buffer sizes every "count" calls. This allows buffer limits
593 * to be enforced for persistant connections with a controllable degree
594 * of overhead. 0 disables checks except at connection close.
595 *
596 * @param count the number of calls between checks, or 0 to disable
597 */
598 void setResizeBufferEveryN(int32_t count) {
599 resizeBufferEveryN_ = count;
600 }
601
602
603
604 /**
David Reiss01fe1532010-03-09 05:19:25 +0000605 * Return an initialized connection object. Creates or recovers from
606 * pool a TConnection and initializes it with the provided socket FD
607 * and flags.
608 *
609 * @param socket FD of socket associated with this connection.
610 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000611 * @param addr the sockaddr of the client
612 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000613 * @return pointer to initialized TConnection object.
614 */
David Reiss105961d2010-10-06 17:10:17 +0000615 TConnection* createConnection(int socket, short flags,
616 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000617
David Reiss01fe1532010-03-09 05:19:25 +0000618 /**
619 * Returns a connection to pool or deletion. If the connection pool
620 * (a stack) isn't full, place the connection object on it, otherwise
621 * just delete it.
622 *
623 * @param connection the TConection being returned.
624 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000625 void returnConnection(TConnection* connection);
626
David Reiss01fe1532010-03-09 05:19:25 +0000627 /**
David Reiss068f4162010-03-09 05:19:45 +0000628 * Callback function that the threadmanager calls when a task reaches
629 * its expiration time. It is needed to clean up the expired connection.
630 *
631 * @param task the runnable associated with the expired task.
632 */
633 void expireClose(boost::shared_ptr<Runnable> task);
634
635 /**
David Reiss01fe1532010-03-09 05:19:25 +0000636 * C-callable event handler for listener events. Provides a callback
637 * that libevent can understand which invokes server->handleEvent().
638 *
639 * @param fd the descriptor the event occured on.
640 * @param which the flags associated with the event.
641 * @param v void* callback arg where we placed TNonblockingServer's "this".
642 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000643 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000644 ((TNonblockingServer*)v)->handleEvent(fd, which);
645 }
646
David Reiss01fe1532010-03-09 05:19:25 +0000647 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000648 void listenSocket();
649
David Reiss01fe1532010-03-09 05:19:25 +0000650 /**
651 * Takes a socket created by listenSocket() and sets various options on it
652 * to prepare for use in the server.
653 *
654 * @param fd descriptor of socket to be initialized/
655 */
Mark Slee79b16942007-11-26 19:05:29 +0000656 void listenSocket(int fd);
657
David Reiss01fe1532010-03-09 05:19:25 +0000658 /// Create the pipe used to notify I/O process of task completion.
659 void createNotificationPipe();
660
661 /**
662 * Get notification pipe send descriptor.
663 *
664 * @return write fd for pipe.
665 */
666 int getNotificationSendFD() const {
667 return notificationPipeFDs_[1];
668 }
669
670 /**
671 * Get notification pipe receive descriptor.
672 *
673 * @return read fd of pipe.
674 */
675 int getNotificationRecvFD() const {
676 return notificationPipeFDs_[0];
677 }
678
679 /**
680 * Register the core libevent events onto the proper base.
681 *
682 * @param base pointer to the event base to be initialized.
Roger Meierc1905582011-08-02 23:37:36 +0000683 * @param ownEventBase if true, this server is responsible for
684 * freeing the event base memory.
David Reiss01fe1532010-03-09 05:19:25 +0000685 */
Roger Meierc1905582011-08-02 23:37:36 +0000686 void registerEvents(event_base* base, bool ownEventBase = true);
Mark Slee79b16942007-11-26 19:05:29 +0000687
David Reiss01fe1532010-03-09 05:19:25 +0000688 /**
689 * Main workhorse function, starts up the server listening on a port and
690 * loops over the libevent handler.
691 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000692 void serve();
Bryan Duxbury76c43682011-08-24 21:26:48 +0000693
694 /**
695 * May be called from a separate thread to cause serve() to return.
696 */
697 void stop();
Mark Slee2f6404d2006-10-10 01:37:40 +0000698};
699
T Jake Lucianib5e62212009-01-31 22:36:20 +0000700}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000701
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_