blob: 31741e40a8b5f61966cac707fbe4f16838865b1e [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000033#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <event.h>
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000039using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::protocol::TProtocol;
41using apache::thrift::concurrency::Runnable;
42using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
44// Forward declaration of class
45class TConnection;
46
// libevent version helpers.
//
// When the libevent headers do not supply LIBEVENT_VERSION_NUMBER, fixed
// fallback components are used and the packed number is assembled from them;
// otherwise the components are unpacked from the number libevent provides.
#ifndef LIBEVENT_VERSION_NUMBER
// assume latest version 1 series
// NOTE(review): these components spell "1.14.13"; the libevent 1.x line is
// believed to have ended in the 1.4.x releases, so 14 may be a typo for 4 --
// confirm before relying on the minor component.
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | \
                                 (LIBEVENT_VERSION_MINOR << 16) | \
                                 (LIBEVENT_VERSION_REL << 8))
#else
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#endif

// For libevent versions below 2.0.0 (packed value 0x02000000) the socket
// handle type is declared here as a plain int.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef int evutil_socket_t;
#endif
62
// Pointee type used by the cast helpers below (presumably for
// setsockopt/getsockopt-style calls -- confirm against callers). Defaults to
// void unless the build has already defined SOCKOPT_CAST_T.
#if !defined(SOCKOPT_CAST_T)
#define SOCKOPT_CAST_T void
#endif

/**
 * Reinterpret a pointer to a const object as `const SOCKOPT_CAST_T*`.
 *
 * @param v pointer to convert; the address is passed through unchanged.
 * @return the same address typed as `const SOCKOPT_CAST_T*`.
 */
template <typename T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  typedef const SOCKOPT_CAST_T* ConstOptionPtr;
  return reinterpret_cast<ConstOptionPtr>(v);
}

/**
 * Reinterpret a pointer to a mutable object as `SOCKOPT_CAST_T*`.
 *
 * @param v pointer to convert; the address is passed through unchanged.
 * @return the same address typed as `SOCKOPT_CAST_T*`.
 */
template <typename T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  typedef SOCKOPT_CAST_T* OptionPtr;
  return reinterpret_cast<OptionPtr>(v);
}
76
/**
 * TNonblockingServer (declared below) is a non-blocking server in C++ for
 * high performance that operates a single IO thread. It assumes that all
 * incoming requests are framed with a 4 byte length indicator and writes out
 * responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with select.
 */
