blob: a47a2c00dac1c9a942a3442374f0f337ec2966f1 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000012#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000015#include <string>
16#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000017#include <cstdlib>
Mark Slee2f6404d2006-10-10 01:37:40 +000018#include <event.h>
19
T Jake Lucianib5e62212009-01-31 22:36:20 +000020namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000021
T Jake Lucianib5e62212009-01-31 22:36:20 +000022using apache::thrift::transport::TMemoryBuffer;
23using apache::thrift::protocol::TProtocol;
24using apache::thrift::concurrency::Runnable;
25using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000026
27// Forward declaration of class
28class TConnection;
29
30/**
31 * This is a non-blocking server in C++ for high performance that operates a
32 * single IO thread. It assumes that all incoming requests are framed with a
33 * 4 byte length indicator and writes out responses using the same framing.
34 *
35 * It does not use the TServerTransport framework, but rather has socket
36 * operations hardcoded for use with select.
37 *
38 * @author Mark Slee <mcslee@facebook.com>
39 */
40class TNonblockingServer : public TServer {
41 private:
42
43 // Listen backlog
44 static const int LISTEN_BACKLOG = 1024;
45
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000046 // Default limit on size of idle connection pool
47 static const size_t CONNECTION_STACK_LIMIT = 1024;
48
49 // Maximum size of buffer allocated to idle connection
50 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
51
Mark Slee2f6404d2006-10-10 01:37:40 +000052 // Server socket file descriptor
53 int serverSocket_;
54
55 // Port server runs on
56 int port_;
57
Mark Sleee02385b2007-06-09 01:21:16 +000058 // For processing via thread pool, may be NULL
59 boost::shared_ptr<ThreadManager> threadManager_;
60
61 // Is thread pool processing?
62 bool threadPoolProcessing_;
63
Mark Slee79b16942007-11-26 19:05:29 +000064 // The event base for libevent
65 event_base* eventBase_;
66
67 // Event struct, for use with eventBase_
68 struct event serverEvent_;
69
David Reiss1997f102008-04-29 00:29:41 +000070 // Number of TConnection object we've created
71 size_t numTConnections_;
72
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000073 // Limit for how many TConnection objects to cache
74 size_t connectionStackLimit_;
75
76 /**
77 * Max read buffer size for an idle connection. When we place an idle
78 * TConnection into connectionStack_, we insure that its read buffer is
79 * reduced to this size to insure that idle connections don't hog memory.
80 */
81 uint32_t idleBufferMemLimit_;
82
Mark Slee2f6404d2006-10-10 01:37:40 +000083 /**
84 * This is a stack of all the objects that have been created but that
85 * are NOT currently in use. When we close a connection, we place it on this
86 * stack so that the object can be reused later, rather than freeing the
87 * memory and reallocating a new object later.
88 */
89 std::stack<TConnection*> connectionStack_;
90
91 void handleEvent(int fd, short which);
92
93 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000094 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000095 int port) :
96 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000097 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000098 port_(port),
dweatherford58985992007-06-19 23:10:19 +000099 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000100 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000101 numTConnections_(0),
102 connectionStackLimit_(CONNECTION_STACK_LIMIT),
103 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
Mark Sleef9373392007-01-24 19:41:57 +0000104
Mark Slee79b16942007-11-26 19:05:29 +0000105 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000106 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000107 int port,
108 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000109 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000110 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000111 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000112 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000113 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000114 numTConnections_(0),
115 connectionStackLimit_(CONNECTION_STACK_LIMIT),
116 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000117 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
118 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000119 setInputProtocolFactory(protocolFactory);
120 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000121 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000122 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000123
Mark Slee5ea15f92007-03-05 22:55:59 +0000124 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
125 boost::shared_ptr<TTransportFactory> inputTransportFactory,
126 boost::shared_ptr<TTransportFactory> outputTransportFactory,
127 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
128 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000129 int port,
130 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000131 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000132 serverSocket_(0),
133 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000134 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000135 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000136 numTConnections_(0),
137 connectionStackLimit_(CONNECTION_STACK_LIMIT),
138 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000139 setInputTransportFactory(inputTransportFactory);
140 setOutputTransportFactory(outputTransportFactory);
141 setInputProtocolFactory(inputProtocolFactory);
142 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000143 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000144 }
Mark Slee79b16942007-11-26 19:05:29 +0000145
Mark Slee2f6404d2006-10-10 01:37:40 +0000146 ~TNonblockingServer() {}
147
Mark Sleee02385b2007-06-09 01:21:16 +0000148 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
149 threadManager_ = threadManager;
150 threadPoolProcessing_ = (threadManager != NULL);
151 }
152
David Reiss1997f102008-04-29 00:29:41 +0000153 boost::shared_ptr<ThreadManager> getThreadManager() {
154 return threadManager_;
155 }
156
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000157 /**
158 * Get the maximum number of unused TConnection we will hold in reserve.
159 *
160 * @return the current limit on TConnection pool size.
161 */
162 int getConnectionStackLimit() const {
163 return connectionStackLimit_;
164 }
165
166 /**
167 * Set the maximum number of unused TConnection we will hold in reserve.
168 *
169 * @param sz the new limit for TConnection pool size.
170 */
171 void setConnectionStackLimit(int sz) {
172 connectionStackLimit_ = sz;
173 }
174
Mark Slee79b16942007-11-26 19:05:29 +0000175 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000176 return threadPoolProcessing_;
177 }
178
179 void addTask(boost::shared_ptr<Runnable> task) {
180 threadManager_->add(task);
181 }
182
Mark Slee79b16942007-11-26 19:05:29 +0000183 event_base* getEventBase() const {
184 return eventBase_;
185 }
186
David Reissc17fe6b2008-04-29 00:29:43 +0000187 void incrementNumConnections() {
188 ++numTConnections_;
189 }
190
191 void decrementNumConnections() {
192 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000193 }
194
195 size_t getNumConnections() {
196 return numTConnections_;
197 }
198
199 size_t getNumIdleConnections() {
200 return connectionStack_.size();
201 }
202
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000203 /**
204 * Get the maximum limit of memory allocated to idle TConnection objects.
205 *
206 * @return # bytes beyond which we will shrink buffers when idle.
207 */
208 size_t getIdleBufferMemLimit() const {
209 return idleBufferMemLimit_;
210 }
211
212 /**
213 * Set the maximum limit of memory allocated to idle TConnection objects.
214 * If a TConnection object goes idle with more than this much memory
215 * allocated to its buffer, we shrink it to this value.
216 *
217 * @param limit of bytes beyond which we will shrink buffers when idle.
218 */
219 void setIdleBufferMemLimit(size_t limit) {
220 idleBufferMemLimit_ = limit;
221 }
222
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 TConnection* createConnection(int socket, short flags);
224
225 void returnConnection(TConnection* connection);
226
227 static void eventHandler(int fd, short which, void* v) {
228 ((TNonblockingServer*)v)->handleEvent(fd, which);
229 }
230
Mark Slee79b16942007-11-26 19:05:29 +0000231 void listenSocket();
232
233 void listenSocket(int fd);
234
235 void registerEvents(event_base* base);
236
Mark Slee2f6404d2006-10-10 01:37:40 +0000237 void serve();
238};
239
/**
 * Two states for sockets, recv and send mode
 */
