blob: 960ba657385c984f77dd5de7b23bacd9641ce207 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000012#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000015#include <string>
16#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000017#include <cstdlib>
Mark Slee2f6404d2006-10-10 01:37:40 +000018#include <event.h>
19
T Jake Lucianib5e62212009-01-31 22:36:20 +000020namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000021
T Jake Lucianib5e62212009-01-31 22:36:20 +000022using apache::thrift::transport::TMemoryBuffer;
23using apache::thrift::protocol::TProtocol;
24using apache::thrift::concurrency::Runnable;
25using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000026
27// Forward declaration of class
28class TConnection;
29
30/**
31 * This is a non-blocking server in C++ for high performance that operates a
32 * single IO thread. It assumes that all incoming requests are framed with a
33 * 4 byte length indicator and writes out responses using the same framing.
34 *
35 * It does not use the TServerTransport framework, but rather has socket
36 * operations hardcoded for use with select.
37 *
Mark Slee2f6404d2006-10-10 01:37:40 +000038 */
39class TNonblockingServer : public TServer {
40 private:
41
42 // Listen backlog
43 static const int LISTEN_BACKLOG = 1024;
44
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000045 // Default limit on size of idle connection pool
46 static const size_t CONNECTION_STACK_LIMIT = 1024;
47
48 // Maximum size of buffer allocated to idle connection
49 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
50
Mark Slee2f6404d2006-10-10 01:37:40 +000051 // Server socket file descriptor
52 int serverSocket_;
53
54 // Port server runs on
55 int port_;
56
Mark Sleee02385b2007-06-09 01:21:16 +000057 // For processing via thread pool, may be NULL
58 boost::shared_ptr<ThreadManager> threadManager_;
59
60 // Is thread pool processing?
61 bool threadPoolProcessing_;
62
Mark Slee79b16942007-11-26 19:05:29 +000063 // The event base for libevent
64 event_base* eventBase_;
65
66 // Event struct, for use with eventBase_
67 struct event serverEvent_;
68
David Reiss1997f102008-04-29 00:29:41 +000069 // Number of TConnection object we've created
70 size_t numTConnections_;
71
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000072 // Limit for how many TConnection objects to cache
73 size_t connectionStackLimit_;
74
75 /**
76 * Max read buffer size for an idle connection. When we place an idle
77 * TConnection into connectionStack_, we insure that its read buffer is
78 * reduced to this size to insure that idle connections don't hog memory.
79 */
80 uint32_t idleBufferMemLimit_;
81
Mark Slee2f6404d2006-10-10 01:37:40 +000082 /**
83 * This is a stack of all the objects that have been created but that
84 * are NOT currently in use. When we close a connection, we place it on this
85 * stack so that the object can be reused later, rather than freeing the
86 * memory and reallocating a new object later.
87 */
88 std::stack<TConnection*> connectionStack_;
89
90 void handleEvent(int fd, short which);
91
92 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000093 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000094 int port) :
95 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000096 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000097 port_(port),
dweatherford58985992007-06-19 23:10:19 +000098 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +000099 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000100 numTConnections_(0),
101 connectionStackLimit_(CONNECTION_STACK_LIMIT),
102 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
Mark Sleef9373392007-01-24 19:41:57 +0000103
Mark Slee79b16942007-11-26 19:05:29 +0000104 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000105 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000106 int port,
107 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000108 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000109 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000110 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000111 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000112 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000113 numTConnections_(0),
114 connectionStackLimit_(CONNECTION_STACK_LIMIT),
115 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000116 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
117 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000118 setInputProtocolFactory(protocolFactory);
119 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000120 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000121 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000122
Mark Slee5ea15f92007-03-05 22:55:59 +0000123 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
124 boost::shared_ptr<TTransportFactory> inputTransportFactory,
125 boost::shared_ptr<TTransportFactory> outputTransportFactory,
126 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
127 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000128 int port,
129 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000130 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000131 serverSocket_(0),
132 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000133 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000134 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000135 numTConnections_(0),
136 connectionStackLimit_(CONNECTION_STACK_LIMIT),
137 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000138 setInputTransportFactory(inputTransportFactory);
139 setOutputTransportFactory(outputTransportFactory);
140 setInputProtocolFactory(inputProtocolFactory);
141 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000142 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000143 }
Mark Slee79b16942007-11-26 19:05:29 +0000144
Mark Slee2f6404d2006-10-10 01:37:40 +0000145 ~TNonblockingServer() {}
146
Mark Sleee02385b2007-06-09 01:21:16 +0000147 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
148 threadManager_ = threadManager;
149 threadPoolProcessing_ = (threadManager != NULL);
150 }
151
David Reiss1997f102008-04-29 00:29:41 +0000152 boost::shared_ptr<ThreadManager> getThreadManager() {
153 return threadManager_;
154 }
155
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000156 /**
157 * Get the maximum number of unused TConnection we will hold in reserve.
158 *
159 * @return the current limit on TConnection pool size.
160 */
161 int getConnectionStackLimit() const {
162 return connectionStackLimit_;
163 }
164
165 /**
166 * Set the maximum number of unused TConnection we will hold in reserve.
167 *
168 * @param sz the new limit for TConnection pool size.
169 */
170 void setConnectionStackLimit(int sz) {
171 connectionStackLimit_ = sz;
172 }
173
Mark Slee79b16942007-11-26 19:05:29 +0000174 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000175 return threadPoolProcessing_;
176 }
177
178 void addTask(boost::shared_ptr<Runnable> task) {
179 threadManager_->add(task);
180 }
181
Mark Slee79b16942007-11-26 19:05:29 +0000182 event_base* getEventBase() const {
183 return eventBase_;
184 }
185
David Reissc17fe6b2008-04-29 00:29:43 +0000186 void incrementNumConnections() {
187 ++numTConnections_;
188 }
189
190 void decrementNumConnections() {
191 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000192 }
193
194 size_t getNumConnections() {
195 return numTConnections_;
196 }
197
198 size_t getNumIdleConnections() {
199 return connectionStack_.size();
200 }
201
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000202 /**
203 * Get the maximum limit of memory allocated to idle TConnection objects.
204 *
205 * @return # bytes beyond which we will shrink buffers when idle.
206 */
207 size_t getIdleBufferMemLimit() const {
208 return idleBufferMemLimit_;
209 }
210
211 /**
212 * Set the maximum limit of memory allocated to idle TConnection objects.
213 * If a TConnection object goes idle with more than this much memory
214 * allocated to its buffer, we shrink it to this value.
215 *
216 * @param limit of bytes beyond which we will shrink buffers when idle.
217 */
218 void setIdleBufferMemLimit(size_t limit) {
219 idleBufferMemLimit_ = limit;
220 }
221
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 TConnection* createConnection(int socket, short flags);
223
224 void returnConnection(TConnection* connection);
225
226 static void eventHandler(int fd, short which, void* v) {
227 ((TNonblockingServer*)v)->handleEvent(fd, which);
228 }
229
Mark Slee79b16942007-11-26 19:05:29 +0000230 void listenSocket();
231
232 void listenSocket(int fd);
233
234 void registerEvents(event_base* base);
235
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 void serve();
237};
238
/**
 * The direction a connection's socket is working in: receiving (recv mode)
 * or sending (send mode).
 */
