blob: 6b854b5037dad44828521f43d9ee08088eecdc8a [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Jake Farrellb0d95602011-12-06 01:17:26 +000029#include <concurrency/Thread.h>
Roger Meier12d70532011-12-14 23:35:28 +000030#include <concurrency/PlatformThreadFactory.h>
Jake Farrellb0d95602011-12-06 01:17:26 +000031#include <concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000032#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000033#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000034#include <string>
35#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000036#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000037#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000038#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000039#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <event.h>
41
Jake Farrellb0d95602011-12-06 01:17:26 +000042
43
T Jake Lucianib5e62212009-01-31 22:36:20 +000044namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000045
T Jake Lucianib5e62212009-01-31 22:36:20 +000046using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000047using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000048using apache::thrift::protocol::TProtocol;
49using apache::thrift::concurrency::Runnable;
50using apache::thrift::concurrency::ThreadManager;
Roger Meier12d70532011-12-14 23:35:28 +000051using apache::thrift::concurrency::PlatformThreadFactory;
Jake Farrellb0d95602011-12-06 01:17:26 +000052using apache::thrift::concurrency::ThreadFactory;
53using apache::thrift::concurrency::Thread;
54using apache::thrift::concurrency::Mutex;
55using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000056
Roger Meier30aae0c2011-07-08 12:23:31 +000057#ifdef LIBEVENT_VERSION_NUMBER
58#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
59#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
60#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
61#else
62// assume latest version 1 series
63#define LIBEVENT_VERSION_MAJOR 1
64#define LIBEVENT_VERSION_MINOR 14
65#define LIBEVENT_VERSION_REL 13
66#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
67#endif
68
69#if LIBEVENT_VERSION_NUMBER < 0x02000000
70 typedef int evutil_socket_t;
71#endif
72
// Pointee type used by the two cast helpers below: char when _WIN32 is
// defined, void otherwise. A definition of SOCKOPT_CAST_T made before this
// point is left untouched.
// NOTE(review): presumably this mirrors the option-value pointer type of the
// platform's getsockopt()/setsockopt() -- confirm against the callers.
#if !defined(SOCKOPT_CAST_T)
#  ifdef _WIN32
#    define SOCKOPT_CAST_T char
#  else
#    define SOCKOPT_CAST_T void
#  endif // _WIN32
#endif

/**
 * Reinterpret a pointer to a read-only value as `const SOCKOPT_CAST_T*`.
 *
 * @param v pointer to the value; passed through unchanged apart from its type.
 * @return the same address, typed as `const SOCKOPT_CAST_T*`.
 */
template<typename T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  typedef const SOCKOPT_CAST_T* ConstTarget;
  return reinterpret_cast<ConstTarget>(v);
}

/**
 * Reinterpret a pointer to a writable value as `SOCKOPT_CAST_T*`.
 *
 * @param v pointer to the value; passed through unchanged apart from its type.
 * @return the same address, typed as `SOCKOPT_CAST_T*`.
 */
