blob: 7b1cf4dd8f560fc3fac7d146e0a1e49eef086098 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000033#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <event.h>
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000039using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::protocol::TProtocol;
41using apache::thrift::concurrency::Runnable;
42using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
44// Forward declaration of class
45class TConnection;
46
/*
 * libevent version detection.
 *
 * When the included libevent header provides LIBEVENT_VERSION_NUMBER, the
 * major / minor / release components are extracted from its top three bytes.
 * When it does not, a version-1-series value is synthesized so that the
 * numeric comparison below still works.
 *
 * NOTE(review): the fallback encodes "1.14.13"; this looks like it was meant
 * to describe a 1.4.x release. The value is only compared against 0x02000000
 * in this header, so it is left untouched here - confirm before changing.
 */
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

// Before libevent 2.0 this header supplies its own evutil_socket_t so the
// declarations below can use that name unconditionally.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
 typedef int evutil_socket_t;
#endif
62
// Pointee type that socket-option pointers are converted to. Defaults to
// void; a build may pre-define SOCKOPT_CAST_T to select a different type.
#ifndef SOCKOPT_CAST_T
#define SOCKOPT_CAST_T void
#endif

/**
 * Reinterpret a pointer to a read-only value as a pointer to const
 * SOCKOPT_CAST_T. The address is unchanged; only the static type differs.
 *
 * @param v pointer to the value.
 * @return the same address typed as const SOCKOPT_CAST_T*.
 */
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}

/**
 * Reinterpret a pointer to a writable value as a pointer to SOCKOPT_CAST_T.
 * The address is unchanged; only the static type differs.
 *
 * @param v pointer to the value.
 * @return the same address typed as SOCKOPT_CAST_T*.
 */
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
76
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4-byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with select.
 *
 */