enum TSocketState {
  SOCKET_RECV = 0,
  SOCKET_SEND = 1
};
246
/**
 * Application states for a connection of the nonblocking server:
 *  - APP_INIT:            initialize
 *  - APP_READ_FRAME_SIZE: read 4 byte frame size
 *  - APP_READ_REQUEST:    read frame of data
 *  - APP_WAIT_TASK:       presumably waiting on a task handed to the thread
 *                         pool -- TODO confirm against the implementation
 *  - APP_SEND_RESULT:     send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,
  APP_READ_FRAME_SIZE = 1,
  APP_READ_REQUEST    = 2,
  APP_WAIT_TASK       = 3,
  APP_SEND_RESULT     = 4
};
261
262/**
263 * Represents a connection that is handled via libevent. This connection
264 * essentially encapsulates a socket that has some associated libevent state.
265 */
266class TConnection {
267 private:
268
Mark Sleee02385b2007-06-09 01:21:16 +0000269 class Task;
270
Mark Slee2f6404d2006-10-10 01:37:40 +0000271 // Server handle
272 TNonblockingServer* server_;
273
274 // Socket handle
275 int socket_;
276
277 // Libevent object
278 struct event event_;
279
280 // Libevent flags
281 short eventFlags_;
282
283 // Socket mode
284 TSocketState socketState_;
285
286 // Application state
287 TAppState appState_;
288
289 // How much data needed to read
290 uint32_t readWant_;
291
292 // Where in the read buffer are we
293 uint32_t readBufferPos_;
294
295 // Read buffer
296 uint8_t* readBuffer_;
297
298 // Read buffer size
299 uint32_t readBufferSize_;
300
301 // Write buffer
302 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000303
Mark Slee2f6404d2006-10-10 01:37:40 +0000304 // Write buffer size
305 uint32_t writeBufferSize_;
306
307 // How far through writing are we?
308 uint32_t writeBufferPos_;
309
Kevin Clark5ace1782009-03-04 21:10:58 +0000310 // How many times have we read since our last buffer reset?
311 uint32_t numReadsSinceReset_;
312
313 // How many times have we written since our last buffer reset?
314 uint32_t numWritesSinceReset_;
315
Mark Sleee02385b2007-06-09 01:21:16 +0000316 // Task handle
317 int taskHandle_;
318
319 // Task event
320 struct event taskEvent_;
321
Mark Slee2f6404d2006-10-10 01:37:40 +0000322 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000323 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000324
325 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000326 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000327
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000328 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000329 boost::shared_ptr<TTransport> factoryInputTransport_;
330 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000331
332 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000333 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000334
335 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000336 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000337
Mark Slee2f6404d2006-10-10 01:37:40 +0000338 // Go into read mode
339 void setRead() {
340 setFlags(EV_READ | EV_PERSIST);
341 }
342
343 // Go into write mode
344 void setWrite() {
345 setFlags(EV_WRITE | EV_PERSIST);
346 }
347
Mark Slee402ee282007-08-23 01:43:20 +0000348 // Set socket idle
349 void setIdle() {
350 setFlags(0);
351 }
352
Mark Slee2f6404d2006-10-10 01:37:40 +0000353 // Set event flags
354 void setFlags(short eventFlags);
355
356 // Libevent handlers
357 void workSocket();
358
359 // Close this client and reset
360 void close();
361
362 public:
363
364 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000365 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000366 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000368 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000369 }
370 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000371
Kevin Clark5ace1782009-03-04 21:10:58 +0000372 numReadsSinceReset_ = 0;
373 numWritesSinceReset_ = 0;
374
Mark Slee2f6404d2006-10-10 01:37:40 +0000375 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000376 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000377 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000378 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
379 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000380
Mark Slee2f6404d2006-10-10 01:37:40 +0000381 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000382 server_->incrementNumConnections();
383 }
384
385 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000386 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 }
388
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000389 /**
390 * Check read buffer against a given limit and shrink it if exceeded.
391 *
392 * @param limit we limit buffer size to.
393 */
394 void checkIdleBufferMemLimit(uint32_t limit);
395
Mark Slee2f6404d2006-10-10 01:37:40 +0000396 // Initialize
397 void init(int socket, short eventFlags, TNonblockingServer *s);
398
399 // Transition into a new state
400 void transition();
401
402 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000403 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000404 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 ((TConnection*)v)->workSocket();
406 }
Mark Slee79b16942007-11-26 19:05:29 +0000407
Mark Sleee02385b2007-06-09 01:21:16 +0000408 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000409 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000410 assert(fd == ((TConnection*)v)->taskHandle_);
411 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +0000412 GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000413 }
414 ((TConnection*)v)->transition();
415 }
416
Mark Slee2f6404d2006-10-10 01:37:40 +0000417};
418
T Jake Lucianib5e62212009-01-31 22:36:20 +0000419}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000420
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_