blob: 21b8d95345190665ba646e4fd9f53127b96cd5fa [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
David Reiss105961d2010-10-06 17:10:17 +000026#include <transport/TSocket.h>
Mark Sleee02385b2007-06-09 01:21:16 +000027#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000028#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000029#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000030#include <string>
31#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000032#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000033#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000034#include <event.h>
35
T Jake Lucianib5e62212009-01-31 22:36:20 +000036namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000037
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000039using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000040using apache::thrift::protocol::TProtocol;
41using apache::thrift::concurrency::Runnable;
42using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000043
// TConnection is defined further down in this header; TNonblockingServer
// only needs to refer to it by pointer before that point.
class TConnection;

/*
 * libevent version components.
 *
 * When the libevent headers do not supply LIBEVENT_VERSION_NUMBER, describe
 * the library as a 1.x release (MAJOR 1, MINOR 14, REL 13) and synthesise the
 * packed number from those parts.
 * NOTE(review): MINOR 14 / REL 13 do not correspond to a published libevent
 * 1.x release (1.4.13 / 1.4.14 did exist); values kept as-is because other
 * code may compare against them.
 *
 * Otherwise, unpack the components from the packed 0xMMmmrrxx number.
 */
#ifndef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#else
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#endif

// Releases older than 2.0 do not declare evutil_socket_t; provide it as a
// plain int so callback signatures can use the 2.x spelling unconditionally.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef int evutil_socket_t;
#endif
62
/*
 * Pointee type used for socket-option value pointers. Defaults to void; a
 * build may predefine SOCKOPT_CAST_T before including this header if its
 * socket API expects a different pointer type for option values.
 */
#if !defined(SOCKOPT_CAST_T)
#  define SOCKOPT_CAST_T void
#endif

/**
 * Reinterpret a read-only option value pointer as const SOCKOPT_CAST_T*.
 * Judging by the name, intended for setsockopt()-style option arguments.
 *
 * @param v pointer to the option value.
 * @return the same address, typed as const SOCKOPT_CAST_T*.
 */
template <typename T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  typedef const SOCKOPT_CAST_T* sockopt_cptr;
  return reinterpret_cast<sockopt_cptr>(v);
}

/**
 * Reinterpret a writable option value pointer as SOCKOPT_CAST_T*.
 * Judging by the name, intended for getsockopt()-style option arguments.
 *
 * @param v pointer to the option value storage.
 * @return the same address, typed as SOCKOPT_CAST_T*.
 */
template <typename T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  typedef SOCKOPT_CAST_T* sockopt_ptr;
  return reinterpret_cast<sockopt_ptr>(v);
}
76
/**
 * This is a non-blocking server in C++ for high performance that operates a
 * single IO thread. It assumes that all incoming requests are framed with a
 * 4 byte length indicator and writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 */
