blob: ac0e34500705c25ff933facd60ba06fde2f1deeb [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000033#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <event.h>
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000039using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::protocol::TProtocol;
41using apache::thrift::concurrency::Runnable;
42using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
44// Forward declaration of class
45class TConnection;
46
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4 byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +000056
57
/// Actions the server can take when it detects an overload condition.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
64
Mark Slee2f6404d2006-10-10 01:37:40 +000065class TNonblockingServer : public TServer {
66 private:
David Reiss01fe1532010-03-09 05:19:25 +000067 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000068 static const int LISTEN_BACKLOG = 1024;
69
David Reiss01fe1532010-03-09 05:19:25 +000070 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000071 static const size_t CONNECTION_STACK_LIMIT = 1024;
72
David Reiss01fe1532010-03-09 05:19:25 +000073 /// Maximum size of buffer allocated to idle connection
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000074 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
75
David Reiss01fe1532010-03-09 05:19:25 +000076 /// Default limit on total number of connected sockets
77 static const int MAX_CONNECTIONS = INT_MAX;
78
79 /// Default limit on connections in handler/task processing
80 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
81
82 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +000083 int serverSocket_;
84
David Reiss01fe1532010-03-09 05:19:25 +000085 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +000086 int port_;
87
David Reiss01fe1532010-03-09 05:19:25 +000088 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +000089 boost::shared_ptr<ThreadManager> threadManager_;
90
David Reiss01fe1532010-03-09 05:19:25 +000091 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +000092 bool threadPoolProcessing_;
93
David Reiss01fe1532010-03-09 05:19:25 +000094 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +000095 event_base* eventBase_;
96
David Reiss01fe1532010-03-09 05:19:25 +000097 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +000098 struct event serverEvent_;
99
David Reiss01fe1532010-03-09 05:19:25 +0000100 /// Event struct, used with eventBase_ for task completion notification
101 struct event notificationEvent_;
102
103 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000104 size_t numTConnections_;
105
David Reiss9e8073c2010-03-09 05:19:39 +0000106 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000107 size_t numActiveProcessors_;
108
109 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000110 size_t connectionStackLimit_;
111
David Reiss01fe1532010-03-09 05:19:25 +0000112 /// Limit for number of connections processing or waiting to process
113 size_t maxActiveProcessors_;
114
115 /// Limit for number of open connections
116 size_t maxConnections_;
117
David Reiss068f4162010-03-09 05:19:45 +0000118 /// Time in milliseconds before an unperformed task expires (0 == infinite).
119 int64_t taskExpireTime_;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /**
122 * Hysteresis for overload state. This is the fraction of the overload
123 * value that needs to be reached before the overload state is cleared;
124 * must be <= 1.0.
125 */
126 double overloadHysteresis_;
127
128 /// Action to take when we're overloaded.
129 TOverloadAction overloadAction_;
130
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000131 /**
132 * Max read buffer size for an idle connection. When we place an idle
133 * TConnection into connectionStack_, we insure that its read buffer is
134 * reduced to this size to insure that idle connections don't hog memory.
135 */
David Reiss01fe1532010-03-09 05:19:25 +0000136 size_t idleBufferMemLimit_;
137
138 /// Set if we are currently in an overloaded state.
139 bool overloaded_;
140
141 /// Count of connections dropped since overload started
142 uint32_t nConnectionsDropped_;
143
144 /// Count of connections dropped on overload since server started
145 uint64_t nTotalConnectionsDropped_;
146
147 /// File descriptors for pipe used for task completion notification.
148 int notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000149
Mark Slee2f6404d2006-10-10 01:37:40 +0000150 /**
151 * This is a stack of all the objects that have been created but that
152 * are NOT currently in use. When we close a connection, we place it on this
153 * stack so that the object can be reused later, rather than freeing the
154 * memory and reallocating a new object later.
155 */
156 std::stack<TConnection*> connectionStack_;
157
David Reiss01fe1532010-03-09 05:19:25 +0000158 /**
159 * Called when server socket had something happen. We accept all waiting
160 * client connections on listen socket fd and assign TConnection objects
161 * to handle those requests.
162 *
163 * @param fd the listen socket.
164 * @param which the event flag that triggered the handler.
165 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 void handleEvent(int fd, short which);
167
168 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000169 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000170 int port) :
171 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000172 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000173 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000174 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000175 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000176 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000177 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000178 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000179 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
180 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000181 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000182 overloadHysteresis_(0.8),
183 overloadAction_(T_OVERLOAD_NO_ACTION),
184 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
185 overloaded_(false),
186 nConnectionsDropped_(0),
187 nTotalConnectionsDropped_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +0000188
Mark Slee79b16942007-11-26 19:05:29 +0000189 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000190 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000191 int port,
192 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000193 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000194 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000195 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000196 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000197 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000198 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000199 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000200 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000201 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
202 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000203 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000204 overloadHysteresis_(0.8),
205 overloadAction_(T_OVERLOAD_NO_ACTION),
206 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
207 overloaded_(false),
208 nConnectionsDropped_(0),
209 nTotalConnectionsDropped_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000210 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
211 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000212 setInputProtocolFactory(protocolFactory);
213 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000214 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000215 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000216
Mark Slee5ea15f92007-03-05 22:55:59 +0000217 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
218 boost::shared_ptr<TTransportFactory> inputTransportFactory,
219 boost::shared_ptr<TTransportFactory> outputTransportFactory,
220 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
221 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000222 int port,
223 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000224 TServer(processor),
David Reiss01fe1532010-03-09 05:19:25 +0000225 serverSocket_(-1),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000226 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000227 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000228 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000229 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000230 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000231 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000232 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
233 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000234 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000235 overloadHysteresis_(0.8),
236 overloadAction_(T_OVERLOAD_NO_ACTION),
237 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
238 overloaded_(false),
239 nConnectionsDropped_(0),
David Reiss068f4162010-03-09 05:19:45 +0000240 nTotalConnectionsDropped_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000241 setInputTransportFactory(inputTransportFactory);
242 setOutputTransportFactory(outputTransportFactory);
243 setInputProtocolFactory(inputProtocolFactory);
244 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000245 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000246 }
Mark Slee79b16942007-11-26 19:05:29 +0000247
David Reiss8ede8182010-09-02 15:26:28 +0000248 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000249
David Reiss068f4162010-03-09 05:19:45 +0000250 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000251
David Reiss1997f102008-04-29 00:29:41 +0000252 boost::shared_ptr<ThreadManager> getThreadManager() {
253 return threadManager_;
254 }
255
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000256 /**
257 * Get the maximum number of unused TConnection we will hold in reserve.
258 *
259 * @return the current limit on TConnection pool size.
260 */
David Reiss260fa932009-04-02 23:51:39 +0000261 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000262 return connectionStackLimit_;
263 }
264
265 /**
266 * Set the maximum number of unused TConnection we will hold in reserve.
267 *
268 * @param sz the new limit for TConnection pool size.
269 */
David Reiss260fa932009-04-02 23:51:39 +0000270 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000271 connectionStackLimit_ = sz;
272 }
273
Mark Slee79b16942007-11-26 19:05:29 +0000274 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000275 return threadPoolProcessing_;
276 }
277
278 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000279 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000280 }
281
Mark Slee79b16942007-11-26 19:05:29 +0000282 event_base* getEventBase() const {
283 return eventBase_;
284 }
285
David Reiss01fe1532010-03-09 05:19:25 +0000286 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000287 void incrementNumConnections() {
288 ++numTConnections_;
289 }
290
David Reiss01fe1532010-03-09 05:19:25 +0000291 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000292 void decrementNumConnections() {
293 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000294 }
295
David Reiss01fe1532010-03-09 05:19:25 +0000296 /**
297 * Return the count of sockets currently connected to.
298 *
299 * @return count of connected sockets.
300 */
301 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000302 return numTConnections_;
303 }
304
David Reiss01fe1532010-03-09 05:19:25 +0000305 /**
306 * Return the count of connection objects allocated but not in use.
307 *
308 * @return count of idle connection objects.
309 */
310 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000311 return connectionStack_.size();
312 }
313
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000314 /**
David Reiss01fe1532010-03-09 05:19:25 +0000315 * Return count of number of connections which are currently processing.
316 * This is defined as a connection where all data has been received and
317 * either assigned a task (when threading) or passed to a handler (when
318 * not threading), and where the handler has not yet returned.
319 *
320 * @return # of connections currently processing.
321 */
322 size_t getNumActiveProcessors() const {
323 return numActiveProcessors_;
324 }
325
326 /// Increment the count of connections currently processing.
327 void incrementActiveProcessors() {
328 ++numActiveProcessors_;
329 }
330
331 /// Decrement the count of connections currently processing.
332 void decrementActiveProcessors() {
333 if (numActiveProcessors_ > 0) {
334 --numActiveProcessors_;
335 }
336 }
337
338 /**
339 * Get the maximum # of connections allowed before overload.
340 *
341 * @return current setting.
342 */
343 size_t getMaxConnections() const {
344 return maxConnections_;
345 }
346
347 /**
348 * Set the maximum # of connections allowed before overload.
349 *
350 * @param maxConnections new setting for maximum # of connections.
351 */
352 void setMaxConnections(size_t maxConnections) {
353 maxConnections_ = maxConnections;
354 }
355
356 /**
357 * Get the maximum # of connections waiting in handler/task before overload.
358 *
359 * @return current setting.
360 */
361 size_t getMaxActiveProcessors() const {
362 return maxActiveProcessors_;
363 }
364
365 /**
366 * Set the maximum # of connections waiting in handler/task before overload.
367 *
368 * @param maxActiveProcessors new setting for maximum # of active processes.
369 */
370 void setMaxActiveProcessors(size_t maxActiveProcessors) {
371 maxActiveProcessors_ = maxActiveProcessors;
372 }
373
374 /**
375 * Get fraction of maximum limits before an overload condition is cleared.
376 *
377 * @return hysteresis fraction
378 */
379 double getOverloadHysteresis() const {
380 return overloadHysteresis_;
381 }
382
383 /**
384 * Set fraction of maximum limits before an overload condition is cleared.
385 * A good value would probably be between 0.5 and 0.9.
386 *
387 * @param hysteresisFraction fraction <= 1.0.
388 */
389 void setOverloadHysteresis(double hysteresisFraction) {
390 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
391 overloadHysteresis_ = hysteresisFraction;
392 }
393 }
394
395 /**
396 * Get the action the server will take on overload.
397 *
398 * @return a TOverloadAction enum value for the currently set action.
399 */
400 TOverloadAction getOverloadAction() const {
401 return overloadAction_;
402 }
403
404 /**
405 * Set the action the server is to take on overload.
406 *
407 * @param overloadAction a TOverloadAction enum value for the action.
408 */
409 void setOverloadAction(TOverloadAction overloadAction) {
410 overloadAction_ = overloadAction;
411 }
412
413 /**
David Reiss068f4162010-03-09 05:19:45 +0000414 * Get the time in milliseconds after which a task expires (0 == infinite).
415 *
416 * @return a 64-bit time in milliseconds.
417 */
418 int64_t getTaskExpireTime() const {
419 return taskExpireTime_;
420 }
421
422 /**
423 * Set the time in milliseconds after which a task expires (0 == infinite).
424 *
425 * @param taskExpireTime a 64-bit time in milliseconds.
426 */
427 void setTaskExpireTime(int64_t taskExpireTime) {
428 taskExpireTime_ = taskExpireTime;
429 }
430
431 /**
David Reiss01fe1532010-03-09 05:19:25 +0000432 * Determine if the server is currently overloaded.
433 * This function checks the maximums for open connections and connections
434 * currently in processing, and sets an overload condition if they are
435 * exceeded. The overload will persist until both values are below the
436 * current hysteresis fraction of their maximums.
437 *
438 * @return true if an overload condition exists, false if not.
439 */
440 bool serverOverloaded();
441
442 /** Pop and discard next task on threadpool wait queue.
443 *
444 * @return true if a task was discarded, false if the wait queue was empty.
445 */
446 bool drainPendingTask();
447
448 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000449 * Get the maximum limit of memory allocated to idle TConnection objects.
450 *
451 * @return # bytes beyond which we will shrink buffers when idle.
452 */
453 size_t getIdleBufferMemLimit() const {
454 return idleBufferMemLimit_;
455 }
456
457 /**
458 * Set the maximum limit of memory allocated to idle TConnection objects.
459 * If a TConnection object goes idle with more than this much memory
460 * allocated to its buffer, we shrink it to this value.
461 *
462 * @param limit of bytes beyond which we will shrink buffers when idle.
463 */
464 void setIdleBufferMemLimit(size_t limit) {
465 idleBufferMemLimit_ = limit;
466 }
467
David Reiss01fe1532010-03-09 05:19:25 +0000468 /**
469 * Return an initialized connection object. Creates or recovers from
470 * pool a TConnection and initializes it with the provided socket FD
471 * and flags.
472 *
473 * @param socket FD of socket associated with this connection.
474 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000475 * @param addr the sockaddr of the client
476 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000477 * @return pointer to initialized TConnection object.
478 */
David Reiss105961d2010-10-06 17:10:17 +0000479 TConnection* createConnection(int socket, short flags,
480 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000481
David Reiss01fe1532010-03-09 05:19:25 +0000482 /**
483 * Returns a connection to pool or deletion. If the connection pool
484 * (a stack) isn't full, place the connection object on it, otherwise
485 * just delete it.
486 *
487 * @param connection the TConection being returned.
488 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000489 void returnConnection(TConnection* connection);
490
David Reiss01fe1532010-03-09 05:19:25 +0000491 /**
David Reiss068f4162010-03-09 05:19:45 +0000492 * Callback function that the threadmanager calls when a task reaches
493 * its expiration time. It is needed to clean up the expired connection.
494 *
495 * @param task the runnable associated with the expired task.
496 */
497 void expireClose(boost::shared_ptr<Runnable> task);
498
499 /**
David Reiss01fe1532010-03-09 05:19:25 +0000500 * C-callable event handler for listener events. Provides a callback
501 * that libevent can understand which invokes server->handleEvent().
502 *
503 * @param fd the descriptor the event occured on.
504 * @param which the flags associated with the event.
505 * @param v void* callback arg where we placed TNonblockingServer's "this".
506 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000507 static void eventHandler(int fd, short which, void* v) {
508 ((TNonblockingServer*)v)->handleEvent(fd, which);
509 }
510
David Reiss01fe1532010-03-09 05:19:25 +0000511 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000512 void listenSocket();
513
David Reiss01fe1532010-03-09 05:19:25 +0000514 /**
515 * Takes a socket created by listenSocket() and sets various options on it
516 * to prepare for use in the server.
517 *
518 * @param fd descriptor of socket to be initialized/
519 */
Mark Slee79b16942007-11-26 19:05:29 +0000520 void listenSocket(int fd);
521
David Reiss01fe1532010-03-09 05:19:25 +0000522 /// Create the pipe used to notify I/O process of task completion.
523 void createNotificationPipe();
524
525 /**
526 * Get notification pipe send descriptor.
527 *
528 * @return write fd for pipe.
529 */
530 int getNotificationSendFD() const {
531 return notificationPipeFDs_[1];
532 }
533
534 /**
535 * Get notification pipe receive descriptor.
536 *
537 * @return read fd of pipe.
538 */
539 int getNotificationRecvFD() const {
540 return notificationPipeFDs_[0];
541 }
542
543 /**
544 * Register the core libevent events onto the proper base.
545 *
546 * @param base pointer to the event base to be initialized.
547 */
Mark Slee79b16942007-11-26 19:05:29 +0000548 void registerEvents(event_base* base);
549
David Reiss01fe1532010-03-09 05:19:25 +0000550 /**
551 * Main workhorse function, starts up the server listening on a port and
552 * loops over the libevent handler.
553 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000554 void serve();
555};
556
/// The two modes a connection's socket can be in: receiving or sending.
enum TSocketState {
  SOCKET_RECV,  ///< Reading from the socket
  SOCKET_SEND   ///< Writing to the socket
};
562
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for the processing task (presumably only when requests are
 *     handed to a thread pool -- confirm against TConnection::transition())
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
579
580/**
581 * Represents a connection that is handled via libevent. This connection
582 * essentially encapsulates a socket that has some associated libevent state.
583 */
David Reiss105961d2010-10-06 17:10:17 +0000584 class TConnection {
Mark Slee2f6404d2006-10-10 01:37:40 +0000585 private:
586
David Reiss01fe1532010-03-09 05:19:25 +0000587 /// Starting size for new connection buffer
588 static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
Mark Sleee02385b2007-06-09 01:21:16 +0000589
David Reiss01fe1532010-03-09 05:19:25 +0000590 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000591 TNonblockingServer* server_;
592
David Reiss105961d2010-10-06 17:10:17 +0000593 /// Object wrapping network socket
594 boost::shared_ptr<TSocket> tSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000595
David Reiss01fe1532010-03-09 05:19:25 +0000596 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000597 struct event event_;
598
David Reiss01fe1532010-03-09 05:19:25 +0000599 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000600 short eventFlags_;
601
David Reiss01fe1532010-03-09 05:19:25 +0000602 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000603 TSocketState socketState_;
604
David Reiss01fe1532010-03-09 05:19:25 +0000605 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000606 TAppState appState_;
607
David Reiss01fe1532010-03-09 05:19:25 +0000608 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000609 uint32_t readWant_;
610
David Reiss01fe1532010-03-09 05:19:25 +0000611 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000612 uint32_t readBufferPos_;
613
David Reiss01fe1532010-03-09 05:19:25 +0000614 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000615 uint8_t* readBuffer_;
616
David Reiss01fe1532010-03-09 05:19:25 +0000617 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000618 uint32_t readBufferSize_;
619
David Reiss01fe1532010-03-09 05:19:25 +0000620 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000621 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000622
David Reiss01fe1532010-03-09 05:19:25 +0000623 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000624 uint32_t writeBufferSize_;
625
David Reiss01fe1532010-03-09 05:19:25 +0000626 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000627 uint32_t writeBufferPos_;
628
David Reiss01fe1532010-03-09 05:19:25 +0000629 /// How many times have we read since our last buffer reset?
Kevin Clark5ace1782009-03-04 21:10:58 +0000630 uint32_t numReadsSinceReset_;
631
David Reiss01fe1532010-03-09 05:19:25 +0000632 /// How many times have we written since our last buffer reset?
Kevin Clark5ace1782009-03-04 21:10:58 +0000633 uint32_t numWritesSinceReset_;
634
David Reiss01fe1532010-03-09 05:19:25 +0000635 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000636 int taskHandle_;
637
David Reiss01fe1532010-03-09 05:19:25 +0000638 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000639 struct event taskEvent_;
640
David Reiss01fe1532010-03-09 05:19:25 +0000641 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000642 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000643
David Reiss01fe1532010-03-09 05:19:25 +0000644 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000645 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000646
David Reiss01fe1532010-03-09 05:19:25 +0000647 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000648 boost::shared_ptr<TTransport> factoryInputTransport_;
649 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000650
David Reiss01fe1532010-03-09 05:19:25 +0000651 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000652 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000653
David Reiss01fe1532010-03-09 05:19:25 +0000654 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000655 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000656
David Reiss105961d2010-10-06 17:10:17 +0000657 /// Server event handler, if any
658 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
659
660 /// Thrift call context, if any
661 void *connectionContext_;
662
David Reiss01fe1532010-03-09 05:19:25 +0000663 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000664 void setRead() {
665 setFlags(EV_READ | EV_PERSIST);
666 }
667
David Reiss01fe1532010-03-09 05:19:25 +0000668 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000669 void setWrite() {
670 setFlags(EV_WRITE | EV_PERSIST);
671 }
672
David Reiss01fe1532010-03-09 05:19:25 +0000673 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000674 void setIdle() {
675 setFlags(0);
676 }
677
David Reiss01fe1532010-03-09 05:19:25 +0000678 /**
679 * Set event flags for this connection.
680 *
681 * @param eventFlags flags we pass to libevent for the connection.
682 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000683 void setFlags(short eventFlags);
684
David Reiss01fe1532010-03-09 05:19:25 +0000685 /**
686 * Libevent handler called (via our static wrapper) when the connection
687 * socket had something happen. Rather than use the flags libevent passed,
688 * we use the connection state to determine whether we need to read or
689 * write the socket.
690 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000691 void workSocket();
692
David Reiss01fe1532010-03-09 05:19:25 +0000693 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 void close();
695
696 public:
697
David Reiss01fe1532010-03-09 05:19:25 +0000698 class Task;
699
700 /// Constructor
David Reiss105961d2010-10-06 17:10:17 +0000701 TConnection(int socket, short eventFlags, TNonblockingServer *s,
702 const sockaddr* addr, socklen_t addrLen) {
David Reiss01fe1532010-03-09 05:19:25 +0000703 readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
Mark Slee2f6404d2006-10-10 01:37:40 +0000704 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000705 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000706 }
David Reiss01fe1532010-03-09 05:19:25 +0000707 readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
Mark Slee79b16942007-11-26 19:05:29 +0000708
Kevin Clark5ace1782009-03-04 21:10:58 +0000709 numReadsSinceReset_ = 0;
710 numWritesSinceReset_ = 0;
711
Mark Slee2f6404d2006-10-10 01:37:40 +0000712 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000713 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000714 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000715 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
716 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
David Reiss105961d2010-10-06 17:10:17 +0000717 tSocket_.reset(new TSocket());
Mark Slee79b16942007-11-26 19:05:29 +0000718
David Reiss105961d2010-10-06 17:10:17 +0000719 init(socket, eventFlags, s, addr, addrLen);
David Reiss1997f102008-04-29 00:29:41 +0000720 server_->incrementNumConnections();
721 }
722
723 ~TConnection() {
David Reiss472fffb2010-03-09 05:20:24 +0000724 std::free(readBuffer_);
David Reissc17fe6b2008-04-29 00:29:43 +0000725 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000726 }
727
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000728 /**
729 * Check read buffer against a given limit and shrink it if exceeded.
730 *
731 * @param limit we limit buffer size to.
732 */
David Reiss01fe1532010-03-09 05:19:25 +0000733 void checkIdleBufferMemLimit(size_t limit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000734
David Reiss01fe1532010-03-09 05:19:25 +0000735 /// Initialize
David Reiss105961d2010-10-06 17:10:17 +0000736 void init(int socket, short eventFlags, TNonblockingServer *s,
737 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000738
David Reiss01fe1532010-03-09 05:19:25 +0000739 /**
740 * This is called when the application transitions from one state into
741 * another. This means that it has finished writing the data that it needed
742 * to, or finished receiving the data that it needed to.
743 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000744 void transition();
745
David Reiss01fe1532010-03-09 05:19:25 +0000746 /**
747 * C-callable event handler for connection events. Provides a callback
748 * that libevent can understand which invokes connection_->workSocket().
749 *
750 * @param fd the descriptor the event occurred on.
751 * @param which the flags associated with the event.
752 * @param v void* callback arg where we placed TConnection's "this".
753 */
Mark Sleea8de4892008-02-09 00:02:26 +0000754 static void eventHandler(int fd, short /* which */, void* v) {
David Reiss105961d2010-10-06 17:10:17 +0000755 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
Mark Slee2f6404d2006-10-10 01:37:40 +0000756 ((TConnection*)v)->workSocket();
757 }
Mark Slee79b16942007-11-26 19:05:29 +0000758
David Reiss01fe1532010-03-09 05:19:25 +0000759 /**
760 * C-callable event handler for signaling task completion. Provides a
761 * callback that libevent can understand that will read a connection
762 * object's address from a pipe and call connection->transition() for
763 * that object.
764 *
765 * @param fd the descriptor the event occurred on.
766 */
767 static void taskHandler(int fd, short /* which */, void* /* v */) {
768 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000769 ssize_t nBytes;
770 while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
771 == sizeof(TConnection*)) {
772 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000773 }
David Reiss83b8fda2010-03-09 05:19:34 +0000774 if (nBytes > 0) {
775 throw TException("TConnection::taskHandler unexpected partial read");
776 }
777 if (errno != EWOULDBLOCK && errno != EAGAIN) {
778 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
779 }
Mark Sleee02385b2007-06-09 01:21:16 +0000780 }
781
David Reiss01fe1532010-03-09 05:19:25 +0000782 /**
783 * Notification to server that processing has ended on this request.
784 * Can be called either when processing is completed or when a waiting
785 * task has been preemptively terminated (on overload).
786 *
David Reiss9e8073c2010-03-09 05:19:39 +0000787 * @return true if successful, false if unable to notify (check errno).
David Reiss01fe1532010-03-09 05:19:25 +0000788 */
789 bool notifyServer() {
790 TConnection* connection = this;
791 if (write(server_->getNotificationSendFD(), (const void*)&connection,
792 sizeof(TConnection*)) != sizeof(TConnection*)) {
793 return false;
794 }
795
796 return true;
797 }
798
799 /// Force connection shutdown for this connection.
800 void forceClose() {
801 appState_ = APP_CLOSE_CONNECTION;
802 if (!notifyServer()) {
803 throw TException("TConnection::forceClose: failed write on notify pipe");
804 }
805 }
806
807 /// return the server this connection was initialized for.
808 TNonblockingServer* getServer() {
809 return server_;
810 }
811
812 /// get state of connection.
813 TAppState getState() {
814 return appState_;
815 }
David Reiss105961d2010-10-06 17:10:17 +0000816
817 /// return the TSocket transport wrapping this network connection
818 boost::shared_ptr<TSocket> getTSocket() const {
819 return tSocket_;
820 }
821
822 /// return the server event handler if any
823 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
824 return serverEventHandler_;
825 }
826
827 /// return the Thrift connection context if any
828 void* getConnectionContext() {
829 return connectionContext_;
830 }
831
Mark Slee2f6404d2006-10-10 01:37:40 +0000832};
833
T Jake Lucianib5e62212009-01-31 22:36:20 +0000834}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000835
836#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_