blob: 9eedcee011f52bb92741ababf789a9087b3d0e80 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Jake Farrellb0d95602011-12-06 01:17:26 +000029#include <concurrency/Thread.h>
30#include <concurrency/PosixThreadFactory.h>
31#include <concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000032#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000033#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000034#include <string>
35#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000036#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000037#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000038#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000039#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <event.h>
41
Jake Farrellb0d95602011-12-06 01:17:26 +000042
43
T Jake Lucianib5e62212009-01-31 22:36:20 +000044namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000045
T Jake Lucianib5e62212009-01-31 22:36:20 +000046using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000047using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000048using apache::thrift::protocol::TProtocol;
49using apache::thrift::concurrency::Runnable;
50using apache::thrift::concurrency::ThreadManager;
Jake Farrellb0d95602011-12-06 01:17:26 +000051using apache::thrift::concurrency::PosixThreadFactory;
52using apache::thrift::concurrency::ThreadFactory;
53using apache::thrift::concurrency::Thread;
54using apache::thrift::concurrency::Mutex;
55using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000056
// Split libevent's packed version number into its components. The shifts
// below treat the number as 0xMMmmrr00: major, minor and release in the three
// most significant bytes.
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// The libevent headers did not provide a version number:
// assume the latest version of the 1.x series and synthesize one.
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

// Fallback socket handle type for libevent releases older than 2.0, which are
// assumed not to declare evutil_socket_t themselves.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
 typedef int evutil_socket_t;
#endif
72
// Pointee type used when passing option values to the platform's socket-option
// calls (presumably setsockopt()/getsockopt() -- confirm against the callers):
// void everywhere except Windows, where char is used. A definition supplied
// before this point takes precedence.
#ifndef SOCKOPT_CAST_T
# ifndef _WIN32
# define SOCKOPT_CAST_T void
# else
# define SOCKOPT_CAST_T char
# endif // _WIN32
#endif

/**
 * Reinterpret a pointer to a read-only value as a pointer to
 * const SOCKOPT_CAST_T.
 *
 * @param v pointer to the value; it is not dereferenced here.
 * @return the same address, typed as const SOCKOPT_CAST_T*.
 */
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