David Reiss01fe1532010-03-09 05:19:25 +000086
87
88/// Overload condition actions.
89enum TOverloadAction {
90 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
91 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
92 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
93};
94
Mark Slee2f6404d2006-10-10 01:37:40 +000095class TNonblockingServer : public TServer {
96 private:
David Reiss01fe1532010-03-09 05:19:25 +000097 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000098 static const int LISTEN_BACKLOG = 1024;
99
David Reiss01fe1532010-03-09 05:19:25 +0000100 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000101 static const size_t CONNECTION_STACK_LIMIT = 1024;
102
David Reiss01fe1532010-03-09 05:19:25 +0000103 /// Default limit on total number of connected sockets
104 static const int MAX_CONNECTIONS = INT_MAX;
105
106 /// Default limit on connections in handler/task processing
107 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
108
David Reiss89a12942010-10-06 17:10:52 +0000109 /// Default size of write buffer
110 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
111
David Reiss54bec5d2010-10-06 17:10:45 +0000112 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
113 static const int IDLE_READ_BUFFER_LIMIT = 1024;
114
115 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
116 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
117
118 /// # of calls before resizing oversized buffers (0 = check only on close)
119 static const int RESIZE_BUFFER_EVERY_N = 512;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 int serverSocket_;
123
David Reiss01fe1532010-03-09 05:19:25 +0000124 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 int port_;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000128 boost::shared_ptr<ThreadManager> threadManager_;
129
David Reiss01fe1532010-03-09 05:19:25 +0000130 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000131 bool threadPoolProcessing_;
132
David Reiss01fe1532010-03-09 05:19:25 +0000133 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +0000134 event_base* eventBase_;
Roger Meierc1905582011-08-02 23:37:36 +0000135 bool ownEventBase_;
Mark Slee79b16942007-11-26 19:05:29 +0000136
David Reiss01fe1532010-03-09 05:19:25 +0000137 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +0000138 struct event serverEvent_;
139
David Reiss01fe1532010-03-09 05:19:25 +0000140 /// Event struct, used with eventBase_ for task completion notification
141 struct event notificationEvent_;
142
143 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000144 size_t numTConnections_;
145
David Reiss9e8073c2010-03-09 05:19:39 +0000146 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000147 size_t numActiveProcessors_;
148
149 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000150 size_t connectionStackLimit_;
151
David Reiss01fe1532010-03-09 05:19:25 +0000152 /// Limit for number of connections processing or waiting to process
153 size_t maxActiveProcessors_;
154
155 /// Limit for number of open connections
156 size_t maxConnections_;
157
David Reiss068f4162010-03-09 05:19:45 +0000158 /// Time in milliseconds before an unperformed task expires (0 == infinite).
159 int64_t taskExpireTime_;
160
David Reiss01fe1532010-03-09 05:19:25 +0000161 /**
162 * Hysteresis for overload state. This is the fraction of the overload
163 * value that needs to be reached before the overload state is cleared;
164 * must be <= 1.0.
165 */
166 double overloadHysteresis_;
167
168 /// Action to take when we're overloaded.
169 TOverloadAction overloadAction_;
170
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000171 /**
David Reiss89a12942010-10-06 17:10:52 +0000172 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
173 * and found to be exceeded, reinitialized) to this size.
174 */
175 size_t writeBufferDefaultSize_;
176
177 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000178 * Max read buffer size for an idle TConnection. When we place an idle
179 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000180 * we will free the buffer (such that it will be reinitialized by the next
181 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000182 */
David Reiss54bec5d2010-10-06 17:10:45 +0000183 size_t idleReadBufferLimit_;
184
185 /**
186 * Max write buffer size for an idle connection. When we place an idle
187 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
188 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000189 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
190 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000191 */
192 size_t idleWriteBufferLimit_;
193
194 /**
195 * Every N calls we check the buffer size limits on a connected TConnection.
196 * 0 disables (i.e. the checks are only done when a connection closes).
197 */
198 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000199
200 /// Set if we are currently in an overloaded state.
201 bool overloaded_;
202
203 /// Count of connections dropped since overload started
204 uint32_t nConnectionsDropped_;
205
206 /// Count of connections dropped on overload since server started
207 uint64_t nTotalConnectionsDropped_;
208
209 /// File descriptors for pipe used for task completion notification.
Roger Meier30aae0c2011-07-08 12:23:31 +0000210 evutil_socket_t notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000211
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 /**
213 * This is a stack of all the objects that have been created but that
214 * are NOT currently in use. When we close a connection, we place it on this
215 * stack so that the object can be reused later, rather than freeing the
216 * memory and reallocating a new object later.
217 */
218 std::stack<TConnection*> connectionStack_;
219
David Reiss01fe1532010-03-09 05:19:25 +0000220 /**
221 * Called when server socket had something happen. We accept all waiting
222 * client connections on listen socket fd and assign TConnection objects
223 * to handle those requests.
224 *
225 * @param fd the listen socket.
226 * @param which the event flag that triggered the handler.
227 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000228 void handleEvent(int fd, short which);
229
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000230 void init(int port) {
231 serverSocket_ = -1;
232 port_ = port;
233 threadPoolProcessing_ = false;
234 eventBase_ = NULL;
235 ownEventBase_ = false;
236 numTConnections_ = 0;
237 numActiveProcessors_ = 0;
238 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
239 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
240 maxConnections_ = MAX_CONNECTIONS;
241 taskExpireTime_ = 0;
242 overloadHysteresis_ = 0.8;
243 overloadAction_ = T_OVERLOAD_NO_ACTION;
244 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
245 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
246 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
247 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
248 overloaded_ = false;
249 nConnectionsDropped_ = 0;
250 nTotalConnectionsDropped_ = 0;
251 }
Mark Sleef9373392007-01-24 19:41:57 +0000252
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000253 public:
254 TNonblockingServer(const boost::shared_ptr<TProcessor>& processor,
255 int port) :
256 TServer(processor) {
257 init(port);
258 }
259
260 TNonblockingServer(
261 const boost::shared_ptr<TProcessor>& processor,
262 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
263 int port,
264 const boost::shared_ptr<ThreadManager>& threadManager =
265 boost::shared_ptr<ThreadManager>()) :
266 TServer(processor) {
267
268 init(port);
269
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000270 setInputProtocolFactory(protocolFactory);
271 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000272 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000273 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000274
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000275 TNonblockingServer(
276 const boost::shared_ptr<TProcessor>& processor,
277 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
278 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
279 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
280 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
281 int port,
282 const boost::shared_ptr<ThreadManager>& threadManager =
283 boost::shared_ptr<ThreadManager>()) :
284 TServer(processor) {
285
286 init(port);
287
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000288 setInputTransportFactory(inputTransportFactory);
289 setOutputTransportFactory(outputTransportFactory);
290 setInputProtocolFactory(inputProtocolFactory);
291 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000292 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000293 }
Mark Slee79b16942007-11-26 19:05:29 +0000294
David Reiss8ede8182010-09-02 15:26:28 +0000295 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000296
David Reiss068f4162010-03-09 05:19:45 +0000297 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000298
David Reiss1997f102008-04-29 00:29:41 +0000299 boost::shared_ptr<ThreadManager> getThreadManager() {
300 return threadManager_;
301 }
302
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000303 /**
304 * Get the maximum number of unused TConnection we will hold in reserve.
305 *
306 * @return the current limit on TConnection pool size.
307 */
David Reiss260fa932009-04-02 23:51:39 +0000308 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000309 return connectionStackLimit_;
310 }
311
312 /**
313 * Set the maximum number of unused TConnection we will hold in reserve.
314 *
315 * @param sz the new limit for TConnection pool size.
316 */
David Reiss260fa932009-04-02 23:51:39 +0000317 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000318 connectionStackLimit_ = sz;
319 }
320
Mark Slee79b16942007-11-26 19:05:29 +0000321 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000322 return threadPoolProcessing_;
323 }
324
325 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000326 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000327 }
328
Mark Slee79b16942007-11-26 19:05:29 +0000329 event_base* getEventBase() const {
330 return eventBase_;
331 }
332
David Reiss01fe1532010-03-09 05:19:25 +0000333 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000334 void incrementNumConnections() {
335 ++numTConnections_;
336 }
337
David Reiss01fe1532010-03-09 05:19:25 +0000338 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000339 void decrementNumConnections() {
340 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000341 }
342
David Reiss01fe1532010-03-09 05:19:25 +0000343 /**
344 * Return the count of sockets currently connected to.
345 *
346 * @return count of connected sockets.
347 */
348 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000349 return numTConnections_;
350 }
351
David Reiss01fe1532010-03-09 05:19:25 +0000352 /**
353 * Return the count of connection objects allocated but not in use.
354 *
355 * @return count of idle connection objects.
356 */
357 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000358 return connectionStack_.size();
359 }
360
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000361 /**
David Reiss01fe1532010-03-09 05:19:25 +0000362 * Return count of number of connections which are currently processing.
363 * This is defined as a connection where all data has been received and
364 * either assigned a task (when threading) or passed to a handler (when
365 * not threading), and where the handler has not yet returned.
366 *
367 * @return # of connections currently processing.
368 */
369 size_t getNumActiveProcessors() const {
370 return numActiveProcessors_;
371 }
372
373 /// Increment the count of connections currently processing.
374 void incrementActiveProcessors() {
375 ++numActiveProcessors_;
376 }
377
378 /// Decrement the count of connections currently processing.
379 void decrementActiveProcessors() {
380 if (numActiveProcessors_ > 0) {
381 --numActiveProcessors_;
382 }
383 }
384
385 /**
386 * Get the maximum # of connections allowed before overload.
387 *
388 * @return current setting.
389 */
390 size_t getMaxConnections() const {
391 return maxConnections_;
392 }
393
394 /**
395 * Set the maximum # of connections allowed before overload.
396 *
397 * @param maxConnections new setting for maximum # of connections.
398 */
399 void setMaxConnections(size_t maxConnections) {
400 maxConnections_ = maxConnections;
401 }
402
403 /**
404 * Get the maximum # of connections waiting in handler/task before overload.
405 *
406 * @return current setting.
407 */
408 size_t getMaxActiveProcessors() const {
409 return maxActiveProcessors_;
410 }
411
412 /**
413 * Set the maximum # of connections waiting in handler/task before overload.
414 *
415 * @param maxActiveProcessors new setting for maximum # of active processes.
416 */
417 void setMaxActiveProcessors(size_t maxActiveProcessors) {
418 maxActiveProcessors_ = maxActiveProcessors;
419 }
420
421 /**
422 * Get fraction of maximum limits before an overload condition is cleared.
423 *
424 * @return hysteresis fraction
425 */
426 double getOverloadHysteresis() const {
427 return overloadHysteresis_;
428 }
429
430 /**
431 * Set fraction of maximum limits before an overload condition is cleared.
432 * A good value would probably be between 0.5 and 0.9.
433 *
434 * @param hysteresisFraction fraction <= 1.0.
435 */
436 void setOverloadHysteresis(double hysteresisFraction) {
437 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
438 overloadHysteresis_ = hysteresisFraction;
439 }
440 }
441
442 /**
443 * Get the action the server will take on overload.
444 *
445 * @return a TOverloadAction enum value for the currently set action.
446 */
447 TOverloadAction getOverloadAction() const {
448 return overloadAction_;
449 }
450
451 /**
452 * Set the action the server is to take on overload.
453 *
454 * @param overloadAction a TOverloadAction enum value for the action.
455 */
456 void setOverloadAction(TOverloadAction overloadAction) {
457 overloadAction_ = overloadAction;
458 }
459
460 /**
David Reiss068f4162010-03-09 05:19:45 +0000461 * Get the time in milliseconds after which a task expires (0 == infinite).
462 *
463 * @return a 64-bit time in milliseconds.
464 */
465 int64_t getTaskExpireTime() const {
466 return taskExpireTime_;
467 }
468
469 /**
470 * Set the time in milliseconds after which a task expires (0 == infinite).
471 *
472 * @param taskExpireTime a 64-bit time in milliseconds.
473 */
474 void setTaskExpireTime(int64_t taskExpireTime) {
475 taskExpireTime_ = taskExpireTime;
476 }
477
478 /**
David Reiss01fe1532010-03-09 05:19:25 +0000479 * Determine if the server is currently overloaded.
480 * This function checks the maximums for open connections and connections
481 * currently in processing, and sets an overload condition if they are
482 * exceeded. The overload will persist until both values are below the
483 * current hysteresis fraction of their maximums.
484 *
485 * @return true if an overload condition exists, false if not.
486 */
487 bool serverOverloaded();
488
489 /** Pop and discard next task on threadpool wait queue.
490 *
491 * @return true if a task was discarded, false if the wait queue was empty.
492 */
493 bool drainPendingTask();
494
495 /**
David Reiss89a12942010-10-06 17:10:52 +0000496 * Get the starting size of a TConnection object's write buffer.
497 *
498 * @return # bytes we initialize a TConnection object's write buffer to.
499 */
500 size_t getWriteBufferDefaultSize() const {
501 return writeBufferDefaultSize_;
502 }
503
504 /**
505 * Set the starting size of a TConnection object's write buffer.
506 *
507 * @param size # bytes we initialize a TConnection object's write buffer to.
508 */
509 void setWriteBufferDefaultSize(size_t size) {
510 writeBufferDefaultSize_ = size;
511 }
512
513 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000514 * Get the maximum size of read buffer allocated to idle TConnection objects.
515 *
David Reiss89a12942010-10-06 17:10:52 +0000516 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000517 */
518 size_t getIdleReadBufferLimit() const {
519 return idleReadBufferLimit_;
520 }
521
522 /**
523 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
524 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000525 *
David Reiss89a12942010-10-06 17:10:52 +0000526 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000527 */
528 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000529 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000530 }
531
532 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000533 * Set the maximum size read buffer allocated to idle TConnection objects.
534 * If a TConnection object is found (either on connection close or between
535 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000536 * allocated to its read buffer, we free it and allow it to be reinitialized
537 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000538 *
539 * @param limit of bytes beyond which we will shrink buffers when checked.
540 */
541 void setIdleReadBufferLimit(size_t limit) {
542 idleReadBufferLimit_ = limit;
543 }
544
545 /**
546 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
547 * Set the maximum size read buffer allocated to idle TConnection objects.
548 * If a TConnection object is found (either on connection close or between
549 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000550 * allocated to its read buffer, we free it and allow it to be reinitialized
551 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000552 *
553 * @param limit of bytes beyond which we will shrink buffers when checked.
554 */
555 void setIdleBufferMemLimit(size_t limit) {
556 idleReadBufferLimit_ = limit;
557 }
558
559
560
561 /**
562 * Get the maximum size of write buffer allocated to idle TConnection objects.
563 *
564 * @return # bytes beyond which we will reallocate buffers when checked.
565 */
566 size_t getIdleWriteBufferLimit() const {
567 return idleWriteBufferLimit_;
568 }
569
570 /**
571 * Set the maximum size write buffer allocated to idle TConnection objects.
572 * If a TConnection object is found (either on connection close or between
573 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000574 * allocated to its write buffer, we destroy and construct that buffer with
575 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000576 *
577 * @param limit of bytes beyond which we will shrink buffers when idle.
578 */
David Reiss54bec5d2010-10-06 17:10:45 +0000579 void setIdleWriteBufferLimit(size_t limit) {
580 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000581 }
582
David Reiss01fe1532010-03-09 05:19:25 +0000583 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000584 * Get # of calls made between buffer size checks. 0 means disabled.
585 *
586 * @return # of calls between buffer size checks.
587 */
588 int32_t getResizeBufferEveryN() const {
589 return resizeBufferEveryN_;
590 }
591
592 /**
593 * Check buffer sizes every "count" calls. This allows buffer limits
594 * to be enforced for persistant connections with a controllable degree
595 * of overhead. 0 disables checks except at connection close.
596 *
597 * @param count the number of calls between checks, or 0 to disable
598 */
599 void setResizeBufferEveryN(int32_t count) {
600 resizeBufferEveryN_ = count;
601 }
602
603
604
605 /**
David Reiss01fe1532010-03-09 05:19:25 +0000606 * Return an initialized connection object. Creates or recovers from
607 * pool a TConnection and initializes it with the provided socket FD
608 * and flags.
609 *
610 * @param socket FD of socket associated with this connection.
611 * @param flags initial lib_event flags for this connection.
David Reiss105961d2010-10-06 17:10:17 +0000612 * @param addr the sockaddr of the client
613 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000614 * @return pointer to initialized TConnection object.
615 */
David Reiss105961d2010-10-06 17:10:17 +0000616 TConnection* createConnection(int socket, short flags,
617 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000618
David Reiss01fe1532010-03-09 05:19:25 +0000619 /**
620 * Returns a connection to pool or deletion. If the connection pool
621 * (a stack) isn't full, place the connection object on it, otherwise
622 * just delete it.
623 *
624 * @param connection the TConection being returned.
625 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000626 void returnConnection(TConnection* connection);
627
David Reiss01fe1532010-03-09 05:19:25 +0000628 /**
David Reiss068f4162010-03-09 05:19:45 +0000629 * Callback function that the threadmanager calls when a task reaches
630 * its expiration time. It is needed to clean up the expired connection.
631 *
632 * @param task the runnable associated with the expired task.
633 */
634 void expireClose(boost::shared_ptr<Runnable> task);
635
636 /**
David Reiss01fe1532010-03-09 05:19:25 +0000637 * C-callable event handler for listener events. Provides a callback
638 * that libevent can understand which invokes server->handleEvent().
639 *
640 * @param fd the descriptor the event occured on.
641 * @param which the flags associated with the event.
642 * @param v void* callback arg where we placed TNonblockingServer's "this".
643 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000644 static void eventHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000645 ((TNonblockingServer*)v)->handleEvent(fd, which);
646 }
647
David Reiss01fe1532010-03-09 05:19:25 +0000648 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000649 void listenSocket();
650
David Reiss01fe1532010-03-09 05:19:25 +0000651 /**
652 * Takes a socket created by listenSocket() and sets various options on it
653 * to prepare for use in the server.
654 *
655 * @param fd descriptor of socket to be initialized/
656 */
Mark Slee79b16942007-11-26 19:05:29 +0000657 void listenSocket(int fd);
658
David Reiss01fe1532010-03-09 05:19:25 +0000659 /// Create the pipe used to notify I/O process of task completion.
660 void createNotificationPipe();
661
662 /**
663 * Get notification pipe send descriptor.
664 *
665 * @return write fd for pipe.
666 */
667 int getNotificationSendFD() const {
668 return notificationPipeFDs_[1];
669 }
670
671 /**
672 * Get notification pipe receive descriptor.
673 *
674 * @return read fd of pipe.
675 */
676 int getNotificationRecvFD() const {
677 return notificationPipeFDs_[0];
678 }
679
680 /**
681 * Register the core libevent events onto the proper base.
682 *
683 * @param base pointer to the event base to be initialized.
Roger Meierc1905582011-08-02 23:37:36 +0000684 * @param ownEventBase if true, this server is responsible for
685 * freeing the event base memory.
David Reiss01fe1532010-03-09 05:19:25 +0000686 */
Roger Meierc1905582011-08-02 23:37:36 +0000687 void registerEvents(event_base* base, bool ownEventBase = true);
Mark Slee79b16942007-11-26 19:05:29 +0000688
David Reiss01fe1532010-03-09 05:19:25 +0000689 /**
690 * Main workhorse function, starts up the server listening on a port and
691 * loops over the libevent handler.
692 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000693 void serve();
Bryan Duxbury76c43682011-08-24 21:26:48 +0000694
695 /**
696 * May be called from a separate thread to cause serve() to return.
697 */
698 void stop();
Mark Slee2f6404d2006-10-10 01:37:40 +0000699};
700
/**
 * Socket-level mode of a connection. There are three: receiving the frame
 * size, receiving the frame data, and sending.
 */
enum TSocketState {
  SOCKET_RECV_FRAMING = 0,  ///< receiving the frame-size prefix
  SOCKET_RECV = 1,          ///< receiving frame data
  SOCKET_SEND = 2           ///< sending data
};
707
/**
 * Application-level states of a connection in the nonblocking server.
 * There are six:
 *   1) APP_INIT             - initialize
 *   2) APP_READ_FRAME_SIZE  - read 4 byte frame size
 *   3) APP_READ_REQUEST     - read frame of data
 *   4) APP_WAIT_TASK        - wait for the request to finish processing
 *                             (presumably a thread-pool task; see
 *                             TNonblockingServer's task-completion pipe)
 *   5) APP_SEND_RESULT      - send back data (if any)
 *   6) APP_CLOSE_CONNECTION - force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
724
725/**
726 * Represents a connection that is handled via libevent. This connection
727 * essentially encapsulates a socket that has some associated libevent state.
728 */
David Reiss54bec5d2010-10-06 17:10:45 +0000729class TConnection {
Mark Slee2f6404d2006-10-10 01:37:40 +0000730 private:
731
David Reiss01fe1532010-03-09 05:19:25 +0000732 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000733 TNonblockingServer* server_;
734
David Reiss105961d2010-10-06 17:10:17 +0000735 /// Object wrapping network socket
736 boost::shared_ptr<TSocket> tSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000737
David Reiss01fe1532010-03-09 05:19:25 +0000738 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000739 struct event event_;
740
David Reiss01fe1532010-03-09 05:19:25 +0000741 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000742 short eventFlags_;
743
David Reiss01fe1532010-03-09 05:19:25 +0000744 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000745 TSocketState socketState_;
746
David Reiss01fe1532010-03-09 05:19:25 +0000747 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000748 TAppState appState_;
749
David Reiss01fe1532010-03-09 05:19:25 +0000750 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000751 uint32_t readWant_;
752
David Reiss01fe1532010-03-09 05:19:25 +0000753 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000754 uint32_t readBufferPos_;
755
David Reiss01fe1532010-03-09 05:19:25 +0000756 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000757 uint8_t* readBuffer_;
758
David Reiss01fe1532010-03-09 05:19:25 +0000759 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000760 uint32_t readBufferSize_;
761
David Reiss01fe1532010-03-09 05:19:25 +0000762 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000763 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000764
David Reiss01fe1532010-03-09 05:19:25 +0000765 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000766 uint32_t writeBufferSize_;
767
David Reiss01fe1532010-03-09 05:19:25 +0000768 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000769 uint32_t writeBufferPos_;
770
David Reiss54bec5d2010-10-06 17:10:45 +0000771 /// Largest size of write buffer seen since buffer was constructed
772 size_t largestWriteBufferSize_;
773
774 /// Count of the number of calls for use with getResizeBufferEveryN().
775 int32_t callsForResize_;
776
David Reiss01fe1532010-03-09 05:19:25 +0000777 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000778 int taskHandle_;
779
David Reiss01fe1532010-03-09 05:19:25 +0000780 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000781 struct event taskEvent_;
782
David Reiss01fe1532010-03-09 05:19:25 +0000783 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000784 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000785
David Reiss01fe1532010-03-09 05:19:25 +0000786 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000787 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000788
David Reiss01fe1532010-03-09 05:19:25 +0000789 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000790 boost::shared_ptr<TTransport> factoryInputTransport_;
791 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000792
David Reiss01fe1532010-03-09 05:19:25 +0000793 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000794 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000795
David Reiss01fe1532010-03-09 05:19:25 +0000796 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000797 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000798
David Reiss105961d2010-10-06 17:10:17 +0000799 /// Server event handler, if any
800 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
801
802 /// Thrift call context, if any
803 void *connectionContext_;
804
David Reiss01fe1532010-03-09 05:19:25 +0000805 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000806 void setRead() {
807 setFlags(EV_READ | EV_PERSIST);
808 }
809
David Reiss01fe1532010-03-09 05:19:25 +0000810 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000811 void setWrite() {
812 setFlags(EV_WRITE | EV_PERSIST);
813 }
814
David Reiss01fe1532010-03-09 05:19:25 +0000815 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000816 void setIdle() {
817 setFlags(0);
818 }
819
David Reiss01fe1532010-03-09 05:19:25 +0000820 /**
821 * Set event flags for this connection.
822 *
823 * @param eventFlags flags we pass to libevent for the connection.
824 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000825 void setFlags(short eventFlags);
826
David Reiss01fe1532010-03-09 05:19:25 +0000827 /**
828 * Libevent handler called (via our static wrapper) when the connection
829 * socket had something happen. Rather than use the flags libevent passed,
830 * we use the connection state to determine whether we need to read or
831 * write the socket.
832 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000833 void workSocket();
834
David Reiss01fe1532010-03-09 05:19:25 +0000835 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000836 void close();
837
838 public:
839
David Reiss01fe1532010-03-09 05:19:25 +0000840 class Task;
841
842 /// Constructor
David Reiss105961d2010-10-06 17:10:17 +0000843 TConnection(int socket, short eventFlags, TNonblockingServer *s,
844 const sockaddr* addr, socklen_t addrLen) {
David Reiss89a12942010-10-06 17:10:52 +0000845 readBuffer_ = NULL;
846 readBufferSize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000847
Mark Slee2f6404d2006-10-10 01:37:40 +0000848 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000849 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000850 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000851 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
David Reiss89a12942010-10-06 17:10:52 +0000852 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
David Reiss105961d2010-10-06 17:10:17 +0000853 tSocket_.reset(new TSocket());
Mark Slee79b16942007-11-26 19:05:29 +0000854
David Reiss105961d2010-10-06 17:10:17 +0000855 init(socket, eventFlags, s, addr, addrLen);
David Reiss1997f102008-04-29 00:29:41 +0000856 server_->incrementNumConnections();
857 }
858
859 ~TConnection() {
David Reiss472fffb2010-03-09 05:20:24 +0000860 std::free(readBuffer_);
David Reissc17fe6b2008-04-29 00:29:43 +0000861 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000862 }
863
David Reiss54bec5d2010-10-06 17:10:45 +0000864 /**
865 * Check buffers against any size limits and shrink it if exceeded.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000866 *
David Reiss54bec5d2010-10-06 17:10:45 +0000867 * @param readLimit we reduce read buffer size to this (if nonzero).
868 * @param writeLimit if nonzero and write buffer is larger, replace it.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000869 */
David Reiss54bec5d2010-10-06 17:10:45 +0000870 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000871
David Reiss01fe1532010-03-09 05:19:25 +0000872 /// Initialize
David Reiss105961d2010-10-06 17:10:17 +0000873 void init(int socket, short eventFlags, TNonblockingServer *s,
874 const sockaddr* addr, socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000875
David Reiss01fe1532010-03-09 05:19:25 +0000876 /**
877 * This is called when the application transitions from one state into
878 * another. This means that it has finished writing the data that it needed
879 * to, or finished receiving the data that it needed to.
880 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000881 void transition();
882
David Reiss01fe1532010-03-09 05:19:25 +0000883 /**
884 * C-callable event handler for connection events. Provides a callback
885 * that libevent can understand which invokes connection_->workSocket().
886 *
887 * @param fd the descriptor the event occured on.
888 * @param which the flags associated with the event.
889 * @param v void* callback arg where we placed TConnection's "this".
890 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000891 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
David Reiss105961d2010-10-06 17:10:17 +0000892 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
Mark Slee2f6404d2006-10-10 01:37:40 +0000893 ((TConnection*)v)->workSocket();
894 }
Mark Slee79b16942007-11-26 19:05:29 +0000895
David Reiss01fe1532010-03-09 05:19:25 +0000896 /**
897 * C-callable event handler for signaling task completion. Provides a
898 * callback that libevent can understand that will read a connection
899 * object's address from a pipe and call connection->transition() for
900 * that object.
901 *
902 * @param fd the descriptor the event occured on.
903 */
Roger Meier30aae0c2011-07-08 12:23:31 +0000904 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
David Reiss01fe1532010-03-09 05:19:25 +0000905 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000906 ssize_t nBytes;
Roger Meier30aae0c2011-07-08 12:23:31 +0000907 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
David Reiss83b8fda2010-03-09 05:19:34 +0000908 == sizeof(TConnection*)) {
909 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000910 }
David Reiss83b8fda2010-03-09 05:19:34 +0000911 if (nBytes > 0) {
912 throw TException("TConnection::taskHandler unexpected partial read");
913 }
Roger Meier30aae0c2011-07-08 12:23:31 +0000914 if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
David Reiss83b8fda2010-03-09 05:19:34 +0000915 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
916 }
Mark Sleee02385b2007-06-09 01:21:16 +0000917 }
918
David Reiss01fe1532010-03-09 05:19:25 +0000919 /**
920 * Notification to server that processing has ended on this request.
921 * Can be called either when processing is completed or when a waiting
922 * task has been preemptively terminated (on overload).
923 *
David Reiss9e8073c2010-03-09 05:19:39 +0000924 * @return true if successful, false if unable to notify (check errno).
David Reiss01fe1532010-03-09 05:19:25 +0000925 */
926 bool notifyServer() {
927 TConnection* connection = this;
Roger Meier30aae0c2011-07-08 12:23:31 +0000928 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
929 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
David Reiss01fe1532010-03-09 05:19:25 +0000930 return false;
931 }
932
933 return true;
934 }
935
936 /// Force connection shutdown for this connection.
937 void forceClose() {
938 appState_ = APP_CLOSE_CONNECTION;
939 if (!notifyServer()) {
940 throw TException("TConnection::forceClose: failed write on notify pipe");
941 }
942 }
943
944 /// return the server this connection was initialized for.
945 TNonblockingServer* getServer() {
946 return server_;
947 }
948
949 /// get state of connection.
950 TAppState getState() {
951 return appState_;
952 }
David Reiss105961d2010-10-06 17:10:17 +0000953
954 /// return the TSocket transport wrapping this network connection
955 boost::shared_ptr<TSocket> getTSocket() const {
956 return tSocket_;
957 }
958
959 /// return the server event handler if any
960 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
961 return serverEventHandler_;
962 }
963
964 /// return the Thrift connection context if any
965 void* getConnectionContext() {
966 return connectionContext_;
967 }
968
Mark Slee2f6404d2006-10-10 01:37:40 +0000969};
970
T Jake Lucianib5e62212009-01-31 22:36:20 +0000971}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000972
973#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_