blob: 501433cb0f4c0065544729987a53c889ec46e724 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000033#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <event.h>
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000039using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::protocol::TProtocol;
41using apache::thrift::concurrency::Runnable;
42using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
44// Forward declaration of class
45class TConnection;
46
47/**
48 * This is a non-blocking server in C++ for high performance that operates a
49 * single IO thread. It assumes that all incoming requests are framed with a
50 * 4 byte length indicator and writes out responses using the same framing.
51 *
52 * It does not use the TServerTransport framework, but rather has socket
53 * operations hardcoded for use with select.
54 *
Mark Slee2f6404d2006-10-10 01:37:40 +000055 */
David Reiss01fe1532010-03-09 05:19:25 +000056
57
58/// Overload condition actions.
59enum TOverloadAction {
60 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
61 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
62 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
63};
64
Mark Slee2f6404d2006-10-10 01:37:40 +000065class TNonblockingServer : public TServer {
66 private:
David Reiss01fe1532010-03-09 05:19:25 +000067 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000068 static const int LISTEN_BACKLOG = 1024;
69
David Reiss01fe1532010-03-09 05:19:25 +000070 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000071 static const size_t CONNECTION_STACK_LIMIT = 1024;
72
David Reiss01fe1532010-03-09 05:19:25 +000073 /// Default limit on total number of connected sockets
74 static const int MAX_CONNECTIONS = INT_MAX;
75
76 /// Default limit on connections in handler/task processing
77 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
78
David Reiss54bec5d2010-10-06 17:10:45 +000079 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
80 static const int IDLE_READ_BUFFER_LIMIT = 1024;
81
82 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
83 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
84
85 /// # of calls before resizing oversized buffers (0 = check only on close)
86 static const int RESIZE_BUFFER_EVERY_N = 512;
87
David Reiss01fe1532010-03-09 05:19:25 +000088 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +000089 int serverSocket_;
90
David Reiss01fe1532010-03-09 05:19:25 +000091 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +000092 int port_;
93
David Reiss01fe1532010-03-09 05:19:25 +000094 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +000095 boost::shared_ptr<ThreadManager> threadManager_;
96
David Reiss01fe1532010-03-09 05:19:25 +000097 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +000098 bool threadPoolProcessing_;
99
David Reiss01fe1532010-03-09 05:19:25 +0000100 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000101 event_base* eventBase_;
102
David Reiss01fe1532010-03-09 05:19:25 +0000103 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000104 struct event serverEvent_;
105
David Reiss01fe1532010-03-09 05:19:25 +0000106 /// Event struct, used with eventBase_ for task completion notification
107 struct event notificationEvent_;
108
109 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000110 size_t numTConnections_;
111
David Reiss9e8073c2010-03-09 05:19:39 +0000112 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000113 size_t numActiveProcessors_;
114
115 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000116 size_t connectionStackLimit_;
117
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Limit for number of connections processing or waiting to process
119 size_t maxActiveProcessors_;
120
121 /// Limit for number of open connections
122 size_t maxConnections_;
123
David Reiss068f4162010-03-09 05:19:45 +0000124 /// Time in milliseconds before an unperformed task expires (0 == infinite).
125 int64_t taskExpireTime_;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /**
128 * Hysteresis for overload state. This is the fraction of the overload
129 * value that needs to be reached before the overload state is cleared;
130 * must be <= 1.0.
131 */
132 double overloadHysteresis_;
133
134 /// Action to take when we're overloaded.
135 TOverloadAction overloadAction_;
136
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000137 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000138 * Max read buffer size for an idle TConnection. When we place an idle
139 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
140 * we insure that its read buffer is reduced to this size to insure that
141 * idle connections don't hog memory. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000142 */
David Reiss54bec5d2010-10-06 17:10:45 +0000143 size_t idleReadBufferLimit_;
144
145 /**
146 * Max write buffer size for an idle connection. When we place an idle
147 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
148 * we insure that its write buffer is <= to this size; otherwise we
149 * replace it with a new one to insure that idle connections don't hog
150 * memory. 0 disables this check.
151 */
152 size_t idleWriteBufferLimit_;
153
154 /**
155 * Every N calls we check the buffer size limits on a connected TConnection.
156 * 0 disables (i.e. the checks are only done when a connection closes).
157 */
158 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000159
160 /// Set if we are currently in an overloaded state.
161 bool overloaded_;
162
163 /// Count of connections dropped since overload started
164 uint32_t nConnectionsDropped_;
165
166 /// Count of connections dropped on overload since server started
167 uint64_t nTotalConnectionsDropped_;
168
169 /// File descriptors for pipe used for task completion notification.
170 int notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000171
Mark Slee2f6404d2006-10-10 01:37:40 +0000172 /**
173 * This is a stack of all the objects that have been created but that
174 * are NOT currently in use. When we close a connection, we place it on this
175 * stack so that the object can be reused later, rather than freeing the
176 * memory and reallocating a new object later.
177 */
178 std::stack<TConnection*> connectionStack_;
179
David Reiss01fe1532010-03-09 05:19:25 +0000180 /**
181 * Called when server socket had something happen. We accept all waiting
182 * client connections on listen socket fd and assign TConnection objects
183 * to handle those requests.
184 *
185 * @param fd the listen socket.
186 * @param which the event flag that triggered the handler.
187 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000188 void handleEvent(int fd, short which);
189
190 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000191 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000192 int port) :
193 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000194 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000195 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000196 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000197 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000198 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000199 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000200 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000201 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
202 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000203 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000204 overloadHysteresis_(0.8),
205 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss54bec5d2010-10-06 17:10:45 +0000206 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
207 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
208 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000209 overloaded_(false),
210 nConnectionsDropped_(0),
211 nTotalConnectionsDropped_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +0000212
Mark Slee79b16942007-11-26 19:05:29 +0000213 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000214 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000215 int port,
216 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000217 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000218 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000219 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000220 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000221 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000222 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000223 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000224 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000225 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
226 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000227 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000228 overloadHysteresis_(0.8),
229 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss54bec5d2010-10-06 17:10:45 +0000230 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
231 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
232 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000233 overloaded_(false),
234 nConnectionsDropped_(0),
235 nTotalConnectionsDropped_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000236 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
237 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000238 setInputProtocolFactory(protocolFactory);
239 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000240 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000241 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000242
Mark Slee5ea15f92007-03-05 22:55:59 +0000243 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
244 boost::shared_ptr<TTransportFactory> inputTransportFactory,
245 boost::shared_ptr<TTransportFactory> outputTransportFactory,
246 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
247 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000248 int port,
249 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000250 TServer(processor),
David Reiss01fe1532010-03-09 05:19:25 +0000251 serverSocket_(-1),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000252 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000253 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000254 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000255 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000256 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000257 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000258 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
259 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000260 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000261 overloadHysteresis_(0.8),
262 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss54bec5d2010-10-06 17:10:45 +0000263 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
264 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
265 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000266 overloaded_(false),
267 nConnectionsDropped_(0),
David Reiss068f4162010-03-09 05:19:45 +0000268 nTotalConnectionsDropped_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000269 setInputTransportFactory(inputTransportFactory);
270 setOutputTransportFactory(outputTransportFactory);
271 setInputProtocolFactory(inputProtocolFactory);
272 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000273 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000274 }
Mark Slee79b16942007-11-26 19:05:29 +0000275
David Reiss8ede8182010-09-02 15:26:28 +0000276 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000277
David Reiss068f4162010-03-09 05:19:45 +0000278 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000279
David Reiss1997f102008-04-29 00:29:41 +0000280 boost::shared_ptr<ThreadManager> getThreadManager() {
281 return threadManager_;
282 }
283
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000284 /**
285 * Get the maximum number of unused TConnection we will hold in reserve.
286 *
287 * @return the current limit on TConnection pool size.
288 */
David Reiss260fa932009-04-02 23:51:39 +0000289 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000290 return connectionStackLimit_;
291 }
292
293 /**
294 * Set the maximum number of unused TConnection we will hold in reserve.
295 *
296 * @param sz the new limit for TConnection pool size.
297 */
David Reiss260fa932009-04-02 23:51:39 +0000298 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000299 connectionStackLimit_ = sz;
300 }
301
Mark Slee79b16942007-11-26 19:05:29 +0000302 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000303 return threadPoolProcessing_;
304 }
305
306 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000307 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000308 }
309
Mark Slee79b16942007-11-26 19:05:29 +0000310 event_base* getEventBase() const {
311 return eventBase_;
312 }
313
David Reiss01fe1532010-03-09 05:19:25 +0000314 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000315 void incrementNumConnections() {
316 ++numTConnections_;
317 }
318
David Reiss01fe1532010-03-09 05:19:25 +0000319 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000320 void decrementNumConnections() {
321 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000322 }
323
David Reiss01fe1532010-03-09 05:19:25 +0000324 /**
325 * Return the count of sockets currently connected to.
326 *
327 * @return count of connected sockets.
328 */
329 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000330 return numTConnections_;
331 }
332
David Reiss01fe1532010-03-09 05:19:25 +0000333 /**
334 * Return the count of connection objects allocated but not in use.
335 *
336 * @return count of idle connection objects.
337 */
338 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000339 return connectionStack_.size();
340 }
341
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000342 /**
David Reiss01fe1532010-03-09 05:19:25 +0000343 * Return count of number of connections which are currently processing.
344 * This is defined as a connection where all data has been received and
345 * either assigned a task (when threading) or passed to a handler (when
346 * not threading), and where the handler has not yet returned.
347 *
348 * @return # of connections currently processing.
349 */
350 size_t getNumActiveProcessors() const {
351 return numActiveProcessors_;
352 }
353
354 /// Increment the count of connections currently processing.
355 void incrementActiveProcessors() {
356 ++numActiveProcessors_;
357 }
358
359 /// Decrement the count of connections currently processing.
360 void decrementActiveProcessors() {
361 if (numActiveProcessors_ > 0) {
362 --numActiveProcessors_;
363 }
364 }
365
366 /**
367 * Get the maximum # of connections allowed before overload.
368 *
369 * @return current setting.
370 */
371 size_t getMaxConnections() const {
372 return maxConnections_;
373 }
374
375 /**
376 * Set the maximum # of connections allowed before overload.
377 *
378 * @param maxConnections new setting for maximum # of connections.
379 */
380 void setMaxConnections(size_t maxConnections) {
381 maxConnections_ = maxConnections;
382 }
383
384 /**
385 * Get the maximum # of connections waiting in handler/task before overload.
386 *
387 * @return current setting.
388 */
389 size_t getMaxActiveProcessors() const {
390 return maxActiveProcessors_;
391 }
392
393 /**
394 * Set the maximum # of connections waiting in handler/task before overload.
395 *
396 * @param maxActiveProcessors new setting for maximum # of active processes.
397 */
398 void setMaxActiveProcessors(size_t maxActiveProcessors) {
399 maxActiveProcessors_ = maxActiveProcessors;
400 }
401
402 /**
403 * Get fraction of maximum limits before an overload condition is cleared.
404 *
405 * @return hysteresis fraction
406 */
407 double getOverloadHysteresis() const {
408 return overloadHysteresis_;
409 }
410
411 /**
412 * Set fraction of maximum limits before an overload condition is cleared.
413 * A good value would probably be between 0.5 and 0.9.
414 *
415 * @param hysteresisFraction fraction <= 1.0.
416 */
417 void setOverloadHysteresis(double hysteresisFraction) {
418 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
419 overloadHysteresis_ = hysteresisFraction;
420 }
421 }
422
423 /**
424 * Get the action the server will take on overload.
425 *
426 * @return a TOverloadAction enum value for the currently set action.
427 */
428 TOverloadAction getOverloadAction() const {
429 return overloadAction_;
430 }
431
432 /**
433 * Set the action the server is to take on overload.
434 *
435 * @param overloadAction a TOverloadAction enum value for the action.
436 */
437 void setOverloadAction(TOverloadAction overloadAction) {
438 overloadAction_ = overloadAction;
439 }
440
441 /**
David Reiss068f4162010-03-09 05:19:45 +0000442 * Get the time in milliseconds after which a task expires (0 == infinite).
443 *
444 * @return a 64-bit time in milliseconds.
445 */
446 int64_t getTaskExpireTime() const {
447 return taskExpireTime_;
448 }
449
450 /**
451 * Set the time in milliseconds after which a task expires (0 == infinite).
452 *
453 * @param taskExpireTime a 64-bit time in milliseconds.
454 */
455 void setTaskExpireTime(int64_t taskExpireTime) {
456 taskExpireTime_ = taskExpireTime;
457 }
458
459 /**
David Reiss01fe1532010-03-09 05:19:25 +0000460 * Determine if the server is currently overloaded.
461 * This function checks the maximums for open connections and connections
462 * currently in processing, and sets an overload condition if they are
463 * exceeded. The overload will persist until both values are below the
464 * current hysteresis fraction of their maximums.
465 *
466 * @return true if an overload condition exists, false if not.
467 */
468 bool serverOverloaded();
469
470 /** Pop and discard next task on threadpool wait queue.
471 *
472 * @return true if a task was discarded, false if the wait queue was empty.
473 */
474 bool drainPendingTask();
475
476 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000477 * Get the maximum size of read buffer allocated to idle TConnection objects.
478 *
479 * @return # bytes beyond which we will shrink buffers when idle.
480 */
481 size_t getIdleReadBufferLimit() const {
482 return idleReadBufferLimit_;
483 }
484
485 /**
486 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
487 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000488 *
489 * @return # bytes beyond which we will shrink buffers when idle.
490 */
491 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000492 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000493 }
494
495 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000496 * Set the maximum size read buffer allocated to idle TConnection objects.
497 * If a TConnection object is found (either on connection close or between
498 * calls when resizeBufferEveryN_ is set) with more than this much memory
499 * allocated to its read buffer, we shrink it to this value.
500 *
501 * @param limit of bytes beyond which we will shrink buffers when checked.
502 */
503 void setIdleReadBufferLimit(size_t limit) {
504 idleReadBufferLimit_ = limit;
505 }
506
507 /**
508 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
509 * Set the maximum size read buffer allocated to idle TConnection objects.
510 * If a TConnection object is found (either on connection close or between
511 * calls when resizeBufferEveryN_ is set) with more than this much memory
512 * allocated to its read buffer, we shrink it to this value.
513 *
514 * @param limit of bytes beyond which we will shrink buffers when checked.
515 */
516 void setIdleBufferMemLimit(size_t limit) {
517 idleReadBufferLimit_ = limit;
518 }
519
520
521
522 /**
523 * Get the maximum size of write buffer allocated to idle TConnection objects.
524 *
525 * @return # bytes beyond which we will reallocate buffers when checked.
526 */
527 size_t getIdleWriteBufferLimit() const {
528 return idleWriteBufferLimit_;
529 }
530
531 /**
532 * Set the maximum size write buffer allocated to idle TConnection objects.
533 * If a TConnection object is found (either on connection close or between
534 * calls when resizeBufferEveryN_ is set) with more than this much memory
535 * allocated to its write buffer, we destroy and construct that buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000536 *
537 * @param limit of bytes beyond which we will shrink buffers when idle.
538 */
David Reiss54bec5d2010-10-06 17:10:45 +0000539 void setIdleWriteBufferLimit(size_t limit) {
540 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000541 }
542
David Reiss01fe1532010-03-09 05:19:25 +0000543 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000544 * Get # of calls made between buffer size checks. 0 means disabled.
545 *
546 * @return # of calls between buffer size checks.
547 */
548 int32_t getResizeBufferEveryN() const {
549 return resizeBufferEveryN_;
550 }
551
552 /**
553 * Check buffer sizes every "count" calls. This allows buffer limits
554 * to be enforced for persistant connections with a controllable degree
555 * of overhead. 0 disables checks except at connection close.
556 *
557 * @param count the number of calls between checks, or 0 to disable
558 */
559 void setResizeBufferEveryN(int32_t count) {
560 resizeBufferEveryN_ = count;
561 }
562
563
564
565 /**
David Reiss01fe1532010-03-09 05:19:25 +0000566 * Return an initialized connection object. Creates or recovers from
567 * pool a TConnection and initializes it with the provided socket FD
568 * and flags.
569 *
570 * @param socket FD of socket associated with this connection.
571 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000572 * @param addr the sockaddr of the client
573 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000574 * @return pointer to initialized TConnection object.
575 */
David Reiss105961d2010-10-06 17:10:17 +0000576 TConnection* createConnection(int socket, short flags,
577 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000578
David Reiss01fe1532010-03-09 05:19:25 +0000579 /**
580 * Returns a connection to pool or deletion. If the connection pool
581 * (a stack) isn't full, place the connection object on it, otherwise
582 * just delete it.
583 *
584 * @param connection the TConection being returned.
585 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000586 void returnConnection(TConnection* connection);
587
David Reiss01fe1532010-03-09 05:19:25 +0000588 /**
David Reiss068f4162010-03-09 05:19:45 +0000589 * Callback function that the threadmanager calls when a task reaches
590 * its expiration time. It is needed to clean up the expired connection.
591 *
592 * @param task the runnable associated with the expired task.
593 */
594 void expireClose(boost::shared_ptr<Runnable> task);
595
596 /**
David Reiss01fe1532010-03-09 05:19:25 +0000597 * C-callable event handler for listener events. Provides a callback
598 * that libevent can understand which invokes server->handleEvent().
599 *
600 * @param fd the descriptor the event occured on.
601 * @param which the flags associated with the event.
602 * @param v void* callback arg where we placed TNonblockingServer's "this".
603 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000604 static void eventHandler(int fd, short which, void* v) {
605 ((TNonblockingServer*)v)->handleEvent(fd, which);
606 }
607
David Reiss01fe1532010-03-09 05:19:25 +0000608 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000609 void listenSocket();
610
David Reiss01fe1532010-03-09 05:19:25 +0000611 /**
612 * Takes a socket created by listenSocket() and sets various options on it
613 * to prepare for use in the server.
614 *
615 * @param fd descriptor of socket to be initialized/
616 */
Mark Slee79b16942007-11-26 19:05:29 +0000617 void listenSocket(int fd);
618
David Reiss01fe1532010-03-09 05:19:25 +0000619 /// Create the pipe used to notify I/O process of task completion.
620 void createNotificationPipe();
621
622 /**
623 * Get notification pipe send descriptor.
624 *
625 * @return write fd for pipe.
626 */
627 int getNotificationSendFD() const {
628 return notificationPipeFDs_[1];
629 }
630
631 /**
632 * Get notification pipe receive descriptor.
633 *
634 * @return read fd of pipe.
635 */
636 int getNotificationRecvFD() const {
637 return notificationPipeFDs_[0];
638 }
639
640 /**
641 * Register the core libevent events onto the proper base.
642 *
643 * @param base pointer to the event base to be initialized.
644 */
Mark Slee79b16942007-11-26 19:05:29 +0000645 void registerEvents(event_base* base);
646
David Reiss01fe1532010-03-09 05:19:25 +0000647 /**
648 * Main workhorse function, starts up the server listening on a port and
649 * loops over the libevent handler.
650 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000651 void serve();
652};
653
David Reiss01fe1532010-03-09 05:19:25 +0000654/// Two states for sockets, recv and send mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000655enum TSocketState {
656 SOCKET_RECV,
657 SOCKET_SEND
658};
659
660/**
David Reiss01fe1532010-03-09 05:19:25 +0000661 * Five states for the nonblocking servr:
Mark Slee2f6404d2006-10-10 01:37:40 +0000662 * 1) initialize
663 * 2) read 4 byte frame size
664 * 3) read frame of data
665 * 4) send back data (if any)
David Reiss01fe1532010-03-09 05:19:25 +0000666 * 5) force immediate connection close
Mark Slee2f6404d2006-10-10 01:37:40 +0000667 */
668enum TAppState {
669 APP_INIT,
670 APP_READ_FRAME_SIZE,
671 APP_READ_REQUEST,
Mark Sleee02385b2007-06-09 01:21:16 +0000672 APP_WAIT_TASK,
David Reiss01fe1532010-03-09 05:19:25 +0000673 APP_SEND_RESULT,
674 APP_CLOSE_CONNECTION
Mark Slee2f6404d2006-10-10 01:37:40 +0000675};
676
677/**
678 * Represents a connection that is handled via libevent. This connection
679 * essentially encapsulates a socket that has some associated libevent state.
680 */
David Reiss54bec5d2010-10-06 17:10:45 +0000681class TConnection {
Mark Slee2f6404d2006-10-10 01:37:40 +0000682 private:
683
David Reiss01fe1532010-03-09 05:19:25 +0000684 /// Starting size for new connection buffer
685 static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
Mark Sleee02385b2007-06-09 01:21:16 +0000686
David Reiss01fe1532010-03-09 05:19:25 +0000687 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000688 TNonblockingServer* server_;
689
David Reiss105961d2010-10-06 17:10:17 +0000690 /// Object wrapping network socket
691 boost::shared_ptr<TSocket> tSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000692
David Reiss01fe1532010-03-09 05:19:25 +0000693 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 struct event event_;
695
David Reiss01fe1532010-03-09 05:19:25 +0000696 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000697 short eventFlags_;
698
David Reiss01fe1532010-03-09 05:19:25 +0000699 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000700 TSocketState socketState_;
701
David Reiss01fe1532010-03-09 05:19:25 +0000702 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000703 TAppState appState_;
704
David Reiss01fe1532010-03-09 05:19:25 +0000705 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000706 uint32_t readWant_;
707
David Reiss01fe1532010-03-09 05:19:25 +0000708 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000709 uint32_t readBufferPos_;
710
David Reiss01fe1532010-03-09 05:19:25 +0000711 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000712 uint8_t* readBuffer_;
713
David Reiss01fe1532010-03-09 05:19:25 +0000714 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000715 uint32_t readBufferSize_;
716
David Reiss01fe1532010-03-09 05:19:25 +0000717 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000718 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000719
David Reiss01fe1532010-03-09 05:19:25 +0000720 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000721 uint32_t writeBufferSize_;
722
David Reiss01fe1532010-03-09 05:19:25 +0000723 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000724 uint32_t writeBufferPos_;
725
David Reiss54bec5d2010-10-06 17:10:45 +0000726 /// Largest size of write buffer seen since buffer was constructed
727 size_t largestWriteBufferSize_;
728
729 /// Count of the number of calls for use with getResizeBufferEveryN().
730 int32_t callsForResize_;
731
David Reiss01fe1532010-03-09 05:19:25 +0000732 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000733 int taskHandle_;
734
David Reiss01fe1532010-03-09 05:19:25 +0000735 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000736 struct event taskEvent_;
737
David Reiss01fe1532010-03-09 05:19:25 +0000738 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000739 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000740
David Reiss01fe1532010-03-09 05:19:25 +0000741 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000742 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000743
David Reiss01fe1532010-03-09 05:19:25 +0000744 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000745 boost::shared_ptr<TTransport> factoryInputTransport_;
746 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000747
David Reiss01fe1532010-03-09 05:19:25 +0000748 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000749 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000750
David Reiss01fe1532010-03-09 05:19:25 +0000751 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000752 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000753
David Reiss105961d2010-10-06 17:10:17 +0000754 /// Server event handler, if any
755 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
756
757 /// Thrift call context, if any
758 void *connectionContext_;
759
David Reiss01fe1532010-03-09 05:19:25 +0000760 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000761 void setRead() {
762 setFlags(EV_READ | EV_PERSIST);
763 }
764
David Reiss01fe1532010-03-09 05:19:25 +0000765 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000766 void setWrite() {
767 setFlags(EV_WRITE | EV_PERSIST);
768 }
769
David Reiss01fe1532010-03-09 05:19:25 +0000770 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000771 void setIdle() {
772 setFlags(0);
773 }
774
David Reiss01fe1532010-03-09 05:19:25 +0000775 /**
776 * Set event flags for this connection.
777 *
778 * @param eventFlags flags we pass to libevent for the connection.
779 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000780 void setFlags(short eventFlags);
781
David Reiss01fe1532010-03-09 05:19:25 +0000782 /**
783 * Libevent handler called (via our static wrapper) when the connection
784 * socket had something happen. Rather than use the flags libevent passed,
785 * we use the connection state to determine whether we need to read or
786 * write the socket.
787 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000788 void workSocket();
789
David Reiss01fe1532010-03-09 05:19:25 +0000790 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000791 void close();
792
793 public:
794
David Reiss01fe1532010-03-09 05:19:25 +0000795 class Task;
796
797 /// Constructor
David Reiss105961d2010-10-06 17:10:17 +0000798 TConnection(int socket, short eventFlags, TNonblockingServer *s,
799 const sockaddr* addr, socklen_t addrLen) {
David Reiss01fe1532010-03-09 05:19:25 +0000800 readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
Mark Slee2f6404d2006-10-10 01:37:40 +0000801 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000802 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000803 }
David Reiss01fe1532010-03-09 05:19:25 +0000804 readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
Mark Slee79b16942007-11-26 19:05:29 +0000805
Mark Slee2f6404d2006-10-10 01:37:40 +0000806 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000807 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000808 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000809 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
810 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
David Reiss105961d2010-10-06 17:10:17 +0000811 tSocket_.reset(new TSocket());
Mark Slee79b16942007-11-26 19:05:29 +0000812
David Reiss105961d2010-10-06 17:10:17 +0000813 init(socket, eventFlags, s, addr, addrLen);
David Reiss1997f102008-04-29 00:29:41 +0000814 server_->incrementNumConnections();
815 }
816
817 ~TConnection() {
David Reiss472fffb2010-03-09 05:20:24 +0000818 std::free(readBuffer_);
David Reissc17fe6b2008-04-29 00:29:43 +0000819 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000820 }
821
David Reiss54bec5d2010-10-06 17:10:45 +0000822 /**
823 * Check buffers against any size limits and shrink it if exceeded.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000824 *
David Reiss54bec5d2010-10-06 17:10:45 +0000825 * @param readLimit we reduce read buffer size to this (if nonzero).
826 * @param writeLimit if nonzero and write buffer is larger, replace it.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000827 */
David Reiss54bec5d2010-10-06 17:10:45 +0000828 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000829
David Reiss01fe1532010-03-09 05:19:25 +0000830 /// Initialize
David Reiss105961d2010-10-06 17:10:17 +0000831 void init(int socket, short eventFlags, TNonblockingServer *s,
832 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000833
David Reiss01fe1532010-03-09 05:19:25 +0000834 /**
835 * This is called when the application transitions from one state into
836 * another. This means that it has finished writing the data that it needed
837 * to, or finished receiving the data that it needed to.
838 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000839 void transition();
840
David Reiss01fe1532010-03-09 05:19:25 +0000841 /**
842 * C-callable event handler for connection events. Provides a callback
843 * that libevent can understand which invokes connection_->workSocket().
844 *
845 * @param fd the descriptor the event occured on.
846 * @param which the flags associated with the event.
847 * @param v void* callback arg where we placed TConnection's "this".
848 */
Mark Sleea8de4892008-02-09 00:02:26 +0000849 static void eventHandler(int fd, short /* which */, void* v) {
David Reiss105961d2010-10-06 17:10:17 +0000850 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
Mark Slee2f6404d2006-10-10 01:37:40 +0000851 ((TConnection*)v)->workSocket();
852 }
Mark Slee79b16942007-11-26 19:05:29 +0000853
David Reiss01fe1532010-03-09 05:19:25 +0000854 /**
855 * C-callable event handler for signaling task completion. Provides a
856 * callback that libevent can understand that will read a connection
857 * object's address from a pipe and call connection->transition() for
858 * that object.
859 *
860 * @param fd the descriptor the event occured on.
861 */
862 static void taskHandler(int fd, short /* which */, void* /* v */) {
863 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000864 ssize_t nBytes;
865 while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
866 == sizeof(TConnection*)) {
867 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000868 }
David Reiss83b8fda2010-03-09 05:19:34 +0000869 if (nBytes > 0) {
870 throw TException("TConnection::taskHandler unexpected partial read");
871 }
872 if (errno != EWOULDBLOCK && errno != EAGAIN) {
873 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
874 }
Mark Sleee02385b2007-06-09 01:21:16 +0000875 }
876
David Reiss01fe1532010-03-09 05:19:25 +0000877 /**
878 * Notification to server that processing has ended on this request.
879 * Can be called either when processing is completed or when a waiting
880 * task has been preemptively terminated (on overload).
881 *
David Reiss9e8073c2010-03-09 05:19:39 +0000882 * @return true if successful, false if unable to notify (check errno).
David Reiss01fe1532010-03-09 05:19:25 +0000883 */
884 bool notifyServer() {
885 TConnection* connection = this;
886 if (write(server_->getNotificationSendFD(), (const void*)&connection,
887 sizeof(TConnection*)) != sizeof(TConnection*)) {
888 return false;
889 }
890
891 return true;
892 }
893
894 /// Force connection shutdown for this connection.
895 void forceClose() {
896 appState_ = APP_CLOSE_CONNECTION;
897 if (!notifyServer()) {
898 throw TException("TConnection::forceClose: failed write on notify pipe");
899 }
900 }
901
902 /// return the server this connection was initialized for.
903 TNonblockingServer* getServer() {
904 return server_;
905 }
906
907 /// get state of connection.
908 TAppState getState() {
909 return appState_;
910 }
David Reiss105961d2010-10-06 17:10:17 +0000911
912 /// return the TSocket transport wrapping this network connection
913 boost::shared_ptr<TSocket> getTSocket() const {
914 return tSocket_;
915 }
916
917 /// return the server event handler if any
918 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
919 return serverEventHandler_;
920 }
921
922 /// return the Thrift connection context if any
923 void* getConnectionContext() {
924 return connectionContext_;
925 }
926
Mark Slee2f6404d2006-10-10 01:37:40 +0000927};
928
T Jake Lucianib5e62212009-01-31 22:36:20 +0000929}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000930
931#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_