/**
 * Reinterpret a pointer to a writable value as a pointer to SOCKOPT_CAST_T.
 *
 * @param v pointer to the value; it is not dereferenced here.
 * @return the same address, typed as SOCKOPT_CAST_T*.
 */
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
90
/**
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +0000101
102
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
109
Jake Farrellb0d95602011-12-06 01:17:26 +0000110class TNonblockingIOThread;
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112class TNonblockingServer : public TServer {
113 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000114 class TConnection;
115
Jake Farrellb0d95602011-12-06 01:17:26 +0000116 friend class TNonblockingIOThread;
117 private:
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 static const int LISTEN_BACKLOG = 1024;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000122 static const size_t CONNECTION_STACK_LIMIT = 1024;
123
David Reiss01fe1532010-03-09 05:19:25 +0000124 /// Default limit on total number of connected sockets
125 static const int MAX_CONNECTIONS = INT_MAX;
126
127 /// Default limit on connections in handler/task processing
128 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
129
David Reiss89a12942010-10-06 17:10:52 +0000130 /// Default size of write buffer
131 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
132
David Reiss54bec5d2010-10-06 17:10:45 +0000133 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
134 static const int IDLE_READ_BUFFER_LIMIT = 1024;
135
136 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
137 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
138
139 /// # of calls before resizing oversized buffers (0 = check only on close)
140 static const int RESIZE_BUFFER_EVERY_N = 512;
141
Jake Farrellb0d95602011-12-06 01:17:26 +0000142 /// # of IO threads to use by default
143 static const int DEFAULT_IO_THREADS = 1;
144
145 /// File descriptor of an invalid socket
146 static const int INVALID_SOCKET = -1;
147
148 /// # of IO threads this server will use
149 size_t numIOThreads_;
150
151 /// Whether to set high scheduling priority for IO threads
152 bool useHighPriorityIOThreads_;
153
David Reiss01fe1532010-03-09 05:19:25 +0000154 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000155 int serverSocket_;
156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 int port_;
159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000161 boost::shared_ptr<ThreadManager> threadManager_;
162
David Reiss01fe1532010-03-09 05:19:25 +0000163 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000164 bool threadPoolProcessing_;
165
Jake Farrellb0d95602011-12-06 01:17:26 +0000166 // Factory to create the IO threads
167 boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000168
Jake Farrellb0d95602011-12-06 01:17:26 +0000169 // Vector of IOThread objects that will handle our IO
170 std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000171
Jake Farrellb0d95602011-12-06 01:17:26 +0000172 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000173 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000174
175 // Synchronizes access to connection stack and similar data
176 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000177
178 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000179 size_t numTConnections_;
180
David Reiss9e8073c2010-03-09 05:19:39 +0000181 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000182 size_t numActiveProcessors_;
183
184 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000185 size_t connectionStackLimit_;
186
David Reiss01fe1532010-03-09 05:19:25 +0000187 /// Limit for number of connections processing or waiting to process
188 size_t maxActiveProcessors_;
189
190 /// Limit for number of open connections
191 size_t maxConnections_;
192
David Reiss068f4162010-03-09 05:19:45 +0000193 /// Time in milliseconds before an unperformed task expires (0 == infinite).
194 int64_t taskExpireTime_;
195
David Reiss01fe1532010-03-09 05:19:25 +0000196 /**
197 * Hysteresis for overload state. This is the fraction of the overload
198 * value that needs to be reached before the overload state is cleared;
199 * must be <= 1.0.
200 */
201 double overloadHysteresis_;
202
203 /// Action to take when we're overloaded.
204 TOverloadAction overloadAction_;
205
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000206 /**
David Reiss89a12942010-10-06 17:10:52 +0000207 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
208 * and found to be exceeded, reinitialized) to this size.
209 */
210 size_t writeBufferDefaultSize_;
211
212 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000213 * Max read buffer size for an idle TConnection. When we place an idle
214 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000215 * we will free the buffer (such that it will be reinitialized by the next
216 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000217 */
David Reiss54bec5d2010-10-06 17:10:45 +0000218 size_t idleReadBufferLimit_;
219
220 /**
221 * Max write buffer size for an idle connection. When we place an idle
222 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
223 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000224 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
225 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000226 */
227 size_t idleWriteBufferLimit_;
228
229 /**
230 * Every N calls we check the buffer size limits on a connected TConnection.
231 * 0 disables (i.e. the checks are only done when a connection closes).
232 */
233 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000234
235 /// Set if we are currently in an overloaded state.
236 bool overloaded_;
237
238 /// Count of connections dropped since overload started
239 uint32_t nConnectionsDropped_;
240
241 /// Count of connections dropped on overload since server started
242 uint64_t nTotalConnectionsDropped_;
243
Mark Slee2f6404d2006-10-10 01:37:40 +0000244 /**
245 * This is a stack of all the objects that have been created but that
246 * are NOT currently in use. When we close a connection, we place it on this
247 * stack so that the object can be reused later, rather than freeing the
248 * memory and reallocating a new object later.
249 */
250 std::stack<TConnection*> connectionStack_;
251
David Reiss01fe1532010-03-09 05:19:25 +0000252 /**
253 * Called when server socket had something happen. We accept all waiting
254 * client connections on listen socket fd and assign TConnection objects
255 * to handle those requests.
256 *
257 * @param fd the listen socket.
258 * @param which the event flag that triggered the handler.
259 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000260 void handleEvent(int fd, short which);
261
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000262 void init(int port) {
263 serverSocket_ = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000264 numIOThreads_ = DEFAULT_IO_THREADS;
265 nextIOThread_ = 0;
266 useHighPriorityIOThreads_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000267 port_ = port;
268 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000269 numTConnections_ = 0;
270 numActiveProcessors_ = 0;
271 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
272 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
273 maxConnections_ = MAX_CONNECTIONS;
274 taskExpireTime_ = 0;
275 overloadHysteresis_ = 0.8;
276 overloadAction_ = T_OVERLOAD_NO_ACTION;
277 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
278 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
279 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
280 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
281 overloaded_ = false;
282 nConnectionsDropped_ = 0;
283 nTotalConnectionsDropped_ = 0;
284 }
Mark Sleef9373392007-01-24 19:41:57 +0000285
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000286 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000287 template<typename ProcessorFactory>
288 TNonblockingServer(
289 const boost::shared_ptr<ProcessorFactory>& processorFactory,
290 int port,
291 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
292 TServer(processorFactory) {
293 init(port);
294 }
295
296 template<typename Processor>
297 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
298 int port,
299 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000300 TServer(processor) {
301 init(port);
302 }
303
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000304 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000305 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000306 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000307 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
308 int port,
309 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000310 boost::shared_ptr<ThreadManager>(),
311 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
312 TServer(processorFactory) {
313
314 init(port);
315
316 setInputProtocolFactory(protocolFactory);
317 setOutputProtocolFactory(protocolFactory);
318 setThreadManager(threadManager);
319 }
320
321 template<typename Processor>
322 TNonblockingServer(
323 const boost::shared_ptr<Processor>& processor,
324 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
325 int port,
326 const boost::shared_ptr<ThreadManager>& threadManager =
327 boost::shared_ptr<ThreadManager>(),
328 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000329 TServer(processor) {
330
331 init(port);
332
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000333 setInputProtocolFactory(protocolFactory);
334 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000335 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000336 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000337
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000338 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000339 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000340 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000341 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
342 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
343 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
344 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
345 int port,
346 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000347 boost::shared_ptr<ThreadManager>(),
348 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
349 TServer(processorFactory) {
350
351 init(port);
352
353 setInputTransportFactory(inputTransportFactory);
354 setOutputTransportFactory(outputTransportFactory);
355 setInputProtocolFactory(inputProtocolFactory);
356 setOutputProtocolFactory(outputProtocolFactory);
357 setThreadManager(threadManager);
358 }
359
360 template<typename Processor>
361 TNonblockingServer(
362 const boost::shared_ptr<Processor>& processor,
363 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
364 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
365 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
366 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
367 int port,
368 const boost::shared_ptr<ThreadManager>& threadManager =
369 boost::shared_ptr<ThreadManager>(),
370 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000371 TServer(processor) {
372
373 init(port);
374
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000375 setInputTransportFactory(inputTransportFactory);
376 setOutputTransportFactory(outputTransportFactory);
377 setInputProtocolFactory(inputProtocolFactory);
378 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000379 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000380 }
Mark Slee79b16942007-11-26 19:05:29 +0000381
David Reiss8ede8182010-09-02 15:26:28 +0000382 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000383
David Reiss068f4162010-03-09 05:19:45 +0000384 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000385
David Reiss1997f102008-04-29 00:29:41 +0000386 boost::shared_ptr<ThreadManager> getThreadManager() {
387 return threadManager_;
388 }
389
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000390 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000391 * Sets the number of IO threads used by this server. Can only be used before
392 * the call to serve() and has no effect afterwards. We always use a
393 * PosixThreadFactory for the IO worker threads, because they must joinable
394 * for clean shutdown.
395 */
396 void setNumIOThreads(size_t numThreads) {
397 numIOThreads_ = numThreads;
398 }
399
400 /** Return whether the IO threads will get high scheduling priority */
401 bool useHighPriorityIOThreads() const {
402 return useHighPriorityIOThreads_;
403 }
404
405 /** Set whether the IO threads will get high scheduling priority. */
406 void setUseHighPriorityIOThreads(bool val) {
407 useHighPriorityIOThreads_ = val;
408 }
409
410 /** Return the number of IO threads used by this server. */
411 size_t getNumIOThreads() const {
412 return numIOThreads_;
413 }
414
415 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000416 * Get the maximum number of unused TConnection we will hold in reserve.
417 *
418 * @return the current limit on TConnection pool size.
419 */
David Reiss260fa932009-04-02 23:51:39 +0000420 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000421 return connectionStackLimit_;
422 }
423
424 /**
425 * Set the maximum number of unused TConnection we will hold in reserve.
426 *
427 * @param sz the new limit for TConnection pool size.
428 */
David Reiss260fa932009-04-02 23:51:39 +0000429 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000430 connectionStackLimit_ = sz;
431 }
432
Mark Slee79b16942007-11-26 19:05:29 +0000433 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000434 return threadPoolProcessing_;
435 }
436
437 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000438 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000439 }
440
David Reiss01fe1532010-03-09 05:19:25 +0000441 /**
442 * Return the count of sockets currently connected to.
443 *
444 * @return count of connected sockets.
445 */
446 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000447 return numTConnections_;
448 }
449
David Reiss01fe1532010-03-09 05:19:25 +0000450 /**
451 * Return the count of connection objects allocated but not in use.
452 *
453 * @return count of idle connection objects.
454 */
455 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000456 return connectionStack_.size();
457 }
458
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000459 /**
David Reiss01fe1532010-03-09 05:19:25 +0000460 * Return count of number of connections which are currently processing.
461 * This is defined as a connection where all data has been received and
462 * either assigned a task (when threading) or passed to a handler (when
463 * not threading), and where the handler has not yet returned.
464 *
465 * @return # of connections currently processing.
466 */
467 size_t getNumActiveProcessors() const {
468 return numActiveProcessors_;
469 }
470
471 /// Increment the count of connections currently processing.
472 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000473 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000474 ++numActiveProcessors_;
475 }
476
477 /// Decrement the count of connections currently processing.
478 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000479 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000480 if (numActiveProcessors_ > 0) {
481 --numActiveProcessors_;
482 }
483 }
484
485 /**
486 * Get the maximum # of connections allowed before overload.
487 *
488 * @return current setting.
489 */
490 size_t getMaxConnections() const {
491 return maxConnections_;
492 }
493
494 /**
495 * Set the maximum # of connections allowed before overload.
496 *
497 * @param maxConnections new setting for maximum # of connections.
498 */
499 void setMaxConnections(size_t maxConnections) {
500 maxConnections_ = maxConnections;
501 }
502
503 /**
504 * Get the maximum # of connections waiting in handler/task before overload.
505 *
506 * @return current setting.
507 */
508 size_t getMaxActiveProcessors() const {
509 return maxActiveProcessors_;
510 }
511
512 /**
513 * Set the maximum # of connections waiting in handler/task before overload.
514 *
515 * @param maxActiveProcessors new setting for maximum # of active processes.
516 */
517 void setMaxActiveProcessors(size_t maxActiveProcessors) {
518 maxActiveProcessors_ = maxActiveProcessors;
519 }
520
521 /**
522 * Get fraction of maximum limits before an overload condition is cleared.
523 *
524 * @return hysteresis fraction
525 */
526 double getOverloadHysteresis() const {
527 return overloadHysteresis_;
528 }
529
530 /**
531 * Set fraction of maximum limits before an overload condition is cleared.
532 * A good value would probably be between 0.5 and 0.9.
533 *
534 * @param hysteresisFraction fraction <= 1.0.
535 */
536 void setOverloadHysteresis(double hysteresisFraction) {
537 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
538 overloadHysteresis_ = hysteresisFraction;
539 }
540 }
541
542 /**
543 * Get the action the server will take on overload.
544 *
545 * @return a TOverloadAction enum value for the currently set action.
546 */
547 TOverloadAction getOverloadAction() const {
548 return overloadAction_;
549 }
550
551 /**
552 * Set the action the server is to take on overload.
553 *
554 * @param overloadAction a TOverloadAction enum value for the action.
555 */
556 void setOverloadAction(TOverloadAction overloadAction) {
557 overloadAction_ = overloadAction;
558 }
559
560 /**
David Reiss068f4162010-03-09 05:19:45 +0000561 * Get the time in milliseconds after which a task expires (0 == infinite).
562 *
563 * @return a 64-bit time in milliseconds.
564 */
565 int64_t getTaskExpireTime() const {
566 return taskExpireTime_;
567 }
568
569 /**
570 * Set the time in milliseconds after which a task expires (0 == infinite).
571 *
572 * @param taskExpireTime a 64-bit time in milliseconds.
573 */
574 void setTaskExpireTime(int64_t taskExpireTime) {
575 taskExpireTime_ = taskExpireTime;
576 }
577
578 /**
David Reiss01fe1532010-03-09 05:19:25 +0000579 * Determine if the server is currently overloaded.
580 * This function checks the maximums for open connections and connections
581 * currently in processing, and sets an overload condition if they are
582 * exceeded. The overload will persist until both values are below the
583 * current hysteresis fraction of their maximums.
584 *
585 * @return true if an overload condition exists, false if not.
586 */
587 bool serverOverloaded();
588
589 /** Pop and discard next task on threadpool wait queue.
590 *
591 * @return true if a task was discarded, false if the wait queue was empty.
592 */
593 bool drainPendingTask();
594
595 /**
David Reiss89a12942010-10-06 17:10:52 +0000596 * Get the starting size of a TConnection object's write buffer.
597 *
598 * @return # bytes we initialize a TConnection object's write buffer to.
599 */
600 size_t getWriteBufferDefaultSize() const {
601 return writeBufferDefaultSize_;
602 }
603
604 /**
605 * Set the starting size of a TConnection object's write buffer.
606 *
607 * @param size # bytes we initialize a TConnection object's write buffer to.
608 */
609 void setWriteBufferDefaultSize(size_t size) {
610 writeBufferDefaultSize_ = size;
611 }
612
613 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000614 * Get the maximum size of read buffer allocated to idle TConnection objects.
615 *
David Reiss89a12942010-10-06 17:10:52 +0000616 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000617 */
618 size_t getIdleReadBufferLimit() const {
619 return idleReadBufferLimit_;
620 }
621
622 /**
623 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
624 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000625 *
David Reiss89a12942010-10-06 17:10:52 +0000626 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000627 */
628 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000629 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000630 }
631
632 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000633 * Set the maximum size read buffer allocated to idle TConnection objects.
634 * If a TConnection object is found (either on connection close or between
635 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000636 * allocated to its read buffer, we free it and allow it to be reinitialized
637 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000638 *
639 * @param limit of bytes beyond which we will shrink buffers when checked.
640 */
641 void setIdleReadBufferLimit(size_t limit) {
642 idleReadBufferLimit_ = limit;
643 }
644
645 /**
646 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
647 * Set the maximum size read buffer allocated to idle TConnection objects.
648 * If a TConnection object is found (either on connection close or between
649 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000650 * allocated to its read buffer, we free it and allow it to be reinitialized
651 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000652 *
653 * @param limit of bytes beyond which we will shrink buffers when checked.
654 */
655 void setIdleBufferMemLimit(size_t limit) {
656 idleReadBufferLimit_ = limit;
657 }
658
Jake Farrellb0d95602011-12-06 01:17:26 +0000659
David Reiss54bec5d2010-10-06 17:10:45 +0000660
661 /**
662 * Get the maximum size of write buffer allocated to idle TConnection objects.
663 *
664 * @return # bytes beyond which we will reallocate buffers when checked.
665 */
666 size_t getIdleWriteBufferLimit() const {
667 return idleWriteBufferLimit_;
668 }
669
670 /**
671 * Set the maximum size write buffer allocated to idle TConnection objects.
672 * If a TConnection object is found (either on connection close or between
673 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000674 * allocated to its write buffer, we destroy and construct that buffer with
675 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000676 *
677 * @param limit of bytes beyond which we will shrink buffers when idle.
678 */
David Reiss54bec5d2010-10-06 17:10:45 +0000679 void setIdleWriteBufferLimit(size_t limit) {
680 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000681 }
682
David Reiss01fe1532010-03-09 05:19:25 +0000683 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000684 * Get # of calls made between buffer size checks. 0 means disabled.
685 *
686 * @return # of calls between buffer size checks.
687 */
688 int32_t getResizeBufferEveryN() const {
689 return resizeBufferEveryN_;
690 }
691
692 /**
693 * Check buffer sizes every "count" calls. This allows buffer limits
694 * to be enforced for persistant connections with a controllable degree
695 * of overhead. 0 disables checks except at connection close.
696 *
697 * @param count the number of calls between checks, or 0 to disable
698 */
699 void setResizeBufferEveryN(int32_t count) {
700 resizeBufferEveryN_ = count;
701 }
702
Jake Farrellb0d95602011-12-06 01:17:26 +0000703 /**
704 * Main workhorse function, starts up the server listening on a port and
705 * loops over the libevent handler.
706 */
707 void serve();
David Reiss54bec5d2010-10-06 17:10:45 +0000708
Jake Farrellb0d95602011-12-06 01:17:26 +0000709 /**
710 * Causes the server to terminate gracefully (can be called from any thread).
711 */
712 void stop();
David Reiss54bec5d2010-10-06 17:10:45 +0000713
Jake Farrellb0d95602011-12-06 01:17:26 +0000714 private:
715 /**
716 * Callback function that the threadmanager calls when a task reaches
717 * its expiration time. It is needed to clean up the expired connection.
718 *
719 * @param task the runnable associated with the expired task.
720 */
721 void expireClose(boost::shared_ptr<Runnable> task);
722
723 /// Creates a socket to listen on and binds it to the local port.
724 void createAndListenOnSocket();
725
726 /**
727 * Takes a socket created by createAndListenOnSocket() and sets various
728 * options on it to prepare for use in the server.
729 *
730 * @param fd descriptor of socket to be initialized/
731 */
732 void listenSocket(int fd);
David Reiss54bec5d2010-10-06 17:10:45 +0000733 /**
David Reiss01fe1532010-03-09 05:19:25 +0000734 * Return an initialized connection object. Creates or recovers from
735 * pool a TConnection and initializes it with the provided socket FD
736 * and flags.
737 *
738 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000739 * @param addr the sockaddr of the client
740 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000741 * @return pointer to initialized TConnection object.
742 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000743 TConnection* createConnection(int socket, const sockaddr* addr,
744 socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000745
David Reiss01fe1532010-03-09 05:19:25 +0000746 /**
747 * Returns a connection to pool or deletion. If the connection pool
748 * (a stack) isn't full, place the connection object on it, otherwise
749 * just delete it.
750 *
751 * @param connection the TConection being returned.
752 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000753 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000754};
Mark Slee2f6404d2006-10-10 01:37:40 +0000755
Jake Farrellb0d95602011-12-06 01:17:26 +0000756class TNonblockingIOThread : public Runnable {
757 public:
758 // Creates an IO thread and sets up the event base. The listenSocket should
759 // be a valid FD on which listen() has already been called. If the
760 // listenSocket is < 0, accepting will not be done.
761 TNonblockingIOThread(TNonblockingServer* server,
762 int number,
763 int listenSocket,
764 bool useHighPriority);
765
766 ~TNonblockingIOThread();
767
768 // Returns the event-base for this thread.
769 event_base* getEventBase() const { return eventBase_; }
770
771 // Returns the server for this thread.
772 TNonblockingServer* getServer() const { return server_; }
773
774 // Returns the number of this IO thread.
775 int getThreadNumber() const { return number_; }
776
777 // Returns the thread id associated with this object. This should
778 // only be called after the thread has been started.
779 pthread_t getThreadId() const { return threadId_; }
780
781 // Returns the send-fd for task complete notifications.
782 int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
783
784 // Returns the read-fd for task complete notifications.
785 int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
786
787 // Returns the actual thread object associated with this IO thread.
788 boost::shared_ptr<Thread> getThread() const { return thread_; }
789
790 // Sets the actual thread object associated with this IO thread.
791 void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
792
793 // Used by TConnection objects to indicate processing has finished.
794 bool notify(TNonblockingServer::TConnection* conn);
795
796 // Enters the event loop and does not return until a call to stop().
797 virtual void run();
798
799 // Exits the event loop as soon as possible.
800 void stop();
801
802 // Ensures that the event-loop thread is fully finished and shut down.
803 void join();
804
805 private:
David Reiss01fe1532010-03-09 05:19:25 +0000806 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000807 * C-callable event handler for signaling task completion. Provides a
808 * callback that libevent can understand that will read a connection
809 * object's address from a pipe and call connection->transition() for
810 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000811 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000812 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000813 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000814 static void notifyHandler(int fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000815
816 /**
David Reiss01fe1532010-03-09 05:19:25 +0000817 * C-callable event handler for listener events. Provides a callback
818 * that libevent can understand which invokes server->handleEvent().
819 *
820 * @param fd the descriptor the event occured on.
821 * @param which the flags associated with the event.
822 * @param v void* callback arg where we placed TNonblockingServer's "this".
823 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000824 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000825 ((TNonblockingServer*)v)->handleEvent(fd, which);
826 }
827
Jake Farrellb0d95602011-12-06 01:17:26 +0000828 /// Exits the loop ASAP in case of shutdown or error.
829 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000830
Jake Farrellb0d95602011-12-06 01:17:26 +0000831 /// Registers the events for the notification & listen sockets
832 void registerEvents();
Mark Slee79b16942007-11-26 19:05:29 +0000833
David Reiss01fe1532010-03-09 05:19:25 +0000834 /// Create the pipe used to notify I/O process of task completion.
835 void createNotificationPipe();
836
Jake Farrellb0d95602011-12-06 01:17:26 +0000837 /// Unregisters our events for notification and listen sockets.
838 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000839
Jake Farrellb0d95602011-12-06 01:17:26 +0000840 /// Sets (or clears) high priority scheduling status for the current thread.
841 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000842
Jake Farrellb0d95602011-12-06 01:17:26 +0000843 private:
844 /// associated server
845 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000846
Jake Farrellb0d95602011-12-06 01:17:26 +0000847 /// thread number (for debugging).
848 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000849
Jake Farrellb0d95602011-12-06 01:17:26 +0000850 /// The actual physical thread id.
851 pthread_t threadId_;
852
853 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
854 int listenSocket_;
855
856 /// Sets a high scheduling priority when running
857 bool useHighPriority_;
858
859 /// pointer to eventbase to be used for looping
860 event_base* eventBase_;
861
862 /// Used with eventBase_ for connection events (only in listener thread)
863 struct event serverEvent_;
864
865 /// Used with eventBase_ for task completion notification
866 struct event notificationEvent_;
867
868 /// File descriptors for pipe used for task completion notification.
869 int notificationPipeFDs_[2];
870
871 /// Actual IO Thread
872 boost::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000873};
874
T Jake Lucianib5e62212009-01-31 22:36:20 +0000875}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000876
Jake Farrellb0d95602011-12-06 01:17:26 +0000877#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_