David Reiss01fe1532010-03-09 05:19:25 +000086
87
88/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
94
Mark Slee2f6404d2006-10-10 01:37:40 +000095class TNonblockingServer : public TServer {
96 private:
David Reiss01fe1532010-03-09 05:19:25 +000097 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000098 static const int LISTEN_BACKLOG = 1024;
99
David Reiss01fe1532010-03-09 05:19:25 +0000100 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000101 static const size_t CONNECTION_STACK_LIMIT = 1024;
102
David Reiss01fe1532010-03-09 05:19:25 +0000103 /// Default limit on total number of connected sockets
104 static const int MAX_CONNECTIONS = INT_MAX;
105
106 /// Default limit on connections in handler/task processing
107 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
108
David Reiss89a12942010-10-06 17:10:52 +0000109 /// Default size of write buffer
110 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
111
David Reiss54bec5d2010-10-06 17:10:45 +0000112 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
113 static const int IDLE_READ_BUFFER_LIMIT = 1024;
114
115 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
116 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
117
118 /// # of calls before resizing oversized buffers (0 = check only on close)
119 static const int RESIZE_BUFFER_EVERY_N = 512;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 int serverSocket_;
123
David Reiss01fe1532010-03-09 05:19:25 +0000124 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 int port_;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000128 boost::shared_ptr<ThreadManager> threadManager_;
129
David Reiss01fe1532010-03-09 05:19:25 +0000130 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000131 bool threadPoolProcessing_;
132
David Reiss01fe1532010-03-09 05:19:25 +0000133 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000134 event_base* eventBase_;
135
David Reiss01fe1532010-03-09 05:19:25 +0000136 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000137 struct event serverEvent_;
138
David Reiss01fe1532010-03-09 05:19:25 +0000139 /// Event struct, used with eventBase_ for task completion notification
140 struct event notificationEvent_;
141
142 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000143 size_t numTConnections_;
144
David Reiss9e8073c2010-03-09 05:19:39 +0000145 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000146 size_t numActiveProcessors_;
147
148 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000149 size_t connectionStackLimit_;
150
David Reiss01fe1532010-03-09 05:19:25 +0000151 /// Limit for number of connections processing or waiting to process
152 size_t maxActiveProcessors_;
153
154 /// Limit for number of open connections
155 size_t maxConnections_;
156
David Reiss068f4162010-03-09 05:19:45 +0000157 /// Time in milliseconds before an unperformed task expires (0 == infinite).
158 int64_t taskExpireTime_;
159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /**
161 * Hysteresis for overload state. This is the fraction of the overload
162 * value that needs to be reached before the overload state is cleared;
163 * must be <= 1.0.
164 */
165 double overloadHysteresis_;
166
167 /// Action to take when we're overloaded.
168 TOverloadAction overloadAction_;
169
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000170 /**
David Reiss89a12942010-10-06 17:10:52 +0000171 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
172 * and found to be exceeded, reinitialized) to this size.
173 */
174 size_t writeBufferDefaultSize_;
175
176 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000177 * Max read buffer size for an idle TConnection. When we place an idle
178 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000179 * we will free the buffer (such that it will be reinitialized by the next
180 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000181 */
David Reiss54bec5d2010-10-06 17:10:45 +0000182 size_t idleReadBufferLimit_;
183
184 /**
185 * Max write buffer size for an idle connection. When we place an idle
186 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
187 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000188 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
189 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000190 */
191 size_t idleWriteBufferLimit_;
192
193 /**
194 * Every N calls we check the buffer size limits on a connected TConnection.
195 * 0 disables (i.e. the checks are only done when a connection closes).
196 */
197 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000198
199 /// Set if we are currently in an overloaded state.
200 bool overloaded_;
201
202 /// Count of connections dropped since overload started
203 uint32_t nConnectionsDropped_;
204
205 /// Count of connections dropped on overload since server started
206 uint64_t nTotalConnectionsDropped_;
207
208 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000209 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000210
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 /**
212 * This is a stack of all the objects that have been created but that
213 * are NOT currently in use. When we close a connection, we place it on this
214 * stack so that the object can be reused later, rather than freeing the
215 * memory and reallocating a new object later.
216 */
217 std::stack<TConnection*> connectionStack_;
218
David Reiss01fe1532010-03-09 05:19:25 +0000219 /**
220 * Called when server socket had something happen. We accept all waiting
221 * client connections on listen socket fd and assign TConnection objects
222 * to handle those requests.
223 *
224 * @param fd the listen socket.
225 * @param which the event flag that triggered the handler.
226 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 void handleEvent(int fd, short which);
228
229 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000230 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000231 int port) :
232 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000233 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000234 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000235 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000236 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000237 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000238 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000239 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000240 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
241 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000242 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000243 overloadHysteresis_(0.8),
244 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss89a12942010-10-06 17:10:52 +0000245 writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
David Reiss54bec5d2010-10-06 17:10:45 +0000246 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
247 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
248 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000249 overloaded_(false),
250 nConnectionsDropped_(0),
251 nTotalConnectionsDropped_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +0000252
Mark Slee79b16942007-11-26 19:05:29 +0000253 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000254 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000255 int port,
256 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000257 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000258 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000259 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000260 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000261 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000262 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000263 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000264 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000265 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
266 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000267 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000268 overloadHysteresis_(0.8),
269 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss89a12942010-10-06 17:10:52 +0000270 writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
David Reiss54bec5d2010-10-06 17:10:45 +0000271 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
272 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
273 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000274 overloaded_(false),
275 nConnectionsDropped_(0),
276 nTotalConnectionsDropped_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000277 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
278 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000279 setInputProtocolFactory(protocolFactory);
280 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000281 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000282 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000283
Mark Slee5ea15f92007-03-05 22:55:59 +0000284 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
285 boost::shared_ptr<TTransportFactory> inputTransportFactory,
286 boost::shared_ptr<TTransportFactory> outputTransportFactory,
287 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
288 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000289 int port,
290 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000291 TServer(processor),
David Reiss01fe1532010-03-09 05:19:25 +0000292 serverSocket_(-1),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000293 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000294 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000295 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000296 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000297 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000298 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000299 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
300 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000301 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000302 overloadHysteresis_(0.8),
303 overloadAction_(T_OVERLOAD_NO_ACTION),
David Reiss89a12942010-10-06 17:10:52 +0000304 writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
David Reiss54bec5d2010-10-06 17:10:45 +0000305 idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
306 idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
307 resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
David Reiss01fe1532010-03-09 05:19:25 +0000308 overloaded_(false),
309 nConnectionsDropped_(0),
David Reiss068f4162010-03-09 05:19:45 +0000310 nTotalConnectionsDropped_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000311 setInputTransportFactory(inputTransportFactory);
312 setOutputTransportFactory(outputTransportFactory);
313 setInputProtocolFactory(inputProtocolFactory);
314 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000315 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000316 }
Mark Slee79b16942007-11-26 19:05:29 +0000317
David Reiss8ede8182010-09-02 15:26:28 +0000318 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000319
David Reiss068f4162010-03-09 05:19:45 +0000320 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000321
David Reiss1997f102008-04-29 00:29:41 +0000322 boost::shared_ptr<ThreadManager> getThreadManager() {
323 return threadManager_;
324 }
325
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000326 /**
327 * Get the maximum number of unused TConnection we will hold in reserve.
328 *
329 * @return the current limit on TConnection pool size.
330 */
David Reiss260fa932009-04-02 23:51:39 +0000331 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000332 return connectionStackLimit_;
333 }
334
335 /**
336 * Set the maximum number of unused TConnection we will hold in reserve.
337 *
338 * @param sz the new limit for TConnection pool size.
339 */
David Reiss260fa932009-04-02 23:51:39 +0000340 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000341 connectionStackLimit_ = sz;
342 }
343
Mark Slee79b16942007-11-26 19:05:29 +0000344 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000345 return threadPoolProcessing_;
346 }
347
348 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000349 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000350 }
351
Mark Slee79b16942007-11-26 19:05:29 +0000352 event_base* getEventBase() const {
353 return eventBase_;
354 }
355
David Reiss01fe1532010-03-09 05:19:25 +0000356 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000357 void incrementNumConnections() {
358 ++numTConnections_;
359 }
360
David Reiss01fe1532010-03-09 05:19:25 +0000361 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000362 void decrementNumConnections() {
363 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000364 }
365
David Reiss01fe1532010-03-09 05:19:25 +0000366 /**
367 * Return the count of sockets currently connected to.
368 *
369 * @return count of connected sockets.
370 */
371 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000372 return numTConnections_;
373 }
374
David Reiss01fe1532010-03-09 05:19:25 +0000375 /**
376 * Return the count of connection objects allocated but not in use.
377 *
378 * @return count of idle connection objects.
379 */
380 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000381 return connectionStack_.size();
382 }
383
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000384 /**
David Reiss01fe1532010-03-09 05:19:25 +0000385 * Return count of number of connections which are currently processing.
386 * This is defined as a connection where all data has been received and
387 * either assigned a task (when threading) or passed to a handler (when
388 * not threading), and where the handler has not yet returned.
389 *
390 * @return # of connections currently processing.
391 */
392 size_t getNumActiveProcessors() const {
393 return numActiveProcessors_;
394 }
395
396 /// Increment the count of connections currently processing.
397 void incrementActiveProcessors() {
398 ++numActiveProcessors_;
399 }
400
401 /// Decrement the count of connections currently processing.
402 void decrementActiveProcessors() {
403 if (numActiveProcessors_ > 0) {
404 --numActiveProcessors_;
405 }
406 }
407
408 /**
409 * Get the maximum # of connections allowed before overload.
410 *
411 * @return current setting.
412 */
413 size_t getMaxConnections() const {
414 return maxConnections_;
415 }
416
417 /**
418 * Set the maximum # of connections allowed before overload.
419 *
420 * @param maxConnections new setting for maximum # of connections.
421 */
422 void setMaxConnections(size_t maxConnections) {
423 maxConnections_ = maxConnections;
424 }
425
426 /**
427 * Get the maximum # of connections waiting in handler/task before overload.
428 *
429 * @return current setting.
430 */
431 size_t getMaxActiveProcessors() const {
432 return maxActiveProcessors_;
433 }
434
435 /**
436 * Set the maximum # of connections waiting in handler/task before overload.
437 *
438 * @param maxActiveProcessors new setting for maximum # of active processes.
439 */
440 void setMaxActiveProcessors(size_t maxActiveProcessors) {
441 maxActiveProcessors_ = maxActiveProcessors;
442 }
443
444 /**
445 * Get fraction of maximum limits before an overload condition is cleared.
446 *
447 * @return hysteresis fraction
448 */
449 double getOverloadHysteresis() const {
450 return overloadHysteresis_;
451 }
452
453 /**
454 * Set fraction of maximum limits before an overload condition is cleared.
455 * A good value would probably be between 0.5 and 0.9.
456 *
457 * @param hysteresisFraction fraction <= 1.0.
458 */
459 void setOverloadHysteresis(double hysteresisFraction) {
460 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
461 overloadHysteresis_ = hysteresisFraction;
462 }
463 }
464
465 /**
466 * Get the action the server will take on overload.
467 *
468 * @return a TOverloadAction enum value for the currently set action.
469 */
470 TOverloadAction getOverloadAction() const {
471 return overloadAction_;
472 }
473
474 /**
475 * Set the action the server is to take on overload.
476 *
477 * @param overloadAction a TOverloadAction enum value for the action.
478 */
479 void setOverloadAction(TOverloadAction overloadAction) {
480 overloadAction_ = overloadAction;
481 }
482
483 /**
David Reiss068f4162010-03-09 05:19:45 +0000484 * Get the time in milliseconds after which a task expires (0 == infinite).
485 *
486 * @return a 64-bit time in milliseconds.
487 */
488 int64_t getTaskExpireTime() const {
489 return taskExpireTime_;
490 }
491
492 /**
493 * Set the time in milliseconds after which a task expires (0 == infinite).
494 *
495 * @param taskExpireTime a 64-bit time in milliseconds.
496 */
497 void setTaskExpireTime(int64_t taskExpireTime) {
498 taskExpireTime_ = taskExpireTime;
499 }
500
501 /**
David Reiss01fe1532010-03-09 05:19:25 +0000502 * Determine if the server is currently overloaded.
503 * This function checks the maximums for open connections and connections
504 * currently in processing, and sets an overload condition if they are
505 * exceeded. The overload will persist until both values are below the
506 * current hysteresis fraction of their maximums.
507 *
508 * @return true if an overload condition exists, false if not.
509 */
510 bool serverOverloaded();
511
512 /** Pop and discard next task on threadpool wait queue.
513 *
514 * @return true if a task was discarded, false if the wait queue was empty.
515 */
516 bool drainPendingTask();
517
518 /**
David Reiss89a12942010-10-06 17:10:52 +0000519 * Get the starting size of a TConnection object's write buffer.
520 *
521 * @return # bytes we initialize a TConnection object's write buffer to.
522 */
523 size_t getWriteBufferDefaultSize() const {
524 return writeBufferDefaultSize_;
525 }
526
527 /**
528 * Set the starting size of a TConnection object's write buffer.
529 *
530 * @param size # bytes we initialize a TConnection object's write buffer to.
531 */
532 void setWriteBufferDefaultSize(size_t size) {
533 writeBufferDefaultSize_ = size;
534 }
535
536 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000537 * Get the maximum size of read buffer allocated to idle TConnection objects.
538 *
David Reiss89a12942010-10-06 17:10:52 +0000539 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000540 */
541 size_t getIdleReadBufferLimit() const {
542 return idleReadBufferLimit_;
543 }
544
545 /**
546 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
547 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000548 *
David Reiss89a12942010-10-06 17:10:52 +0000549 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000550 */
551 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000552 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000553 }
554
555 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000556 * Set the maximum size read buffer allocated to idle TConnection objects.
557 * If a TConnection object is found (either on connection close or between
558 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000559 * allocated to its read buffer, we free it and allow it to be reinitialized
560 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000561 *
562 * @param limit of bytes beyond which we will shrink buffers when checked.
563 */
564 void setIdleReadBufferLimit(size_t limit) {
565 idleReadBufferLimit_ = limit;
566 }
567
568 /**
569 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
570 * Set the maximum size read buffer allocated to idle TConnection objects.
571 * If a TConnection object is found (either on connection close or between
572 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000573 * allocated to its read buffer, we free it and allow it to be reinitialized
574 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000575 *
576 * @param limit of bytes beyond which we will shrink buffers when checked.
577 */
578 void setIdleBufferMemLimit(size_t limit) {
579 idleReadBufferLimit_ = limit;
580 }
581
582
583
584 /**
585 * Get the maximum size of write buffer allocated to idle TConnection objects.
586 *
587 * @return # bytes beyond which we will reallocate buffers when checked.
588 */
589 size_t getIdleWriteBufferLimit() const {
590 return idleWriteBufferLimit_;
591 }
592
593 /**
594 * Set the maximum size write buffer allocated to idle TConnection objects.
595 * If a TConnection object is found (either on connection close or between
596 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000597 * allocated to its write buffer, we destroy and construct that buffer with
598 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000599 *
600 * @param limit of bytes beyond which we will shrink buffers when idle.
601 */
David Reiss54bec5d2010-10-06 17:10:45 +0000602 void setIdleWriteBufferLimit(size_t limit) {
603 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000604 }
605
David Reiss01fe1532010-03-09 05:19:25 +0000606 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000607 * Get # of calls made between buffer size checks. 0 means disabled.
608 *
609 * @return # of calls between buffer size checks.
610 */
611 int32_t getResizeBufferEveryN() const {
612 return resizeBufferEveryN_;
613 }
614
615 /**
616 * Check buffer sizes every "count" calls. This allows buffer limits
617 * to be enforced for persistant connections with a controllable degree
618 * of overhead. 0 disables checks except at connection close.
619 *
620 * @param count the number of calls between checks, or 0 to disable
621 */
622 void setResizeBufferEveryN(int32_t count) {
623 resizeBufferEveryN_ = count;
624 }
625
626
627
628 /**
David Reiss01fe1532010-03-09 05:19:25 +0000629 * Return an initialized connection object. Creates or recovers from
630 * pool a TConnection and initializes it with the provided socket FD
631 * and flags.
632 *
633 * @param socket FD of socket associated with this connection.
634 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000635 * @param addr the sockaddr of the client
636 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000637 * @return pointer to initialized TConnection object.
638 */
David Reiss105961d2010-10-06 17:10:17 +0000639 TConnection* createConnection(int socket, short flags,
640 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000641
David Reiss01fe1532010-03-09 05:19:25 +0000642 /**
643 * Returns a connection to pool or deletion. If the connection pool
644 * (a stack) isn't full, place the connection object on it, otherwise
645 * just delete it.
646 *
647 * @param connection the TConection being returned.
648 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000649 void returnConnection(TConnection* connection);
650
David Reiss01fe1532010-03-09 05:19:25 +0000651 /**
David Reiss068f4162010-03-09 05:19:45 +0000652 * Callback function that the threadmanager calls when a task reaches
653 * its expiration time. It is needed to clean up the expired connection.
654 *
655 * @param task the runnable associated with the expired task.
656 */
657 void expireClose(boost::shared_ptr<Runnable> task);
658
659 /**
David Reiss01fe1532010-03-09 05:19:25 +0000660 * C-callable event handler for listener events. Provides a callback
661 * that libevent can understand which invokes server->handleEvent().
662 *
663 * @param fd the descriptor the event occured on.
664 * @param which the flags associated with the event.
665 * @param v void* callback arg where we placed TNonblockingServer's "this".
666 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000667 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000668 ((TNonblockingServer*)v)->handleEvent(fd, which);
669 }
670
David Reiss01fe1532010-03-09 05:19:25 +0000671 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000672 void listenSocket();
673
David Reiss01fe1532010-03-09 05:19:25 +0000674 /**
675 * Takes a socket created by listenSocket() and sets various options on it
676 * to prepare for use in the server.
677 *
678 * @param fd descriptor of socket to be initialized/
679 */
Mark Slee79b16942007-11-26 19:05:29 +0000680 void listenSocket(int fd);
681
David Reiss01fe1532010-03-09 05:19:25 +0000682 /// Create the pipe used to notify I/O process of task completion.
683 void createNotificationPipe();
684
685 /**
686 * Get notification pipe send descriptor.
687 *
688 * @return write fd for pipe.
689 */
690 int getNotificationSendFD() const {
691 return notificationPipeFDs_[1];
692 }
693
694 /**
695 * Get notification pipe receive descriptor.
696 *
697 * @return read fd of pipe.
698 */
699 int getNotificationRecvFD() const {
700 return notificationPipeFDs_[0];
701 }
702
703 /**
704 * Register the core libevent events onto the proper base.
705 *
706 * @param base pointer to the event base to be initialized.
707 */
Mark Slee79b16942007-11-26 19:05:29 +0000708 void registerEvents(event_base* base);
709
David Reiss01fe1532010-03-09 05:19:25 +0000710 /**
711 * Main workhorse function, starts up the server listening on a port and
712 * loops over the libevent handler.
713 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000714 void serve();
715};
716
David Reiss89a12942010-10-06 17:10:52 +0000717/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,  ///< Receiving the frame size
  SOCKET_RECV,          ///< Receiving frame data
  SOCKET_SEND           ///< Send mode
};
723
/**
 * Application-level states for a connection of the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a task (presumably the handed-off request processing;
 *     confirm against TConnection::transition())
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT = 0,
  APP_READ_FRAME_SIZE = 1,
  APP_READ_REQUEST = 2,
  APP_WAIT_TASK = 3,
  APP_SEND_RESULT = 4,
  APP_CLOSE_CONNECTION = 5
};
740
741/**
742 * Represents a connection that is handled via libevent. This connection
743 * essentially encapsulates a socket that has some associated libevent state.
744 */
David Reiss54bec5d2010-10-06 17:10:45 +0000745class TConnection {
Mark Slee2f6404d2006-10-10 01:37:40 +0000746 private:
747
David Reiss01fe1532010-03-09 05:19:25 +0000748 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000749 TNonblockingServer* server_;
750
David Reiss105961d2010-10-06 17:10:17 +0000751 /// Object wrapping network socket
752 boost::shared_ptr<TSocket> tSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000753
David Reiss01fe1532010-03-09 05:19:25 +0000754 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000755 struct event event_;
756
David Reiss01fe1532010-03-09 05:19:25 +0000757 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000758 short eventFlags_;
759
David Reiss01fe1532010-03-09 05:19:25 +0000760 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000761 TSocketState socketState_;
762
David Reiss01fe1532010-03-09 05:19:25 +0000763 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000764 TAppState appState_;
765
David Reiss01fe1532010-03-09 05:19:25 +0000766 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000767 uint32_t readWant_;
768
David Reiss01fe1532010-03-09 05:19:25 +0000769 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000770 uint32_t readBufferPos_;
771
David Reiss01fe1532010-03-09 05:19:25 +0000772 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000773 uint8_t* readBuffer_;
774
David Reiss01fe1532010-03-09 05:19:25 +0000775 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000776 uint32_t readBufferSize_;
777
David Reiss01fe1532010-03-09 05:19:25 +0000778 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000779 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000780
David Reiss01fe1532010-03-09 05:19:25 +0000781 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000782 uint32_t writeBufferSize_;
783
David Reiss01fe1532010-03-09 05:19:25 +0000784 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000785 uint32_t writeBufferPos_;
786
David Reiss54bec5d2010-10-06 17:10:45 +0000787 /// Largest size of write buffer seen since buffer was constructed
788 size_t largestWriteBufferSize_;
789
790 /// Count of the number of calls for use with getResizeBufferEveryN().
791 int32_t callsForResize_;
792
David Reiss01fe1532010-03-09 05:19:25 +0000793 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000794 int taskHandle_;
795
David Reiss01fe1532010-03-09 05:19:25 +0000796 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000797 struct event taskEvent_;
798
David Reiss01fe1532010-03-09 05:19:25 +0000799 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000800 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000801
David Reiss01fe1532010-03-09 05:19:25 +0000802 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000803 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000804
David Reiss01fe1532010-03-09 05:19:25 +0000805 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000806 boost::shared_ptr<TTransport> factoryInputTransport_;
807 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000808
David Reiss01fe1532010-03-09 05:19:25 +0000809 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000810 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000811
David Reiss01fe1532010-03-09 05:19:25 +0000812 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000813 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000814
David Reiss105961d2010-10-06 17:10:17 +0000815 /// Server event handler, if any
816 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
817
818 /// Thrift call context, if any
819 void *connectionContext_;
820
David Reiss01fe1532010-03-09 05:19:25 +0000821 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000822 void setRead() {
823 setFlags(EV_READ | EV_PERSIST);
824 }
825
David Reiss01fe1532010-03-09 05:19:25 +0000826 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000827 void setWrite() {
828 setFlags(EV_WRITE | EV_PERSIST);
829 }
830
David Reiss01fe1532010-03-09 05:19:25 +0000831 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000832 void setIdle() {
833 setFlags(0);
834 }
835
David Reiss01fe1532010-03-09 05:19:25 +0000836 /**
837 * Set event flags for this connection.
838 *
839 * @param eventFlags flags we pass to libevent for the connection.
840 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000841 void setFlags(short eventFlags);
842
David Reiss01fe1532010-03-09 05:19:25 +0000843 /**
844 * Libevent handler called (via our static wrapper) when the connection
845 * socket had something happen. Rather than use the flags libevent passed,
846 * we use the connection state to determine whether we need to read or
847 * write the socket.
848 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000849 void workSocket();
850
David Reiss01fe1532010-03-09 05:19:25 +0000851 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000852 void close();
853
854 public:
855
David Reiss01fe1532010-03-09 05:19:25 +0000856 class Task;
857
858 /// Constructor
David Reiss105961d2010-10-06 17:10:17 +0000859 TConnection(int socket, short eventFlags, TNonblockingServer *s,
860 const sockaddr* addr, socklen_t addrLen) {
David Reiss89a12942010-10-06 17:10:52 +0000861 readBuffer_ = NULL;
862 readBufferSize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000863
Mark Slee2f6404d2006-10-10 01:37:40 +0000864 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000865 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000866 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000867 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
David Reiss89a12942010-10-06 17:10:52 +0000868 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
David Reiss105961d2010-10-06 17:10:17 +0000869 tSocket_.reset(new TSocket());
Mark Slee79b16942007-11-26 19:05:29 +0000870
David Reiss105961d2010-10-06 17:10:17 +0000871 init(socket, eventFlags, s, addr, addrLen);
David Reiss1997f102008-04-29 00:29:41 +0000872 server_->incrementNumConnections();
873 }
874
875 ~TConnection() {
David Reiss472fffb2010-03-09 05:20:24 +0000876 std::free(readBuffer_);
David Reissc17fe6b2008-04-29 00:29:43 +0000877 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000878 }
879
David Reiss54bec5d2010-10-06 17:10:45 +0000880 /**
881 * Check buffers against any size limits and shrink it if exceeded.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000882 *
David Reiss54bec5d2010-10-06 17:10:45 +0000883 * @param readLimit we reduce read buffer size to this (if nonzero).
884 * @param writeLimit if nonzero and write buffer is larger, replace it.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000885 */
David Reiss54bec5d2010-10-06 17:10:45 +0000886 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000887
David Reiss01fe1532010-03-09 05:19:25 +0000888 /// Initialize
David Reiss105961d2010-10-06 17:10:17 +0000889 void init(int socket, short eventFlags, TNonblockingServer *s,
890 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000891
David Reiss01fe1532010-03-09 05:19:25 +0000892 /**
893 * This is called when the application transitions from one state into
894 * another. This means that it has finished writing the data that it needed
895 * to, or finished receiving the data that it needed to.
896 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000897 void transition();
898
David Reiss01fe1532010-03-09 05:19:25 +0000899 /**
900 * C-callable event handler for connection events. Provides a callback
901 * that libevent can understand which invokes connection_->workSocket().
902 *
903 * @param fd the descriptor the event occured on.
904 * @param which the flags associated with the event.
905 * @param v void* callback arg where we placed TConnection's "this".
906 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000907 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
David Reiss105961d2010-10-06 17:10:17 +0000908 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
Mark Slee2f6404d2006-10-10 01:37:40 +0000909 ((TConnection*)v)->workSocket();
910 }
Mark Slee79b16942007-11-26 19:05:29 +0000911
David Reiss01fe1532010-03-09 05:19:25 +0000912 /**
913 * C-callable event handler for signaling task completion. Provides a
914 * callback that libevent can understand that will read a connection
915 * object's address from a pipe and call connection->transition() for
916 * that object.
917 *
918 * @param fd the descriptor the event occured on.
919 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000920 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
David Reiss01fe1532010-03-09 05:19:25 +0000921 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000922 ssize_t nBytes;
Roger Meier30aae0c2011-07-08 12:23:31 +0000923 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
David Reiss83b8fda2010-03-09 05:19:34 +0000924 == sizeof(TConnection*)) {
925 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000926 }
David Reiss83b8fda2010-03-09 05:19:34 +0000927 if (nBytes > 0) {
928 throw TException("TConnection::taskHandler unexpected partial read");
929 }
Roger Meier30aae0c2011-07-08 12:23:31 +0000930 if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
David Reiss83b8fda2010-03-09 05:19:34 +0000931 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
932 }
Mark Sleee02385b2007-06-09 01:21:16 +0000933 }
934
David Reiss01fe1532010-03-09 05:19:25 +0000935 /**
936 * Notification to server that processing has ended on this request.
937 * Can be called either when processing is completed or when a waiting
938 * task has been preemptively terminated (on overload).
939 *
David Reiss9e8073c2010-03-09 05:19:39 +0000940 * @return true if successful, false if unable to notify (check errno).
David Reiss01fe1532010-03-09 05:19:25 +0000941 */
942 bool notifyServer() {
943 TConnection* connection = this;
Roger Meier30aae0c2011-07-08 12:23:31 +0000944 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
945 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
David Reiss01fe1532010-03-09 05:19:25 +0000946 return false;
947 }
948
949 return true;
950 }
951
952 /// Force connection shutdown for this connection.
953 void forceClose() {
954 appState_ = APP_CLOSE_CONNECTION;
955 if (!notifyServer()) {
956 throw TException("TConnection::forceClose: failed write on notify pipe");
957 }
958 }
959
960 /// return the server this connection was initialized for.
961 TNonblockingServer* getServer() {
962 return server_;
963 }
964
965 /// get state of connection.
966 TAppState getState() {
967 return appState_;
968 }
David Reiss105961d2010-10-06 17:10:17 +0000969
970 /// return the TSocket transport wrapping this network connection
971 boost::shared_ptr<TSocket> getTSocket() const {
972 return tSocket_;
973 }
974
975 /// return the server event handler if any
976 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
977 return serverEventHandler_;
978 }
979
980 /// return the Thrift connection context if any
981 void* getConnectionContext() {
982 return connectionContext_;
983 }
984
Mark Slee2f6404d2006-10-10 01:37:40 +0000985};
986
T Jake Lucianib5e62212009-01-31 22:36:20 +0000987}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000988
989#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_