David Reiss01fe1532010-03-09 05:19:25 +000086
87
/// Actions the server can take when it detects an overload condition.
enum TOverloadAction {
  /// Don't handle overload
  T_OVERLOAD_NO_ACTION = 0,
  /// Drop new connections immediately
  T_OVERLOAD_CLOSE_ON_ACCEPT = 1,
  /// Drop some tasks from head of task queue
  T_OVERLOAD_DRAIN_TASK_QUEUE = 2
};
94
Mark Slee2f6404d2006-10-10 01:37:40 +000095class TNonblockingServer : public TServer {
96 private:
David Reiss01fe1532010-03-09 05:19:25 +000097 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000098 static const int LISTEN_BACKLOG = 1024;
99
David Reiss01fe1532010-03-09 05:19:25 +0000100 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000101 static const size_t CONNECTION_STACK_LIMIT = 1024;
102
David Reiss01fe1532010-03-09 05:19:25 +0000103 /// Default limit on total number of connected sockets
104 static const int MAX_CONNECTIONS = INT_MAX;
105
106 /// Default limit on connections in handler/task processing
107 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
108
David Reiss89a12942010-10-06 17:10:52 +0000109 /// Default size of write buffer
110 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
111
David Reiss54bec5d2010-10-06 17:10:45 +0000112 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
113 static const int IDLE_READ_BUFFER_LIMIT = 1024;
114
115 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
116 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
117
118 /// # of calls before resizing oversized buffers (0 = check only on close)
119 static const int RESIZE_BUFFER_EVERY_N = 512;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 int serverSocket_;
123
David Reiss01fe1532010-03-09 05:19:25 +0000124 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 int port_;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000128 boost::shared_ptr<ThreadManager> threadManager_;
129
David Reiss01fe1532010-03-09 05:19:25 +0000130 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000131 bool threadPoolProcessing_;
132
David Reiss01fe1532010-03-09 05:19:25 +0000133 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000134 event_base* eventBase_;
Roger Meierc1905582011-08-02 23:37:36 +0000135 bool ownEventBase_;
Mark Slee79b16942007-11-26 19:05:29 +0000136
David Reiss01fe1532010-03-09 05:19:25 +0000137 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000138 struct event serverEvent_;
139
David Reiss01fe1532010-03-09 05:19:25 +0000140 /// Event struct, used with eventBase_ for task completion notification
141 struct event notificationEvent_;
142
143 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000144 size_t numTConnections_;
145
David Reiss9e8073c2010-03-09 05:19:39 +0000146 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000147 size_t numActiveProcessors_;
148
149 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000150 size_t connectionStackLimit_;
151
David Reiss01fe1532010-03-09 05:19:25 +0000152 /// Limit for number of connections processing or waiting to process
153 size_t maxActiveProcessors_;
154
155 /// Limit for number of open connections
156 size_t maxConnections_;
157
David Reiss068f4162010-03-09 05:19:45 +0000158 /// Time in milliseconds before an unperformed task expires (0 == infinite).
159 int64_t taskExpireTime_;
160
David Reiss01fe1532010-03-09 05:19:25 +0000161 /**
162 * Hysteresis for overload state. This is the fraction of the overload
163 * value that needs to be reached before the overload state is cleared;
164 * must be <= 1.0.
165 */
166 double overloadHysteresis_;
167
168 /// Action to take when we're overloaded.
169 TOverloadAction overloadAction_;
170
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000171 /**
David Reiss89a12942010-10-06 17:10:52 +0000172 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
173 * and found to be exceeded, reinitialized) to this size.
174 */
175 size_t writeBufferDefaultSize_;
176
177 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000178 * Max read buffer size for an idle TConnection. When we place an idle
179 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000180 * we will free the buffer (such that it will be reinitialized by the next
181 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000182 */
David Reiss54bec5d2010-10-06 17:10:45 +0000183 size_t idleReadBufferLimit_;
184
185 /**
186 * Max write buffer size for an idle connection. When we place an idle
187 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
188 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000189 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
190 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000191 */
192 size_t idleWriteBufferLimit_;
193
194 /**
195 * Every N calls we check the buffer size limits on a connected TConnection.
196 * 0 disables (i.e. the checks are only done when a connection closes).
197 */
198 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000199
200 /// Set if we are currently in an overloaded state.
201 bool overloaded_;
202
203 /// Count of connections dropped since overload started
204 uint32_t nConnectionsDropped_;
205
206 /// Count of connections dropped on overload since server started
207 uint64_t nTotalConnectionsDropped_;
208
209 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000210 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000211
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 /**
213 * This is a stack of all the objects that have been created but that
214 * are NOT currently in use. When we close a connection, we place it on this
215 * stack so that the object can be reused later, rather than freeing the
216 * memory and reallocating a new object later.
217 */
218 std::stack<TConnection*> connectionStack_;
219
David Reiss01fe1532010-03-09 05:19:25 +0000220 /**
221 * Called when server socket had something happen. We accept all waiting
222 * client connections on listen socket fd and assign TConnection objects
223 * to handle those requests.
224 *
225 * @param fd the listen socket.
226 * @param which the event flag that triggered the handler.
227 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000228 void handleEvent(int fd, short which);
229
230 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000231 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000232 int port) :
233 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000234 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000235 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000236 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000237 eventBase_(NULL),
Roger Meierc1905582011-08-02 23:37:36 +0000238 ownEventBase_(false),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000239 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000240 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000241 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000242 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
243 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000244 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000245 overloadHysteresis_(0.8),
246 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss89a12942010-10-06 17:10:52 +0000247 writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
David Reiss54bec5d2010-10-06 17:10:45 +0000248 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
249 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
250 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000251 overloaded_(false),
252 nConnectionsDropped_(0),
253 nTotalConnectionsDropped_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +0000254
Mark Slee79b16942007-11-26 19:05:29 +0000255 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000256 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000257 int port,
258 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000259 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000260 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000261 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000262 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000263 eventBase_(NULL),
Roger Meierc1905582011-08-02 23:37:36 +0000264 ownEventBase_(false),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000265 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000266 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000267 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000268 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
269 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000270 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000271 overloadHysteresis_(0.8),
272 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss89a12942010-10-06 17:10:52 +0000273 writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
David Reiss54bec5d2010-10-06 17:10:45 +0000274 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
275 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
276 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000277 overloaded_(false),
278 nConnectionsDropped_(0),
279 nTotalConnectionsDropped_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000280 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
281 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000282 setInputProtocolFactory(protocolFactory);
283 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000284 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000285 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000286
Mark Slee5ea15f92007-03-05 22:55:59 +0000287 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
288 boost::shared_ptr<TTransportFactory> inputTransportFactory,
289 boost::shared_ptr<TTransportFactory> outputTransportFactory,
290 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
291 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000292 int port,
293 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000294 TServer(processor),
David Reiss01fe1532010-03-09 05:19:25 +0000295 serverSocket_(-1),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000296 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000297 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000298 eventBase_(NULL),
Roger Meierc1905582011-08-02 23:37:36 +0000299 ownEventBase_(false),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000300 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000301 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000302 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000303 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
304 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000305 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000306 overloadHysteresis_(0.8),
307 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss89a12942010-10-06 17:10:52 +0000308 writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
David Reiss54bec5d2010-10-06 17:10:45 +0000309 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
310 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
311 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000312 overloaded_(false),
313 nConnectionsDropped_(0),
David Reiss068f4162010-03-09 05:19:45 +0000314 nTotalConnectionsDropped_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000315 setInputTransportFactory(inputTransportFactory);
316 setOutputTransportFactory(outputTransportFactory);
317 setInputProtocolFactory(inputProtocolFactory);
318 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000319 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000320 }
Mark Slee79b16942007-11-26 19:05:29 +0000321
David Reiss8ede8182010-09-02 15:26:28 +0000322 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000323
David Reiss068f4162010-03-09 05:19:45 +0000324 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000325
David Reiss1997f102008-04-29 00:29:41 +0000326 boost::shared_ptr<ThreadManager> getThreadManager() {
327 return threadManager_;
328 }
329
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000330 /**
331 * Get the maximum number of unused TConnection we will hold in reserve.
332 *
333 * @return the current limit on TConnection pool size.
334 */
David Reiss260fa932009-04-02 23:51:39 +0000335 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000336 return connectionStackLimit_;
337 }
338
339 /**
340 * Set the maximum number of unused TConnection we will hold in reserve.
341 *
342 * @param sz the new limit for TConnection pool size.
343 */
David Reiss260fa932009-04-02 23:51:39 +0000344 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000345 connectionStackLimit_ = sz;
346 }
347
Mark Slee79b16942007-11-26 19:05:29 +0000348 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000349 return threadPoolProcessing_;
350 }
351
352 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000353 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000354 }
355
Mark Slee79b16942007-11-26 19:05:29 +0000356 event_base* getEventBase() const {
357 return eventBase_;
358 }
359
David Reiss01fe1532010-03-09 05:19:25 +0000360 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000361 void incrementNumConnections() {
362 ++numTConnections_;
363 }
364
David Reiss01fe1532010-03-09 05:19:25 +0000365 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000366 void decrementNumConnections() {
367 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000368 }
369
David Reiss01fe1532010-03-09 05:19:25 +0000370 /**
371 * Return the count of sockets currently connected to.
372 *
373 * @return count of connected sockets.
374 */
375 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000376 return numTConnections_;
377 }
378
David Reiss01fe1532010-03-09 05:19:25 +0000379 /**
380 * Return the count of connection objects allocated but not in use.
381 *
382 * @return count of idle connection objects.
383 */
384 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000385 return connectionStack_.size();
386 }
387
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000388 /**
David Reiss01fe1532010-03-09 05:19:25 +0000389 * Return count of number of connections which are currently processing.
390 * This is defined as a connection where all data has been received and
391 * either assigned a task (when threading) or passed to a handler (when
392 * not threading), and where the handler has not yet returned.
393 *
394 * @return # of connections currently processing.
395 */
396 size_t getNumActiveProcessors() const {
397 return numActiveProcessors_;
398 }
399
400 /// Increment the count of connections currently processing.
401 void incrementActiveProcessors() {
402 ++numActiveProcessors_;
403 }
404
405 /// Decrement the count of connections currently processing.
406 void decrementActiveProcessors() {
407 if (numActiveProcessors_ > 0) {
408 --numActiveProcessors_;
409 }
410 }
411
412 /**
413 * Get the maximum # of connections allowed before overload.
414 *
415 * @return current setting.
416 */
417 size_t getMaxConnections() const {
418 return maxConnections_;
419 }
420
421 /**
422 * Set the maximum # of connections allowed before overload.
423 *
424 * @param maxConnections new setting for maximum # of connections.
425 */
426 void setMaxConnections(size_t maxConnections) {
427 maxConnections_ = maxConnections;
428 }
429
430 /**
431 * Get the maximum # of connections waiting in handler/task before overload.
432 *
433 * @return current setting.
434 */
435 size_t getMaxActiveProcessors() const {
436 return maxActiveProcessors_;
437 }
438
439 /**
440 * Set the maximum # of connections waiting in handler/task before overload.
441 *
442 * @param maxActiveProcessors new setting for maximum # of active processes.
443 */
444 void setMaxActiveProcessors(size_t maxActiveProcessors) {
445 maxActiveProcessors_ = maxActiveProcessors;
446 }
447
448 /**
449 * Get fraction of maximum limits before an overload condition is cleared.
450 *
451 * @return hysteresis fraction
452 */
453 double getOverloadHysteresis() const {
454 return overloadHysteresis_;
455 }
456
457 /**
458 * Set fraction of maximum limits before an overload condition is cleared.
459 * A good value would probably be between 0.5 and 0.9.
460 *
461 * @param hysteresisFraction fraction <= 1.0.
462 */
463 void setOverloadHysteresis(double hysteresisFraction) {
464 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
465 overloadHysteresis_ = hysteresisFraction;
466 }
467 }
468
469 /**
470 * Get the action the server will take on overload.
471 *
472 * @return a TOverloadAction enum value for the currently set action.
473 */
474 TOverloadAction getOverloadAction() const {
475 return overloadAction_;
476 }
477
478 /**
479 * Set the action the server is to take on overload.
480 *
481 * @param overloadAction a TOverloadAction enum value for the action.
482 */
483 void setOverloadAction(TOverloadAction overloadAction) {
484 overloadAction_ = overloadAction;
485 }
486
487 /**
David Reiss068f4162010-03-09 05:19:45 +0000488 * Get the time in milliseconds after which a task expires (0 == infinite).
489 *
490 * @return a 64-bit time in milliseconds.
491 */
492 int64_t getTaskExpireTime() const {
493 return taskExpireTime_;
494 }
495
496 /**
497 * Set the time in milliseconds after which a task expires (0 == infinite).
498 *
499 * @param taskExpireTime a 64-bit time in milliseconds.
500 */
501 void setTaskExpireTime(int64_t taskExpireTime) {
502 taskExpireTime_ = taskExpireTime;
503 }
504
505 /**
David Reiss01fe1532010-03-09 05:19:25 +0000506 * Determine if the server is currently overloaded.
507 * This function checks the maximums for open connections and connections
508 * currently in processing, and sets an overload condition if they are
509 * exceeded. The overload will persist until both values are below the
510 * current hysteresis fraction of their maximums.
511 *
512 * @return true if an overload condition exists, false if not.
513 */
514 bool serverOverloaded();
515
516 /** Pop and discard next task on threadpool wait queue.
517 *
518 * @return true if a task was discarded, false if the wait queue was empty.
519 */
520 bool drainPendingTask();
521
522 /**
David Reiss89a12942010-10-06 17:10:52 +0000523 * Get the starting size of a TConnection object's write buffer.
524 *
525 * @return # bytes we initialize a TConnection object's write buffer to.
526 */
527 size_t getWriteBufferDefaultSize() const {
528 return writeBufferDefaultSize_;
529 }
530
531 /**
532 * Set the starting size of a TConnection object's write buffer.
533 *
534 * @param size # bytes we initialize a TConnection object's write buffer to.
535 */
536 void setWriteBufferDefaultSize(size_t size) {
537 writeBufferDefaultSize_ = size;
538 }
539
540 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000541 * Get the maximum size of read buffer allocated to idle TConnection objects.
542 *
David Reiss89a12942010-10-06 17:10:52 +0000543 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000544 */
545 size_t getIdleReadBufferLimit() const {
546 return idleReadBufferLimit_;
547 }
548
549 /**
550 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
551 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000552 *
David Reiss89a12942010-10-06 17:10:52 +0000553 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000554 */
555 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000556 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000557 }
558
559 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000560 * Set the maximum size read buffer allocated to idle TConnection objects.
561 * If a TConnection object is found (either on connection close or between
562 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000563 * allocated to its read buffer, we free it and allow it to be reinitialized
564 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000565 *
566 * @param limit of bytes beyond which we will shrink buffers when checked.
567 */
568 void setIdleReadBufferLimit(size_t limit) {
569 idleReadBufferLimit_ = limit;
570 }
571
572 /**
573 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
574 * Set the maximum size read buffer allocated to idle TConnection objects.
575 * If a TConnection object is found (either on connection close or between
576 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000577 * allocated to its read buffer, we free it and allow it to be reinitialized
578 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000579 *
580 * @param limit of bytes beyond which we will shrink buffers when checked.
581 */
582 void setIdleBufferMemLimit(size_t limit) {
583 idleReadBufferLimit_ = limit;
584 }
585
586
587
588 /**
589 * Get the maximum size of write buffer allocated to idle TConnection objects.
590 *
591 * @return # bytes beyond which we will reallocate buffers when checked.
592 */
593 size_t getIdleWriteBufferLimit() const {
594 return idleWriteBufferLimit_;
595 }
596
597 /**
598 * Set the maximum size write buffer allocated to idle TConnection objects.
599 * If a TConnection object is found (either on connection close or between
600 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000601 * allocated to its write buffer, we destroy and construct that buffer with
602 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000603 *
604 * @param limit of bytes beyond which we will shrink buffers when idle.
605 */
David Reiss54bec5d2010-10-06 17:10:45 +0000606 void setIdleWriteBufferLimit(size_t limit) {
607 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000608 }
609
David Reiss01fe1532010-03-09 05:19:25 +0000610 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000611 * Get # of calls made between buffer size checks. 0 means disabled.
612 *
613 * @return # of calls between buffer size checks.
614 */
615 int32_t getResizeBufferEveryN() const {
616 return resizeBufferEveryN_;
617 }
618
619 /**
620 * Check buffer sizes every "count" calls. This allows buffer limits
621 * to be enforced for persistant connections with a controllable degree
622 * of overhead. 0 disables checks except at connection close.
623 *
624 * @param count the number of calls between checks, or 0 to disable
625 */
626 void setResizeBufferEveryN(int32_t count) {
627 resizeBufferEveryN_ = count;
628 }
629
630
631
632 /**
David Reiss01fe1532010-03-09 05:19:25 +0000633 * Return an initialized connection object. Creates or recovers from
634 * pool a TConnection and initializes it with the provided socket FD
635 * and flags.
636 *
637 * @param socket FD of socket associated with this connection.
638 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000639 * @param addr the sockaddr of the client
640 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000641 * @return pointer to initialized TConnection object.
642 */
David Reiss105961d2010-10-06 17:10:17 +0000643 TConnection* createConnection(int socket, short flags,
644 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000645
David Reiss01fe1532010-03-09 05:19:25 +0000646 /**
647 * Returns a connection to pool or deletion. If the connection pool
648 * (a stack) isn't full, place the connection object on it, otherwise
649 * just delete it.
650 *
651 * @param connection the TConection being returned.
652 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000653 void returnConnection(TConnection* connection);
654
David Reiss01fe1532010-03-09 05:19:25 +0000655 /**
David Reiss068f4162010-03-09 05:19:45 +0000656 * Callback function that the threadmanager calls when a task reaches
657 * its expiration time. It is needed to clean up the expired connection.
658 *
659 * @param task the runnable associated with the expired task.
660 */
661 void expireClose(boost::shared_ptr<Runnable> task);
662
663 /**
David Reiss01fe1532010-03-09 05:19:25 +0000664 * C-callable event handler for listener events. Provides a callback
665 * that libevent can understand which invokes server->handleEvent().
666 *
667 * @param fd the descriptor the event occured on.
668 * @param which the flags associated with the event.
669 * @param v void* callback arg where we placed TNonblockingServer's "this".
670 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000671 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000672 ((TNonblockingServer*)v)->handleEvent(fd, which);
673 }
674
David Reiss01fe1532010-03-09 05:19:25 +0000675 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000676 void listenSocket();
677
David Reiss01fe1532010-03-09 05:19:25 +0000678 /**
679 * Takes a socket created by listenSocket() and sets various options on it
680 * to prepare for use in the server.
681 *
682 * @param fd descriptor of socket to be initialized/
683 */
Mark Slee79b16942007-11-26 19:05:29 +0000684 void listenSocket(int fd);
685
David Reiss01fe1532010-03-09 05:19:25 +0000686 /// Create the pipe used to notify I/O process of task completion.
687 void createNotificationPipe();
688
689 /**
690 * Get notification pipe send descriptor.
691 *
692 * @return write fd for pipe.
693 */
694 int getNotificationSendFD() const {
695 return notificationPipeFDs_[1];
696 }
697
698 /**
699 * Get notification pipe receive descriptor.
700 *
701 * @return read fd of pipe.
702 */
703 int getNotificationRecvFD() const {
704 return notificationPipeFDs_[0];
705 }
706
707 /**
708 * Register the core libevent events onto the proper base.
709 *
710 * @param base pointer to the event base to be initialized.
Roger Meierc1905582011-08-02 23:37:36 +0000711 * @param ownEventBase if true, this server is responsible for
712 * freeing the event base memory.
David Reiss01fe1532010-03-09 05:19:25 +0000713 */
Roger Meierc1905582011-08-02 23:37:36 +0000714 void registerEvents(event_base* base, bool ownEventBase = true);
Mark Slee79b16942007-11-26 19:05:29 +0000715
/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void serve();
721};
722
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  /// Receiving the frame size.
  SOCKET_RECV_FRAMING,
  /// Receiving data.
  SOCKET_RECV,
  /// Send mode.
  SOCKET_SEND
};
729
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for an outstanding task (name suggests the request has been
 *     handed off for processing -- confirm against transition())
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
746
747/**
748 * Represents a connection that is handled via libevent. This connection
749 * essentially encapsulates a socket that has some associated libevent state.
750 */
David Reiss54bec5d2010-10-06 17:10:45 +0000751class TConnection {
Mark Slee2f6404d2006-10-10 01:37:40 +0000752 private:
753
David Reiss01fe1532010-03-09 05:19:25 +0000754 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000755 TNonblockingServer* server_;
756
David Reiss105961d2010-10-06 17:10:17 +0000757 /// Object wrapping network socket
758 boost::shared_ptr<TSocket> tSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000759
David Reiss01fe1532010-03-09 05:19:25 +0000760 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000761 struct event event_;
762
David Reiss01fe1532010-03-09 05:19:25 +0000763 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000764 short eventFlags_;
765
David Reiss01fe1532010-03-09 05:19:25 +0000766 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000767 TSocketState socketState_;
768
David Reiss01fe1532010-03-09 05:19:25 +0000769 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000770 TAppState appState_;
771
David Reiss01fe1532010-03-09 05:19:25 +0000772 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000773 uint32_t readWant_;
774
David Reiss01fe1532010-03-09 05:19:25 +0000775 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000776 uint32_t readBufferPos_;
777
David Reiss01fe1532010-03-09 05:19:25 +0000778 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000779 uint8_t* readBuffer_;
780
David Reiss01fe1532010-03-09 05:19:25 +0000781 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000782 uint32_t readBufferSize_;
783
David Reiss01fe1532010-03-09 05:19:25 +0000784 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000785 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000786
David Reiss01fe1532010-03-09 05:19:25 +0000787 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000788 uint32_t writeBufferSize_;
789
David Reiss01fe1532010-03-09 05:19:25 +0000790 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000791 uint32_t writeBufferPos_;
792
David Reiss54bec5d2010-10-06 17:10:45 +0000793 /// Largest size of write buffer seen since buffer was constructed
794 size_t largestWriteBufferSize_;
795
796 /// Count of the number of calls for use with getResizeBufferEveryN().
797 int32_t callsForResize_;
798
David Reiss01fe1532010-03-09 05:19:25 +0000799 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000800 int taskHandle_;
801
David Reiss01fe1532010-03-09 05:19:25 +0000802 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000803 struct event taskEvent_;
804
David Reiss01fe1532010-03-09 05:19:25 +0000805 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000806 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000807
David Reiss01fe1532010-03-09 05:19:25 +0000808 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000809 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000810
David Reiss01fe1532010-03-09 05:19:25 +0000811 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000812 boost::shared_ptr<TTransport> factoryInputTransport_;
813 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000814
David Reiss01fe1532010-03-09 05:19:25 +0000815 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000816 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000817
David Reiss01fe1532010-03-09 05:19:25 +0000818 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000819 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000820
David Reiss105961d2010-10-06 17:10:17 +0000821 /// Server event handler, if any
822 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
823
824 /// Thrift call context, if any
825 void *connectionContext_;
826
David Reiss01fe1532010-03-09 05:19:25 +0000827 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000828 void setRead() {
829 setFlags(EV_READ | EV_PERSIST);
830 }
831
David Reiss01fe1532010-03-09 05:19:25 +0000832 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000833 void setWrite() {
834 setFlags(EV_WRITE | EV_PERSIST);
835 }
836
David Reiss01fe1532010-03-09 05:19:25 +0000837 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000838 void setIdle() {
839 setFlags(0);
840 }
841
David Reiss01fe1532010-03-09 05:19:25 +0000842 /**
843 * Set event flags for this connection.
844 *
845 * @param eventFlags flags we pass to libevent for the connection.
846 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000847 void setFlags(short eventFlags);
848
David Reiss01fe1532010-03-09 05:19:25 +0000849 /**
850 * Libevent handler called (via our static wrapper) when the connection
851 * socket had something happen. Rather than use the flags libevent passed,
852 * we use the connection state to determine whether we need to read or
853 * write the socket.
854 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000855 void workSocket();
856
David Reiss01fe1532010-03-09 05:19:25 +0000857 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000858 void close();
859
860 public:
861
David Reiss01fe1532010-03-09 05:19:25 +0000862 class Task;
863
864 /// Constructor
David Reiss105961d2010-10-06 17:10:17 +0000865 TConnection(int socket, short eventFlags, TNonblockingServer *s,
866 const sockaddr* addr, socklen_t addrLen) {
David Reiss89a12942010-10-06 17:10:52 +0000867 readBuffer_ = NULL;
868 readBufferSize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000869
Mark Slee2f6404d2006-10-10 01:37:40 +0000870 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000871 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000872 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000873 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
David Reiss89a12942010-10-06 17:10:52 +0000874 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
David Reiss105961d2010-10-06 17:10:17 +0000875 tSocket_.reset(new TSocket());
Mark Slee79b16942007-11-26 19:05:29 +0000876
David Reiss105961d2010-10-06 17:10:17 +0000877 init(socket, eventFlags, s, addr, addrLen);
David Reiss1997f102008-04-29 00:29:41 +0000878 server_->incrementNumConnections();
879 }
880
881 ~TConnection() {
David Reiss472fffb2010-03-09 05:20:24 +0000882 std::free(readBuffer_);
David Reissc17fe6b2008-04-29 00:29:43 +0000883 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000884 }
885
David Reiss54bec5d2010-10-06 17:10:45 +0000886 /**
887 * Check buffers against any size limits and shrink it if exceeded.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000888 *
David Reiss54bec5d2010-10-06 17:10:45 +0000889 * @param readLimit we reduce read buffer size to this (if nonzero).
890 * @param writeLimit if nonzero and write buffer is larger, replace it.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000891 */
David Reiss54bec5d2010-10-06 17:10:45 +0000892 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000893
David Reiss01fe1532010-03-09 05:19:25 +0000894 /// Initialize
David Reiss105961d2010-10-06 17:10:17 +0000895 void init(int socket, short eventFlags, TNonblockingServer *s,
896 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000897
David Reiss01fe1532010-03-09 05:19:25 +0000898 /**
899 * This is called when the application transitions from one state into
900 * another. This means that it has finished writing the data that it needed
901 * to, or finished receiving the data that it needed to.
902 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000903 void transition();
904
David Reiss01fe1532010-03-09 05:19:25 +0000905 /**
906 * C-callable event handler for connection events. Provides a callback
907 * that libevent can understand which invokes connection_->workSocket().
908 *
909 * @param fd the descriptor the event occured on.
910 * @param which the flags associated with the event.
911 * @param v void* callback arg where we placed TConnection's "this".
912 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000913 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
David Reiss105961d2010-10-06 17:10:17 +0000914 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
Mark Slee2f6404d2006-10-10 01:37:40 +0000915 ((TConnection*)v)->workSocket();
916 }
Mark Slee79b16942007-11-26 19:05:29 +0000917
David Reiss01fe1532010-03-09 05:19:25 +0000918 /**
919 * C-callable event handler for signaling task completion. Provides a
920 * callback that libevent can understand that will read a connection
921 * object's address from a pipe and call connection->transition() for
922 * that object.
923 *
924 * @param fd the descriptor the event occured on.
925 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000926 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
David Reiss01fe1532010-03-09 05:19:25 +0000927 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000928 ssize_t nBytes;
Roger Meier30aae0c2011-07-08 12:23:31 +0000929 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
David Reiss83b8fda2010-03-09 05:19:34 +0000930 == sizeof(TConnection*)) {
931 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000932 }
David Reiss83b8fda2010-03-09 05:19:34 +0000933 if (nBytes > 0) {
934 throw TException("TConnection::taskHandler unexpected partial read");
935 }
Roger Meier30aae0c2011-07-08 12:23:31 +0000936 if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
David Reiss83b8fda2010-03-09 05:19:34 +0000937 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
938 }
Mark Sleee02385b2007-06-09 01:21:16 +0000939 }
940
David Reiss01fe1532010-03-09 05:19:25 +0000941 /**
942 * Notification to server that processing has ended on this request.
943 * Can be called either when processing is completed or when a waiting
944 * task has been preemptively terminated (on overload).
945 *
David Reiss9e8073c2010-03-09 05:19:39 +0000946 * @return true if successful, false if unable to notify (check errno).
David Reiss01fe1532010-03-09 05:19:25 +0000947 */
948 bool notifyServer() {
949 TConnection* connection = this;
Roger Meier30aae0c2011-07-08 12:23:31 +0000950 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
951 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
David Reiss01fe1532010-03-09 05:19:25 +0000952 return false;
953 }
954
955 return true;
956 }
957
958 /// Force connection shutdown for this connection.
959 void forceClose() {
960 appState_ = APP_CLOSE_CONNECTION;
961 if (!notifyServer()) {
962 throw TException("TConnection::forceClose: failed write on notify pipe");
963 }
964 }
965
966 /// return the server this connection was initialized for.
967 TNonblockingServer* getServer() {
968 return server_;
969 }
970
971 /// get state of connection.
972 TAppState getState() {
973 return appState_;
974 }
David Reiss105961d2010-10-06 17:10:17 +0000975
976 /// return the TSocket transport wrapping this network connection
977 boost::shared_ptr<TSocket> getTSocket() const {
978 return tSocket_;
979 }
980
981 /// return the server event handler if any
982 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
983 return serverEventHandler_;
984 }
985
986 /// return the Thrift connection context if any
987 void* getConnectionContext() {
988 return connectionContext_;
989 }
990
Mark Slee2f6404d2006-10-10 01:37:40 +0000991};
992
T Jake Lucianib5e62212009-01-31 22:36:20 +0000993}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000994
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_