template<typename T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  typedef SOCKOPT_CAST_T* Target;
  return reinterpret_cast<Target>(v);
}
90
Mark Slee2f6404d2006-10-10 01:37:40 +000091/**
Jake Farrellb0d95602011-12-06 01:17:26 +000092 * This is a non-blocking server in C++ for high performance that
93 * operates a set of IO threads (by default only one). It assumes that
94 * all incoming requests are framed with a 4 byte length indicator and
95 * writes out responses using the same framing.
Mark Slee2f6404d2006-10-10 01:37:40 +000096 *
97 * It does not use the TServerTransport framework, but rather has socket
98 * operations hardcoded for use with select.
99 *
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 */
David Reiss01fe1532010-03-09 05:19:25 +0000101
102
/**
 * Actions the server can be configured to take on an overload condition.
 */
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION = 0,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT = 1,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE = 2  ///< Drop some tasks from head of task queue
};
109
Jake Farrellb0d95602011-12-06 01:17:26 +0000110class TNonblockingIOThread;
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112class TNonblockingServer : public TServer {
113 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000114 class TConnection;
115
Jake Farrellb0d95602011-12-06 01:17:26 +0000116 friend class TNonblockingIOThread;
117 private:
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 static const int LISTEN_BACKLOG = 1024;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000122 static const size_t CONNECTION_STACK_LIMIT = 1024;
123
Roger Meier3781c242011-12-11 20:07:21 +0000124 /// Default limit on frame size
125 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// Default limit on total number of connected sockets
128 static const int MAX_CONNECTIONS = INT_MAX;
129
130 /// Default limit on connections in handler/task processing
131 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
132
David Reiss89a12942010-10-06 17:10:52 +0000133 /// Default size of write buffer
134 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
135
David Reiss54bec5d2010-10-06 17:10:45 +0000136 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
137 static const int IDLE_READ_BUFFER_LIMIT = 1024;
138
139 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
140 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
141
142 /// # of calls before resizing oversized buffers (0 = check only on close)
143 static const int RESIZE_BUFFER_EVERY_N = 512;
144
Jake Farrellb0d95602011-12-06 01:17:26 +0000145 /// # of IO threads to use by default
146 static const int DEFAULT_IO_THREADS = 1;
147
148 /// File descriptor of an invalid socket
Roger Meier12d70532011-12-14 23:35:28 +0000149 static const int INVALID_SOCKET_VALUE = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000150
151 /// # of IO threads this server will use
152 size_t numIOThreads_;
153
154 /// Whether to set high scheduling priority for IO threads
155 bool useHighPriorityIOThreads_;
156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 int serverSocket_;
159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 int port_;
162
David Reiss01fe1532010-03-09 05:19:25 +0000163 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000164 boost::shared_ptr<ThreadManager> threadManager_;
165
David Reiss01fe1532010-03-09 05:19:25 +0000166 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000167 bool threadPoolProcessing_;
168
Jake Farrellb0d95602011-12-06 01:17:26 +0000169 // Factory to create the IO threads
Roger Meier12d70532011-12-14 23:35:28 +0000170 boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000171
Jake Farrellb0d95602011-12-06 01:17:26 +0000172 // Vector of IOThread objects that will handle our IO
173 std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000174
Jake Farrellb0d95602011-12-06 01:17:26 +0000175 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000176 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000177
178 // Synchronizes access to connection stack and similar data
179 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000180
181 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000182 size_t numTConnections_;
183
David Reiss9e8073c2010-03-09 05:19:39 +0000184 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000185 size_t numActiveProcessors_;
186
187 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000188 size_t connectionStackLimit_;
189
David Reiss01fe1532010-03-09 05:19:25 +0000190 /// Limit for number of connections processing or waiting to process
191 size_t maxActiveProcessors_;
192
193 /// Limit for number of open connections
194 size_t maxConnections_;
195
Roger Meier3781c242011-12-11 20:07:21 +0000196 /// Limit for frame size
197 size_t maxFrameSize_;
198
David Reiss068f4162010-03-09 05:19:45 +0000199 /// Time in milliseconds before an unperformed task expires (0 == infinite).
200 int64_t taskExpireTime_;
201
David Reiss01fe1532010-03-09 05:19:25 +0000202 /**
203 * Hysteresis for overload state. This is the fraction of the overload
204 * value that needs to be reached before the overload state is cleared;
205 * must be <= 1.0.
206 */
207 double overloadHysteresis_;
208
209 /// Action to take when we're overloaded.
210 TOverloadAction overloadAction_;
211
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000212 /**
David Reiss89a12942010-10-06 17:10:52 +0000213 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
214 * and found to be exceeded, reinitialized) to this size.
215 */
216 size_t writeBufferDefaultSize_;
217
218 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000219 * Max read buffer size for an idle TConnection. When we place an idle
220 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000221 * we will free the buffer (such that it will be reinitialized by the next
222 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000223 */
David Reiss54bec5d2010-10-06 17:10:45 +0000224 size_t idleReadBufferLimit_;
225
226 /**
227 * Max write buffer size for an idle connection. When we place an idle
228 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
229 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000230 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
231 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000232 */
233 size_t idleWriteBufferLimit_;
234
235 /**
236 * Every N calls we check the buffer size limits on a connected TConnection.
237 * 0 disables (i.e. the checks are only done when a connection closes).
238 */
239 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000240
241 /// Set if we are currently in an overloaded state.
242 bool overloaded_;
243
244 /// Count of connections dropped since overload started
245 uint32_t nConnectionsDropped_;
246
247 /// Count of connections dropped on overload since server started
248 uint64_t nTotalConnectionsDropped_;
249
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 /**
251 * This is a stack of all the objects that have been created but that
252 * are NOT currently in use. When we close a connection, we place it on this
253 * stack so that the object can be reused later, rather than freeing the
254 * memory and reallocating a new object later.
255 */
256 std::stack<TConnection*> connectionStack_;
257
David Reiss01fe1532010-03-09 05:19:25 +0000258 /**
259 * Called when server socket had something happen. We accept all waiting
260 * client connections on listen socket fd and assign TConnection objects
261 * to handle those requests.
262 *
263 * @param fd the listen socket.
264 * @param which the event flag that triggered the handler.
265 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000266 void handleEvent(int fd, short which);
267
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000268 void init(int port) {
269 serverSocket_ = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000270 numIOThreads_ = DEFAULT_IO_THREADS;
271 nextIOThread_ = 0;
272 useHighPriorityIOThreads_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000273 port_ = port;
274 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000275 numTConnections_ = 0;
276 numActiveProcessors_ = 0;
277 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
278 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
279 maxConnections_ = MAX_CONNECTIONS;
Roger Meier3781c242011-12-11 20:07:21 +0000280 maxFrameSize_ = MAX_FRAME_SIZE;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000281 taskExpireTime_ = 0;
282 overloadHysteresis_ = 0.8;
283 overloadAction_ = T_OVERLOAD_NO_ACTION;
284 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
285 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
286 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
287 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
288 overloaded_ = false;
289 nConnectionsDropped_ = 0;
290 nTotalConnectionsDropped_ = 0;
291 }
Mark Sleef9373392007-01-24 19:41:57 +0000292
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000293 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000294 template<typename ProcessorFactory>
295 TNonblockingServer(
296 const boost::shared_ptr<ProcessorFactory>& processorFactory,
297 int port,
298 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
299 TServer(processorFactory) {
300 init(port);
301 }
302
303 template<typename Processor>
304 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
305 int port,
306 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000307 TServer(processor) {
308 init(port);
309 }
310
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000311 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000312 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000313 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000314 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
315 int port,
316 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000317 boost::shared_ptr<ThreadManager>(),
318 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
319 TServer(processorFactory) {
320
321 init(port);
322
323 setInputProtocolFactory(protocolFactory);
324 setOutputProtocolFactory(protocolFactory);
325 setThreadManager(threadManager);
326 }
327
328 template<typename Processor>
329 TNonblockingServer(
330 const boost::shared_ptr<Processor>& processor,
331 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
332 int port,
333 const boost::shared_ptr<ThreadManager>& threadManager =
334 boost::shared_ptr<ThreadManager>(),
335 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000336 TServer(processor) {
337
338 init(port);
339
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000340 setInputProtocolFactory(protocolFactory);
341 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000342 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000343 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000344
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000345 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000346 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000347 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000348 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
349 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
350 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
351 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
352 int port,
353 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000354 boost::shared_ptr<ThreadManager>(),
355 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
356 TServer(processorFactory) {
357
358 init(port);
359
360 setInputTransportFactory(inputTransportFactory);
361 setOutputTransportFactory(outputTransportFactory);
362 setInputProtocolFactory(inputProtocolFactory);
363 setOutputProtocolFactory(outputProtocolFactory);
364 setThreadManager(threadManager);
365 }
366
367 template<typename Processor>
368 TNonblockingServer(
369 const boost::shared_ptr<Processor>& processor,
370 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
371 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
372 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
373 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
374 int port,
375 const boost::shared_ptr<ThreadManager>& threadManager =
376 boost::shared_ptr<ThreadManager>(),
377 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000378 TServer(processor) {
379
380 init(port);
381
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000382 setInputTransportFactory(inputTransportFactory);
383 setOutputTransportFactory(outputTransportFactory);
384 setInputProtocolFactory(inputProtocolFactory);
385 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000386 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000387 }
Mark Slee79b16942007-11-26 19:05:29 +0000388
David Reiss8ede8182010-09-02 15:26:28 +0000389 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000390
David Reiss068f4162010-03-09 05:19:45 +0000391 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000392
David Reiss1997f102008-04-29 00:29:41 +0000393 boost::shared_ptr<ThreadManager> getThreadManager() {
394 return threadManager_;
395 }
396
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000397 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000398 * Sets the number of IO threads used by this server. Can only be used before
399 * the call to serve() and has no effect afterwards. We always use a
400 * PosixThreadFactory for the IO worker threads, because they must joinable
401 * for clean shutdown.
402 */
403 void setNumIOThreads(size_t numThreads) {
404 numIOThreads_ = numThreads;
405 }
406
407 /** Return whether the IO threads will get high scheduling priority */
408 bool useHighPriorityIOThreads() const {
409 return useHighPriorityIOThreads_;
410 }
411
412 /** Set whether the IO threads will get high scheduling priority. */
413 void setUseHighPriorityIOThreads(bool val) {
414 useHighPriorityIOThreads_ = val;
415 }
416
417 /** Return the number of IO threads used by this server. */
418 size_t getNumIOThreads() const {
419 return numIOThreads_;
420 }
421
422 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000423 * Get the maximum number of unused TConnection we will hold in reserve.
424 *
425 * @return the current limit on TConnection pool size.
426 */
David Reiss260fa932009-04-02 23:51:39 +0000427 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000428 return connectionStackLimit_;
429 }
430
431 /**
432 * Set the maximum number of unused TConnection we will hold in reserve.
433 *
434 * @param sz the new limit for TConnection pool size.
435 */
David Reiss260fa932009-04-02 23:51:39 +0000436 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000437 connectionStackLimit_ = sz;
438 }
439
Mark Slee79b16942007-11-26 19:05:29 +0000440 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000441 return threadPoolProcessing_;
442 }
443
444 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000445 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000446 }
447
David Reiss01fe1532010-03-09 05:19:25 +0000448 /**
449 * Return the count of sockets currently connected to.
450 *
451 * @return count of connected sockets.
452 */
453 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000454 return numTConnections_;
455 }
456
David Reiss01fe1532010-03-09 05:19:25 +0000457 /**
Roger Meierec8027f2012-04-11 21:43:25 +0000458 * Return the count of sockets currently connected to.
459 *
460 * @return count of connected sockets.
461 */
462 size_t getNumActiveConnections() const {
463 return getNumConnections() - getNumIdleConnections();
464 }
465
466 /**
David Reiss01fe1532010-03-09 05:19:25 +0000467 * Return the count of connection objects allocated but not in use.
468 *
469 * @return count of idle connection objects.
470 */
471 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000472 return connectionStack_.size();
473 }
474
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000475 /**
David Reiss01fe1532010-03-09 05:19:25 +0000476 * Return count of number of connections which are currently processing.
477 * This is defined as a connection where all data has been received and
478 * either assigned a task (when threading) or passed to a handler (when
479 * not threading), and where the handler has not yet returned.
480 *
481 * @return # of connections currently processing.
482 */
483 size_t getNumActiveProcessors() const {
484 return numActiveProcessors_;
485 }
486
487 /// Increment the count of connections currently processing.
488 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000489 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000490 ++numActiveProcessors_;
491 }
492
493 /// Decrement the count of connections currently processing.
494 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000495 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000496 if (numActiveProcessors_ > 0) {
497 --numActiveProcessors_;
498 }
499 }
500
501 /**
502 * Get the maximum # of connections allowed before overload.
503 *
504 * @return current setting.
505 */
506 size_t getMaxConnections() const {
507 return maxConnections_;
508 }
509
510 /**
511 * Set the maximum # of connections allowed before overload.
512 *
513 * @param maxConnections new setting for maximum # of connections.
514 */
515 void setMaxConnections(size_t maxConnections) {
516 maxConnections_ = maxConnections;
517 }
518
519 /**
520 * Get the maximum # of connections waiting in handler/task before overload.
521 *
522 * @return current setting.
523 */
524 size_t getMaxActiveProcessors() const {
525 return maxActiveProcessors_;
526 }
527
528 /**
529 * Set the maximum # of connections waiting in handler/task before overload.
530 *
531 * @param maxActiveProcessors new setting for maximum # of active processes.
532 */
533 void setMaxActiveProcessors(size_t maxActiveProcessors) {
534 maxActiveProcessors_ = maxActiveProcessors;
535 }
536
537 /**
Roger Meier3781c242011-12-11 20:07:21 +0000538 * Get the maximum allowed frame size.
539 *
540 * If a client tries to send a message larger than this limit,
541 * its connection will be closed.
542 *
543 * @return Maxium frame size, in bytes.
544 */
545 size_t getMaxFrameSize() const {
546 return maxFrameSize_;
547 }
548
549 /**
550 * Set the maximum allowed frame size.
551 *
552 * @param maxFrameSize The new maximum frame size.
553 */
554 void setMaxFrameSize(size_t maxFrameSize) {
555 maxFrameSize_ = maxFrameSize;
556 }
557
558 /**
David Reiss01fe1532010-03-09 05:19:25 +0000559 * Get fraction of maximum limits before an overload condition is cleared.
560 *
561 * @return hysteresis fraction
562 */
563 double getOverloadHysteresis() const {
564 return overloadHysteresis_;
565 }
566
567 /**
568 * Set fraction of maximum limits before an overload condition is cleared.
569 * A good value would probably be between 0.5 and 0.9.
570 *
571 * @param hysteresisFraction fraction <= 1.0.
572 */
573 void setOverloadHysteresis(double hysteresisFraction) {
574 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
575 overloadHysteresis_ = hysteresisFraction;
576 }
577 }
578
579 /**
580 * Get the action the server will take on overload.
581 *
582 * @return a TOverloadAction enum value for the currently set action.
583 */
584 TOverloadAction getOverloadAction() const {
585 return overloadAction_;
586 }
587
588 /**
589 * Set the action the server is to take on overload.
590 *
591 * @param overloadAction a TOverloadAction enum value for the action.
592 */
593 void setOverloadAction(TOverloadAction overloadAction) {
594 overloadAction_ = overloadAction;
595 }
596
597 /**
David Reiss068f4162010-03-09 05:19:45 +0000598 * Get the time in milliseconds after which a task expires (0 == infinite).
599 *
600 * @return a 64-bit time in milliseconds.
601 */
602 int64_t getTaskExpireTime() const {
603 return taskExpireTime_;
604 }
605
606 /**
607 * Set the time in milliseconds after which a task expires (0 == infinite).
608 *
609 * @param taskExpireTime a 64-bit time in milliseconds.
610 */
611 void setTaskExpireTime(int64_t taskExpireTime) {
612 taskExpireTime_ = taskExpireTime;
613 }
614
615 /**
David Reiss01fe1532010-03-09 05:19:25 +0000616 * Determine if the server is currently overloaded.
617 * This function checks the maximums for open connections and connections
618 * currently in processing, and sets an overload condition if they are
619 * exceeded. The overload will persist until both values are below the
620 * current hysteresis fraction of their maximums.
621 *
622 * @return true if an overload condition exists, false if not.
623 */
624 bool serverOverloaded();
625
626 /** Pop and discard next task on threadpool wait queue.
627 *
628 * @return true if a task was discarded, false if the wait queue was empty.
629 */
630 bool drainPendingTask();
631
632 /**
David Reiss89a12942010-10-06 17:10:52 +0000633 * Get the starting size of a TConnection object's write buffer.
634 *
635 * @return # bytes we initialize a TConnection object's write buffer to.
636 */
637 size_t getWriteBufferDefaultSize() const {
638 return writeBufferDefaultSize_;
639 }
640
641 /**
642 * Set the starting size of a TConnection object's write buffer.
643 *
644 * @param size # bytes we initialize a TConnection object's write buffer to.
645 */
646 void setWriteBufferDefaultSize(size_t size) {
647 writeBufferDefaultSize_ = size;
648 }
649
650 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000651 * Get the maximum size of read buffer allocated to idle TConnection objects.
652 *
David Reiss89a12942010-10-06 17:10:52 +0000653 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000654 */
655 size_t getIdleReadBufferLimit() const {
656 return idleReadBufferLimit_;
657 }
658
659 /**
660 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
661 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000662 *
David Reiss89a12942010-10-06 17:10:52 +0000663 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000664 */
665 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000666 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000667 }
668
669 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000670 * Set the maximum size read buffer allocated to idle TConnection objects.
671 * If a TConnection object is found (either on connection close or between
672 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000673 * allocated to its read buffer, we free it and allow it to be reinitialized
674 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000675 *
676 * @param limit of bytes beyond which we will shrink buffers when checked.
677 */
678 void setIdleReadBufferLimit(size_t limit) {
679 idleReadBufferLimit_ = limit;
680 }
681
682 /**
683 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
684 * Set the maximum size read buffer allocated to idle TConnection objects.
685 * If a TConnection object is found (either on connection close or between
686 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000687 * allocated to its read buffer, we free it and allow it to be reinitialized
688 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000689 *
690 * @param limit of bytes beyond which we will shrink buffers when checked.
691 */
692 void setIdleBufferMemLimit(size_t limit) {
693 idleReadBufferLimit_ = limit;
694 }
695
Jake Farrellb0d95602011-12-06 01:17:26 +0000696
David Reiss54bec5d2010-10-06 17:10:45 +0000697
698 /**
699 * Get the maximum size of write buffer allocated to idle TConnection objects.
700 *
701 * @return # bytes beyond which we will reallocate buffers when checked.
702 */
703 size_t getIdleWriteBufferLimit() const {
704 return idleWriteBufferLimit_;
705 }
706
707 /**
708 * Set the maximum size write buffer allocated to idle TConnection objects.
709 * If a TConnection object is found (either on connection close or between
710 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000711 * allocated to its write buffer, we destroy and construct that buffer with
712 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000713 *
714 * @param limit of bytes beyond which we will shrink buffers when idle.
715 */
David Reiss54bec5d2010-10-06 17:10:45 +0000716 void setIdleWriteBufferLimit(size_t limit) {
717 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000718 }
719
David Reiss01fe1532010-03-09 05:19:25 +0000720 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000721 * Get # of calls made between buffer size checks. 0 means disabled.
722 *
723 * @return # of calls between buffer size checks.
724 */
725 int32_t getResizeBufferEveryN() const {
726 return resizeBufferEveryN_;
727 }
728
729 /**
730 * Check buffer sizes every "count" calls. This allows buffer limits
731 * to be enforced for persistant connections with a controllable degree
732 * of overhead. 0 disables checks except at connection close.
733 *
734 * @param count the number of calls between checks, or 0 to disable
735 */
736 void setResizeBufferEveryN(int32_t count) {
737 resizeBufferEveryN_ = count;
738 }
739
Jake Farrellb0d95602011-12-06 01:17:26 +0000740 /**
741 * Main workhorse function, starts up the server listening on a port and
742 * loops over the libevent handler.
743 */
744 void serve();
David Reiss54bec5d2010-10-06 17:10:45 +0000745
Jake Farrellb0d95602011-12-06 01:17:26 +0000746 /**
747 * Causes the server to terminate gracefully (can be called from any thread).
748 */
749 void stop();
David Reiss54bec5d2010-10-06 17:10:45 +0000750
Jake Farrellb0d95602011-12-06 01:17:26 +0000751 private:
752 /**
753 * Callback function that the threadmanager calls when a task reaches
754 * its expiration time. It is needed to clean up the expired connection.
755 *
756 * @param task the runnable associated with the expired task.
757 */
758 void expireClose(boost::shared_ptr<Runnable> task);
759
760 /// Creates a socket to listen on and binds it to the local port.
761 void createAndListenOnSocket();
762
763 /**
764 * Takes a socket created by createAndListenOnSocket() and sets various
765 * options on it to prepare for use in the server.
766 *
767 * @param fd descriptor of socket to be initialized.
768 */
769 void listenSocket(int fd);
David Reiss54bec5d2010-10-06 17:10:45 +0000770 /**
David Reiss01fe1532010-03-09 05:19:25 +0000771 * Return an initialized connection object. Creates or recovers from
772 * pool a TConnection and initializes it with the provided socket FD
773 * and flags.
774 *
775 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000776 * @param addr the sockaddr of the client
777 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000778 * @return pointer to initialized TConnection object.
779 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000780 TConnection* createConnection(int socket, const sockaddr* addr,
781 socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000782
David Reiss01fe1532010-03-09 05:19:25 +0000783 /**
784 * Returns a connection to pool or deletion. If the connection pool
785 * (a stack) isn't full, place the connection object on it, otherwise
786 * just delete it.
787 *
788 * @param connection the TConnection being returned.
789 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000790 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000791};
Mark Slee2f6404d2006-10-10 01:37:40 +0000792
Jake Farrellb0d95602011-12-06 01:17:26 +0000793class TNonblockingIOThread : public Runnable {
794 public:
795 // Creates an IO thread and sets up the event base. The listenSocket should
796 // be a valid FD on which listen() has already been called. If the
797 // listenSocket is < 0, accepting will not be done.
798 TNonblockingIOThread(TNonblockingServer* server,
799 int number,
800 int listenSocket,
801 bool useHighPriority);
802
803 ~TNonblockingIOThread();
804
805 // Returns the event-base for this thread.
806 event_base* getEventBase() const { return eventBase_; }
807
808 // Returns the server for this thread.
809 TNonblockingServer* getServer() const { return server_; }
810
811 // Returns the number of this IO thread.
812 int getThreadNumber() const { return number_; }
813
814 // Returns the thread id associated with this object. This should
815 // only be called after the thread has been started.
Roger Meier12d70532011-12-14 23:35:28 +0000816 Thread::id_t getThreadId() const { return threadId_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000817
818 // Returns the send-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000819 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000820
821 // Returns the read-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000822 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000823
824 // Returns the actual thread object associated with this IO thread.
825 boost::shared_ptr<Thread> getThread() const { return thread_; }
826
827 // Sets the actual thread object associated with this IO thread.
828 void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
829
830 // Used by TConnection objects to indicate processing has finished.
831 bool notify(TNonblockingServer::TConnection* conn);
832
833 // Enters the event loop and does not return until a call to stop().
834 virtual void run();
835
836 // Exits the event loop as soon as possible.
837 void stop();
838
839 // Ensures that the event-loop thread is fully finished and shut down.
840 void join();
841
842 private:
David Reiss01fe1532010-03-09 05:19:25 +0000843 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000844 * C-callable event handler for signaling task completion. Provides a
845 * callback that libevent can understand that will read a connection
846 * object's address from a pipe and call connection->transition() for
847 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000848 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000849 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000850 */
Roger Meier12d70532011-12-14 23:35:28 +0000851 static void notifyHandler(evutil_socket_t fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000852
853 /**
David Reiss01fe1532010-03-09 05:19:25 +0000854 * C-callable event handler for listener events. Provides a callback
855 * that libevent can understand which invokes server->handleEvent().
856 *
857 * @param fd the descriptor the event occured on.
858 * @param which the flags associated with the event.
859 * @param v void* callback arg where we placed TNonblockingServer's "this".
860 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000861 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000862 ((TNonblockingServer*)v)->handleEvent(fd, which);
863 }
864
Jake Farrellb0d95602011-12-06 01:17:26 +0000865 /// Exits the loop ASAP in case of shutdown or error.
866 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000867
Jake Farrellb0d95602011-12-06 01:17:26 +0000868 /// Registers the events for the notification & listen sockets
869 void registerEvents();
Mark Slee79b16942007-11-26 19:05:29 +0000870
David Reiss01fe1532010-03-09 05:19:25 +0000871 /// Create the pipe used to notify I/O process of task completion.
872 void createNotificationPipe();
873
Jake Farrellb0d95602011-12-06 01:17:26 +0000874 /// Unregisters our events for notification and listen sockets.
875 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000876
Jake Farrellb0d95602011-12-06 01:17:26 +0000877 /// Sets (or clears) high priority scheduling status for the current thread.
878 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000879
Jake Farrellb0d95602011-12-06 01:17:26 +0000880 private:
881 /// associated server
882 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000883
Jake Farrellb0d95602011-12-06 01:17:26 +0000884 /// thread number (for debugging).
885 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000886
Jake Farrellb0d95602011-12-06 01:17:26 +0000887 /// The actual physical thread id.
Roger Meier12d70532011-12-14 23:35:28 +0000888 Thread::id_t threadId_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000889
890 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
891 int listenSocket_;
892
893 /// Sets a high scheduling priority when running
894 bool useHighPriority_;
895
896 /// pointer to eventbase to be used for looping
897 event_base* eventBase_;
898
899 /// Used with eventBase_ for connection events (only in listener thread)
900 struct event serverEvent_;
901
902 /// Used with eventBase_ for task completion notification
903 struct event notificationEvent_;
904
905 /// File descriptors for pipe used for task completion notification.
Roger Meier12d70532011-12-14 23:35:28 +0000906 evutil_socket_t notificationPipeFDs_[2];
Jake Farrellb0d95602011-12-06 01:17:26 +0000907
908 /// Actual IO Thread
909 boost::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000910};
911
T Jake Lucianib5e62212009-01-31 22:36:20 +0000912}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000913
Jake Farrellb0d95602011-12-06 01:17:26 +0000914#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_