blob: 2dd53629a428f2c7a1cf3407f570790e51781489 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000026#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000027#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000028#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000029#include <string>
30#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000031#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000032#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000033#include <event.h>
34
T Jake Lucianib5e62212009-01-31 22:36:20 +000035namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000036
T Jake Lucianib5e62212009-01-31 22:36:20 +000037using apache::thrift::transport::TMemoryBuffer;
38using apache::thrift::protocol::TProtocol;
39using apache::thrift::concurrency::Runnable;
40using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000041
42// Forward declaration of class
43class TConnection;
44
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4 byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +000054
55
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};
62
Mark Slee2f6404d2006-10-10 01:37:40 +000063class TNonblockingServer : public TServer {
64 private:
David Reiss01fe1532010-03-09 05:19:25 +000065 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000066 static const int LISTEN_BACKLOG = 1024;
67
David Reiss01fe1532010-03-09 05:19:25 +000068 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000069 static const size_t CONNECTION_STACK_LIMIT = 1024;
70
David Reiss01fe1532010-03-09 05:19:25 +000071 /// Maximum size of buffer allocated to idle connection
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000072 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
73
David Reiss01fe1532010-03-09 05:19:25 +000074 /// Default limit on total number of connected sockets
75 static const int MAX_CONNECTIONS = INT_MAX;
76
77 /// Default limit on connections in handler/task processing
78 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
79
80 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +000081 int serverSocket_;
82
David Reiss01fe1532010-03-09 05:19:25 +000083 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +000084 int port_;
85
David Reiss01fe1532010-03-09 05:19:25 +000086 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +000087 boost::shared_ptr<ThreadManager> threadManager_;
88
David Reiss01fe1532010-03-09 05:19:25 +000089 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +000090 bool threadPoolProcessing_;
91
David Reiss01fe1532010-03-09 05:19:25 +000092 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +000093 event_base* eventBase_;
94
David Reiss01fe1532010-03-09 05:19:25 +000095 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +000096 struct event serverEvent_;
97
David Reiss01fe1532010-03-09 05:19:25 +000098 /// Event struct, used with eventBase_ for task completion notification
99 struct event notificationEvent_;
100
101 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000102 size_t numTConnections_;
103
David Reiss9e8073c2010-03-09 05:19:39 +0000104 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000105 size_t numActiveProcessors_;
106
107 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000108 size_t connectionStackLimit_;
109
David Reiss01fe1532010-03-09 05:19:25 +0000110 /// Limit for number of connections processing or waiting to process
111 size_t maxActiveProcessors_;
112
113 /// Limit for number of open connections
114 size_t maxConnections_;
115
David Reiss068f4162010-03-09 05:19:45 +0000116 /// Time in milliseconds before an unperformed task expires (0 == infinite).
117 int64_t taskExpireTime_;
118
David Reiss01fe1532010-03-09 05:19:25 +0000119 /**
120 * Hysteresis for overload state. This is the fraction of the overload
121 * value that needs to be reached before the overload state is cleared;
122 * must be <= 1.0.
123 */
124 double overloadHysteresis_;
125
126 /// Action to take when we're overloaded.
127 TOverloadAction overloadAction_;
128
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000129 /**
130 * Max read buffer size for an idle connection. When we place an idle
131 * TConnection into connectionStack_, we insure that its read buffer is
132 * reduced to this size to insure that idle connections don't hog memory.
133 */
David Reiss01fe1532010-03-09 05:19:25 +0000134 size_t idleBufferMemLimit_;
135
136 /// Set if we are currently in an overloaded state.
137 bool overloaded_;
138
139 /// Count of connections dropped since overload started
140 uint32_t nConnectionsDropped_;
141
142 /// Count of connections dropped on overload since server started
143 uint64_t nTotalConnectionsDropped_;
144
145 /// File descriptors for pipe used for task completion notification.
146 int notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000147
Mark Slee2f6404d2006-10-10 01:37:40 +0000148 /**
149 * This is a stack of all the objects that have been created but that
150 * are NOT currently in use. When we close a connection, we place it on this
151 * stack so that the object can be reused later, rather than freeing the
152 * memory and reallocating a new object later.
153 */
154 std::stack<TConnection*> connectionStack_;
155
David Reiss01fe1532010-03-09 05:19:25 +0000156 /**
157 * Called when server socket had something happen. We accept all waiting
158 * client connections on listen socket fd and assign TConnection objects
159 * to handle those requests.
160 *
161 * @param fd the listen socket.
162 * @param which the event flag that triggered the handler.
163 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 void handleEvent(int fd, short which);
165
166 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000167 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000168 int port) :
169 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000170 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000171 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000172 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000173 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000174 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000175 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000176 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000177 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
178 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000179 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000180 overloadHysteresis_(0.8),
181 overloadAction_(T_OVERLOAD_NO_ACTION),
182 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
183 overloaded_(false),
184 nConnectionsDropped_(0),
185 nTotalConnectionsDropped_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +0000186
Mark Slee79b16942007-11-26 19:05:29 +0000187 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000188 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000189 int port,
190 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000191 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000192 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000193 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000194 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000195 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000196 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000197 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000198 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000199 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
200 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000201 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000202 overloadHysteresis_(0.8),
203 overloadAction_(T_OVERLOAD_NO_ACTION),
204 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
205 overloaded_(false),
206 nConnectionsDropped_(0),
207 nTotalConnectionsDropped_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000208 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
209 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000210 setInputProtocolFactory(protocolFactory);
211 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000212 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000213 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000214
Mark Slee5ea15f92007-03-05 22:55:59 +0000215 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
216 boost::shared_ptr<TTransportFactory> inputTransportFactory,
217 boost::shared_ptr<TTransportFactory> outputTransportFactory,
218 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
219 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000220 int port,
221 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000222 TServer(processor),
David Reiss01fe1532010-03-09 05:19:25 +0000223 serverSocket_(-1),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000224 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000225 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000226 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000227 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000228 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000229 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000230 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
231 maxConnections_(MAX_CONNECTIONS),
David Reiss068f4162010-03-09 05:19:45 +0000232 taskExpireTime_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000233 overloadHysteresis_(0.8),
234 overloadAction_(T_OVERLOAD_NO_ACTION),
235 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
236 overloaded_(false),
237 nConnectionsDropped_(0),
David Reiss068f4162010-03-09 05:19:45 +0000238 nTotalConnectionsDropped_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000239 setInputTransportFactory(inputTransportFactory);
240 setOutputTransportFactory(outputTransportFactory);
241 setInputProtocolFactory(inputProtocolFactory);
242 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000243 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000244 }
Mark Slee79b16942007-11-26 19:05:29 +0000245
David Reiss8ede8182010-09-02 15:26:28 +0000246 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000247
David Reiss068f4162010-03-09 05:19:45 +0000248 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000249
David Reiss1997f102008-04-29 00:29:41 +0000250 boost::shared_ptr<ThreadManager> getThreadManager() {
251 return threadManager_;
252 }
253
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000254 /**
255 * Get the maximum number of unused TConnection we will hold in reserve.
256 *
257 * @return the current limit on TConnection pool size.
258 */
David Reiss260fa932009-04-02 23:51:39 +0000259 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000260 return connectionStackLimit_;
261 }
262
263 /**
264 * Set the maximum number of unused TConnection we will hold in reserve.
265 *
266 * @param sz the new limit for TConnection pool size.
267 */
David Reiss260fa932009-04-02 23:51:39 +0000268 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000269 connectionStackLimit_ = sz;
270 }
271
Mark Slee79b16942007-11-26 19:05:29 +0000272 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000273 return threadPoolProcessing_;
274 }
275
276 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000277 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000278 }
279
Mark Slee79b16942007-11-26 19:05:29 +0000280 event_base* getEventBase() const {
281 return eventBase_;
282 }
283
David Reiss01fe1532010-03-09 05:19:25 +0000284 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000285 void incrementNumConnections() {
286 ++numTConnections_;
287 }
288
David Reiss01fe1532010-03-09 05:19:25 +0000289 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000290 void decrementNumConnections() {
291 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000292 }
293
David Reiss01fe1532010-03-09 05:19:25 +0000294 /**
295 * Return the count of sockets currently connected to.
296 *
297 * @return count of connected sockets.
298 */
299 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000300 return numTConnections_;
301 }
302
David Reiss01fe1532010-03-09 05:19:25 +0000303 /**
304 * Return the count of connection objects allocated but not in use.
305 *
306 * @return count of idle connection objects.
307 */
308 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000309 return connectionStack_.size();
310 }
311
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000312 /**
David Reiss01fe1532010-03-09 05:19:25 +0000313 * Return count of number of connections which are currently processing.
314 * This is defined as a connection where all data has been received and
315 * either assigned a task (when threading) or passed to a handler (when
316 * not threading), and where the handler has not yet returned.
317 *
318 * @return # of connections currently processing.
319 */
320 size_t getNumActiveProcessors() const {
321 return numActiveProcessors_;
322 }
323
324 /// Increment the count of connections currently processing.
325 void incrementActiveProcessors() {
326 ++numActiveProcessors_;
327 }
328
329 /// Decrement the count of connections currently processing.
330 void decrementActiveProcessors() {
331 if (numActiveProcessors_ > 0) {
332 --numActiveProcessors_;
333 }
334 }
335
336 /**
337 * Get the maximum # of connections allowed before overload.
338 *
339 * @return current setting.
340 */
341 size_t getMaxConnections() const {
342 return maxConnections_;
343 }
344
345 /**
346 * Set the maximum # of connections allowed before overload.
347 *
348 * @param maxConnections new setting for maximum # of connections.
349 */
350 void setMaxConnections(size_t maxConnections) {
351 maxConnections_ = maxConnections;
352 }
353
354 /**
355 * Get the maximum # of connections waiting in handler/task before overload.
356 *
357 * @return current setting.
358 */
359 size_t getMaxActiveProcessors() const {
360 return maxActiveProcessors_;
361 }
362
363 /**
364 * Set the maximum # of connections waiting in handler/task before overload.
365 *
366 * @param maxActiveProcessors new setting for maximum # of active processes.
367 */
368 void setMaxActiveProcessors(size_t maxActiveProcessors) {
369 maxActiveProcessors_ = maxActiveProcessors;
370 }
371
372 /**
373 * Get fraction of maximum limits before an overload condition is cleared.
374 *
375 * @return hysteresis fraction
376 */
377 double getOverloadHysteresis() const {
378 return overloadHysteresis_;
379 }
380
381 /**
382 * Set fraction of maximum limits before an overload condition is cleared.
383 * A good value would probably be between 0.5 and 0.9.
384 *
385 * @param hysteresisFraction fraction <= 1.0.
386 */
387 void setOverloadHysteresis(double hysteresisFraction) {
388 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
389 overloadHysteresis_ = hysteresisFraction;
390 }
391 }
392
393 /**
394 * Get the action the server will take on overload.
395 *
396 * @return a TOverloadAction enum value for the currently set action.
397 */
398 TOverloadAction getOverloadAction() const {
399 return overloadAction_;
400 }
401
402 /**
403 * Set the action the server is to take on overload.
404 *
405 * @param overloadAction a TOverloadAction enum value for the action.
406 */
407 void setOverloadAction(TOverloadAction overloadAction) {
408 overloadAction_ = overloadAction;
409 }
410
411 /**
David Reiss068f4162010-03-09 05:19:45 +0000412 * Get the time in milliseconds after which a task expires (0 == infinite).
413 *
414 * @return a 64-bit time in milliseconds.
415 */
416 int64_t getTaskExpireTime() const {
417 return taskExpireTime_;
418 }
419
420 /**
421 * Set the time in milliseconds after which a task expires (0 == infinite).
422 *
423 * @param taskExpireTime a 64-bit time in milliseconds.
424 */
425 void setTaskExpireTime(int64_t taskExpireTime) {
426 taskExpireTime_ = taskExpireTime;
427 }
428
429 /**
David Reiss01fe1532010-03-09 05:19:25 +0000430 * Determine if the server is currently overloaded.
431 * This function checks the maximums for open connections and connections
432 * currently in processing, and sets an overload condition if they are
433 * exceeded. The overload will persist until both values are below the
434 * current hysteresis fraction of their maximums.
435 *
436 * @return true if an overload condition exists, false if not.
437 */
438 bool serverOverloaded();
439
440 /** Pop and discard next task on threadpool wait queue.
441 *
442 * @return true if a task was discarded, false if the wait queue was empty.
443 */
444 bool drainPendingTask();
445
446 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000447 * Get the maximum limit of memory allocated to idle TConnection objects.
448 *
449 * @return # bytes beyond which we will shrink buffers when idle.
450 */
451 size_t getIdleBufferMemLimit() const {
452 return idleBufferMemLimit_;
453 }
454
455 /**
456 * Set the maximum limit of memory allocated to idle TConnection objects.
457 * If a TConnection object goes idle with more than this much memory
458 * allocated to its buffer, we shrink it to this value.
459 *
460 * @param limit of bytes beyond which we will shrink buffers when idle.
461 */
462 void setIdleBufferMemLimit(size_t limit) {
463 idleBufferMemLimit_ = limit;
464 }
465
David Reiss01fe1532010-03-09 05:19:25 +0000466 /**
467 * Return an initialized connection object. Creates or recovers from
468 * pool a TConnection and initializes it with the provided socket FD
469 * and flags.
470 *
471 * @param socket FD of socket associated with this connection.
472 * @param flags initial lib_event flags for this connection.
473 * @return pointer to initialized TConnection object.
474 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000475 TConnection* createConnection(int socket, short flags);
476
David Reiss01fe1532010-03-09 05:19:25 +0000477 /**
478 * Returns a connection to pool or deletion. If the connection pool
479 * (a stack) isn't full, place the connection object on it, otherwise
480 * just delete it.
481 *
482 * @param connection the TConection being returned.
483 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000484 void returnConnection(TConnection* connection);
485
David Reiss01fe1532010-03-09 05:19:25 +0000486 /**
David Reiss068f4162010-03-09 05:19:45 +0000487 * Callback function that the threadmanager calls when a task reaches
488 * its expiration time. It is needed to clean up the expired connection.
489 *
490 * @param task the runnable associated with the expired task.
491 */
492 void expireClose(boost::shared_ptr<Runnable> task);
493
494 /**
David Reiss01fe1532010-03-09 05:19:25 +0000495 * C-callable event handler for listener events. Provides a callback
496 * that libevent can understand which invokes server->handleEvent().
497 *
498 * @param fd the descriptor the event occured on.
499 * @param which the flags associated with the event.
500 * @param v void* callback arg where we placed TNonblockingServer's "this".
501 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000502 static void eventHandler(int fd, short which, void* v) {
503 ((TNonblockingServer*)v)->handleEvent(fd, which);
504 }
505
David Reiss01fe1532010-03-09 05:19:25 +0000506 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000507 void listenSocket();
508
David Reiss01fe1532010-03-09 05:19:25 +0000509 /**
510 * Takes a socket created by listenSocket() and sets various options on it
511 * to prepare for use in the server.
512 *
513 * @param fd descriptor of socket to be initialized/
514 */
Mark Slee79b16942007-11-26 19:05:29 +0000515 void listenSocket(int fd);
516
David Reiss01fe1532010-03-09 05:19:25 +0000517 /// Create the pipe used to notify I/O process of task completion.
518 void createNotificationPipe();
519
520 /**
521 * Get notification pipe send descriptor.
522 *
523 * @return write fd for pipe.
524 */
525 int getNotificationSendFD() const {
526 return notificationPipeFDs_[1];
527 }
528
529 /**
530 * Get notification pipe receive descriptor.
531 *
532 * @return read fd of pipe.
533 */
534 int getNotificationRecvFD() const {
535 return notificationPipeFDs_[0];
536 }
537
538 /**
539 * Register the core libevent events onto the proper base.
540 *
541 * @param base pointer to the event base to be initialized.
542 */
Mark Slee79b16942007-11-26 19:05:29 +0000543 void registerEvents(event_base* base);
544
David Reiss01fe1532010-03-09 05:19:25 +0000545 /**
546 * Main workhorse function, starts up the server listening on a port and
547 * loops over the libevent handler.
548 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000549 void serve();
550};
551
/// Two states for sockets, recv and send mode
enum TSocketState {
  SOCKET_RECV,  ///< Connection is receiving data
  SOCKET_SEND   ///< Connection is sending data
};
557
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for the task processing the request to complete
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
574
575/**
576 * Represents a connection that is handled via libevent. This connection
577 * essentially encapsulates a socket that has some associated libevent state.
578 */
579class TConnection {
580 private:
581
David Reiss01fe1532010-03-09 05:19:25 +0000582 /// Starting size for new connection buffer
583 static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
Mark Sleee02385b2007-06-09 01:21:16 +0000584
David Reiss01fe1532010-03-09 05:19:25 +0000585 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000586 TNonblockingServer* server_;
587
David Reiss01fe1532010-03-09 05:19:25 +0000588 /// Socket handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000589 int socket_;
590
David Reiss01fe1532010-03-09 05:19:25 +0000591 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000592 struct event event_;
593
David Reiss01fe1532010-03-09 05:19:25 +0000594 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000595 short eventFlags_;
596
David Reiss01fe1532010-03-09 05:19:25 +0000597 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000598 TSocketState socketState_;
599
David Reiss01fe1532010-03-09 05:19:25 +0000600 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000601 TAppState appState_;
602
David Reiss01fe1532010-03-09 05:19:25 +0000603 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000604 uint32_t readWant_;
605
David Reiss01fe1532010-03-09 05:19:25 +0000606 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000607 uint32_t readBufferPos_;
608
David Reiss01fe1532010-03-09 05:19:25 +0000609 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000610 uint8_t* readBuffer_;
611
David Reiss01fe1532010-03-09 05:19:25 +0000612 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000613 uint32_t readBufferSize_;
614
David Reiss01fe1532010-03-09 05:19:25 +0000615 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000616 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000617
David Reiss01fe1532010-03-09 05:19:25 +0000618 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000619 uint32_t writeBufferSize_;
620
David Reiss01fe1532010-03-09 05:19:25 +0000621 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000622 uint32_t writeBufferPos_;
623
David Reiss01fe1532010-03-09 05:19:25 +0000624 /// How many times have we read since our last buffer reset?
Kevin Clark5ace1782009-03-04 21:10:58 +0000625 uint32_t numReadsSinceReset_;
626
David Reiss01fe1532010-03-09 05:19:25 +0000627 /// How many times have we written since our last buffer reset?
Kevin Clark5ace1782009-03-04 21:10:58 +0000628 uint32_t numWritesSinceReset_;
629
David Reiss01fe1532010-03-09 05:19:25 +0000630 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000631 int taskHandle_;
632
David Reiss01fe1532010-03-09 05:19:25 +0000633 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000634 struct event taskEvent_;
635
David Reiss01fe1532010-03-09 05:19:25 +0000636 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000637 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000638
David Reiss01fe1532010-03-09 05:19:25 +0000639 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000640 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000641
David Reiss01fe1532010-03-09 05:19:25 +0000642 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000643 boost::shared_ptr<TTransport> factoryInputTransport_;
644 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000645
David Reiss01fe1532010-03-09 05:19:25 +0000646 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000647 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000648
David Reiss01fe1532010-03-09 05:19:25 +0000649 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000650 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000651
David Reiss01fe1532010-03-09 05:19:25 +0000652 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000653 void setRead() {
654 setFlags(EV_READ | EV_PERSIST);
655 }
656
David Reiss01fe1532010-03-09 05:19:25 +0000657 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000658 void setWrite() {
659 setFlags(EV_WRITE | EV_PERSIST);
660 }
661
David Reiss01fe1532010-03-09 05:19:25 +0000662 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000663 void setIdle() {
664 setFlags(0);
665 }
666
David Reiss01fe1532010-03-09 05:19:25 +0000667 /**
668 * Set event flags for this connection.
669 *
670 * @param eventFlags flags we pass to libevent for the connection.
671 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000672 void setFlags(short eventFlags);
673
David Reiss01fe1532010-03-09 05:19:25 +0000674 /**
675 * Libevent handler called (via our static wrapper) when the connection
676 * socket had something happen. Rather than use the flags libevent passed,
677 * we use the connection state to determine whether we need to read or
678 * write the socket.
679 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000680 void workSocket();
681
David Reiss01fe1532010-03-09 05:19:25 +0000682 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000683 void close();
684
685 public:
686
David Reiss01fe1532010-03-09 05:19:25 +0000687 class Task;
688
689 /// Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000690 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reiss01fe1532010-03-09 05:19:25 +0000691 readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
Mark Slee2f6404d2006-10-10 01:37:40 +0000692 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000693 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000694 }
David Reiss01fe1532010-03-09 05:19:25 +0000695 readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
Mark Slee79b16942007-11-26 19:05:29 +0000696
Kevin Clark5ace1782009-03-04 21:10:58 +0000697 numReadsSinceReset_ = 0;
698 numWritesSinceReset_ = 0;
699
Mark Slee2f6404d2006-10-10 01:37:40 +0000700 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000701 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000702 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000703 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
704 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000705
Mark Slee2f6404d2006-10-10 01:37:40 +0000706 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000707 server_->incrementNumConnections();
708 }
709
710 ~TConnection() {
David Reiss472fffb2010-03-09 05:20:24 +0000711 std::free(readBuffer_);
David Reissc17fe6b2008-04-29 00:29:43 +0000712 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000713 }
714
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000715 /**
716 * Check read buffer against a given limit and shrink it if exceeded.
717 *
718 * @param limit we limit buffer size to.
719 */
David Reiss01fe1532010-03-09 05:19:25 +0000720 void checkIdleBufferMemLimit(size_t limit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000721
David Reiss01fe1532010-03-09 05:19:25 +0000722 /// Initialize
Mark Slee2f6404d2006-10-10 01:37:40 +0000723 void init(int socket, short eventFlags, TNonblockingServer *s);
724
David Reiss01fe1532010-03-09 05:19:25 +0000725 /**
726 * This is called when the application transitions from one state into
727 * another. This means that it has finished writing the data that it needed
728 * to, or finished receiving the data that it needed to.
729 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000730 void transition();
731
David Reiss01fe1532010-03-09 05:19:25 +0000732 /**
733 * C-callable event handler for connection events. Provides a callback
734 * that libevent can understand which invokes connection_->workSocket().
735 *
736 * @param fd the descriptor the event occured on.
737 * @param which the flags associated with the event.
738 * @param v void* callback arg where we placed TConnection's "this".
739 */
Mark Sleea8de4892008-02-09 00:02:26 +0000740 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000741 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000742 ((TConnection*)v)->workSocket();
743 }
Mark Slee79b16942007-11-26 19:05:29 +0000744
David Reiss01fe1532010-03-09 05:19:25 +0000745 /**
746 * C-callable event handler for signaling task completion. Provides a
747 * callback that libevent can understand that will read a connection
748 * object's address from a pipe and call connection->transition() for
749 * that object.
750 *
751 * @param fd the descriptor the event occured on.
752 */
753 static void taskHandler(int fd, short /* which */, void* /* v */) {
754 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000755 ssize_t nBytes;
756 while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
757 == sizeof(TConnection*)) {
758 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000759 }
David Reiss83b8fda2010-03-09 05:19:34 +0000760 if (nBytes > 0) {
761 throw TException("TConnection::taskHandler unexpected partial read");
762 }
763 if (errno != EWOULDBLOCK && errno != EAGAIN) {
764 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
765 }
Mark Sleee02385b2007-06-09 01:21:16 +0000766 }
767
David Reiss01fe1532010-03-09 05:19:25 +0000768 /**
769 * Notification to server that processing has ended on this request.
770 * Can be called either when processing is completed or when a waiting
771 * task has been preemptively terminated (on overload).
772 *
David Reiss9e8073c2010-03-09 05:19:39 +0000773 * @return true if successful, false if unable to notify (check errno).
David Reiss01fe1532010-03-09 05:19:25 +0000774 */
775 bool notifyServer() {
776 TConnection* connection = this;
777 if (write(server_->getNotificationSendFD(), (const void*)&connection,
778 sizeof(TConnection*)) != sizeof(TConnection*)) {
779 return false;
780 }
781
782 return true;
783 }
784
785 /// Force connection shutdown for this connection.
786 void forceClose() {
787 appState_ = APP_CLOSE_CONNECTION;
788 if (!notifyServer()) {
789 throw TException("TConnection::forceClose: failed write on notify pipe");
790 }
791 }
792
793 /// return the server this connection was initialized for.
794 TNonblockingServer* getServer() {
795 return server_;
796 }
797
798 /// get state of connection.
799 TAppState getState() {
800 return appState_;
801 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000802};
803
T Jake Lucianib5e62212009-01-31 22:36:20 +0000804}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000805
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_