enum TSocketState {
  SOCKET_RECV,
  SOCKET_SEND
};
247
/**
 * Five states for the nonblocking server:
 *  1) initialize                         (APP_INIT)
 *  2) read 4 byte frame size             (APP_READ_FRAME_SIZE)
 *  3) read frame of data                 (APP_READ_REQUEST)
 *  4) wait on a task                     (APP_WAIT_TASK)
 *  5) send back data (if any)            (APP_SEND_RESULT)
 *
 * NOTE(review): APP_WAIT_TASK presumably covers the period while a task
 * handed to the thread pool is running -- confirm against the state machine
 * in the implementation file.
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT
};
262
263/**
264 * Represents a connection that is handled via libevent. This connection
265 * essentially encapsulates a socket that has some associated libevent state.
266 */
267class TConnection {
268 private:
269
Mark Sleee02385b2007-06-09 01:21:16 +0000270 class Task;
271
Mark Slee2f6404d2006-10-10 01:37:40 +0000272 // Server handle
273 TNonblockingServer* server_;
274
275 // Socket handle
276 int socket_;
277
278 // Libevent object
279 struct event event_;
280
281 // Libevent flags
282 short eventFlags_;
283
284 // Socket mode
285 TSocketState socketState_;
286
287 // Application state
288 TAppState appState_;
289
290 // How much data needed to read
291 uint32_t readWant_;
292
293 // Where in the read buffer are we
294 uint32_t readBufferPos_;
295
296 // Read buffer
297 uint8_t* readBuffer_;
298
299 // Read buffer size
300 uint32_t readBufferSize_;
301
302 // Write buffer
303 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000304
Mark Slee2f6404d2006-10-10 01:37:40 +0000305 // Write buffer size
306 uint32_t writeBufferSize_;
307
308 // How far through writing are we?
309 uint32_t writeBufferPos_;
310
Kevin Clark5ace1782009-03-04 21:10:58 +0000311 // How many times have we read since our last buffer reset?
312 uint32_t numReadsSinceReset_;
313
314 // How many times have we written since our last buffer reset?
315 uint32_t numWritesSinceReset_;
316
Mark Sleee02385b2007-06-09 01:21:16 +0000317 // Task handle
318 int taskHandle_;
319
320 // Task event
321 struct event taskEvent_;
322
Mark Slee2f6404d2006-10-10 01:37:40 +0000323 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000324 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000325
326 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000327 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000328
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000329 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000330 boost::shared_ptr<TTransport> factoryInputTransport_;
331 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000332
333 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000334 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000335
336 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000337 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000338
Mark Slee2f6404d2006-10-10 01:37:40 +0000339 // Go into read mode
340 void setRead() {
341 setFlags(EV_READ | EV_PERSIST);
342 }
343
344 // Go into write mode
345 void setWrite() {
346 setFlags(EV_WRITE | EV_PERSIST);
347 }
348
Mark Slee402ee282007-08-23 01:43:20 +0000349 // Set socket idle
350 void setIdle() {
351 setFlags(0);
352 }
353
Mark Slee2f6404d2006-10-10 01:37:40 +0000354 // Set event flags
355 void setFlags(short eventFlags);
356
357 // Libevent handlers
358 void workSocket();
359
360 // Close this client and reset
361 void close();
362
363 public:
364
365 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000366 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000367 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000368 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000369 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000370 }
371 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000372
Kevin Clark5ace1782009-03-04 21:10:58 +0000373 numReadsSinceReset_ = 0;
374 numWritesSinceReset_ = 0;
375
Mark Slee2f6404d2006-10-10 01:37:40 +0000376 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000377 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000378 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000379 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
380 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000381
Mark Slee2f6404d2006-10-10 01:37:40 +0000382 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000383 server_->incrementNumConnections();
384 }
385
386 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000387 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000388 }
389
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000390 /**
391 * Check read buffer against a given limit and shrink it if exceeded.
392 *
393 * @param limit we limit buffer size to.
394 */
395 void checkIdleBufferMemLimit(uint32_t limit);
396
Mark Slee2f6404d2006-10-10 01:37:40 +0000397 // Initialize
398 void init(int socket, short eventFlags, TNonblockingServer *s);
399
400 // Transition into a new state
401 void transition();
402
403 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000404 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000405 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000406 ((TConnection*)v)->workSocket();
407 }
Mark Slee79b16942007-11-26 19:05:29 +0000408
Mark Sleee02385b2007-06-09 01:21:16 +0000409 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000410 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000411 assert(fd == ((TConnection*)v)->taskHandle_);
412 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +0000413 GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000414 }
415 ((TConnection*)v)->transition();
416 }
417
Mark Slee2f6404d2006-10-10 01:37:40 +0000418};
419
T Jake Lucianib5e62212009-01-31 22:36:20 +0000420}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000421
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_