// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/

#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TBufferTransports.h>
#include <concurrency/ThreadManager.h>
#include <stack>
#include <string>
#include <errno.h>
#include <cassert>
#include <cstdlib>
#include <event.h>

namespace facebook { namespace thrift { namespace server {

using facebook::thrift::transport::TMemoryBuffer;
using facebook::thrift::protocol::TProtocol;
using facebook::thrift::concurrency::Runnable;
using facebook::thrift::concurrency::ThreadManager;

27// Forward declaration of class
28class TConnection;
29
30/**
31 * This is a non-blocking server in C++ for high performance that operates a
32 * single IO thread. It assumes that all incoming requests are framed with a
33 * 4 byte length indicator and writes out responses using the same framing.
34 *
35 * It does not use the TServerTransport framework, but rather has socket
36 * operations hardcoded for use with select.
37 *
38 * @author Mark Slee <mcslee@facebook.com>
39 */
40class TNonblockingServer : public TServer {
41 private:
42
43 // Listen backlog
44 static const int LISTEN_BACKLOG = 1024;
45
46 // Server socket file descriptor
47 int serverSocket_;
48
49 // Port server runs on
50 int port_;
51
Mark Slee92f00fb2006-10-25 01:28:17 +000052 // Whether to frame responses
53 bool frameResponses_;
54
Mark Sleee02385b2007-06-09 01:21:16 +000055 // For processing via thread pool, may be NULL
56 boost::shared_ptr<ThreadManager> threadManager_;
57
58 // Is thread pool processing?
59 bool threadPoolProcessing_;
60
Mark Slee79b16942007-11-26 19:05:29 +000061 // The event base for libevent
62 event_base* eventBase_;
63
64 // Event struct, for use with eventBase_
65 struct event serverEvent_;
66
David Reiss1997f102008-04-29 00:29:41 +000067 // Number of TConnection object we've created
68 size_t numTConnections_;
69
Mark Slee2f6404d2006-10-10 01:37:40 +000070 /**
71 * This is a stack of all the objects that have been created but that
72 * are NOT currently in use. When we close a connection, we place it on this
73 * stack so that the object can be reused later, rather than freeing the
74 * memory and reallocating a new object later.
75 */
76 std::stack<TConnection*> connectionStack_;
77
78 void handleEvent(int fd, short which);
79
80 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000081 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000082 int port) :
83 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000084 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000085 port_(port),
Mark Slee8eceaea2007-06-15 01:43:21 +000086 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +000087 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +000088 eventBase_(NULL),
89 numTConnections_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +000090
Mark Slee79b16942007-11-26 19:05:29 +000091 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +000092 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000093 int port,
94 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000095 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000096 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +000097 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +000098 frameResponses_(true),
Mark Slee79b16942007-11-26 19:05:29 +000099 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000100 eventBase_(NULL),
101 numTConnections_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000102 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
103 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000104 setInputProtocolFactory(protocolFactory);
105 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000106 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000107 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000108
Mark Slee5ea15f92007-03-05 22:55:59 +0000109 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
110 boost::shared_ptr<TTransportFactory> inputTransportFactory,
111 boost::shared_ptr<TTransportFactory> outputTransportFactory,
112 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
113 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000114 int port,
115 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000116 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000117 serverSocket_(0),
118 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +0000119 frameResponses_(true),
Mark Slee5d1784a2007-12-05 23:20:54 +0000120 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000121 eventBase_(NULL),
122 numTConnections_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000123 setInputTransportFactory(inputTransportFactory);
124 setOutputTransportFactory(outputTransportFactory);
125 setInputProtocolFactory(inputProtocolFactory);
126 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000127 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000128 }
Mark Slee79b16942007-11-26 19:05:29 +0000129
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 ~TNonblockingServer() {}
131
Mark Sleee02385b2007-06-09 01:21:16 +0000132 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
133 threadManager_ = threadManager;
134 threadPoolProcessing_ = (threadManager != NULL);
135 }
136
David Reiss1997f102008-04-29 00:29:41 +0000137 boost::shared_ptr<ThreadManager> getThreadManager() {
138 return threadManager_;
139 }
140
Mark Slee79b16942007-11-26 19:05:29 +0000141 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000142 return threadPoolProcessing_;
143 }
144
145 void addTask(boost::shared_ptr<Runnable> task) {
146 threadManager_->add(task);
147 }
148
Mark Slee92f00fb2006-10-25 01:28:17 +0000149 void setFrameResponses(bool frameResponses) {
150 frameResponses_ = frameResponses;
151 }
152
Mark Slee79b16942007-11-26 19:05:29 +0000153 bool getFrameResponses() const {
Mark Slee92f00fb2006-10-25 01:28:17 +0000154 return frameResponses_;
155 }
156
Mark Slee79b16942007-11-26 19:05:29 +0000157 event_base* getEventBase() const {
158 return eventBase_;
159 }
160
David Reissc17fe6b2008-04-29 00:29:43 +0000161 void incrementNumConnections() {
162 ++numTConnections_;
163 }
164
165 void decrementNumConnections() {
166 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000167 }
168
169 size_t getNumConnections() {
170 return numTConnections_;
171 }
172
173 size_t getNumIdleConnections() {
174 return connectionStack_.size();
175 }
176
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 TConnection* createConnection(int socket, short flags);
178
179 void returnConnection(TConnection* connection);
180
181 static void eventHandler(int fd, short which, void* v) {
182 ((TNonblockingServer*)v)->handleEvent(fd, which);
183 }
184
Mark Slee79b16942007-11-26 19:05:29 +0000185 void listenSocket();
186
187 void listenSocket(int fd);
188
189 void registerEvents(event_base* base);
190
Mark Slee2f6404d2006-10-10 01:37:40 +0000191 void serve();
192};
193
/**
 * Two states for sockets, recv and send mode
 */
enum TSocketState {
  SOCKET_RECV,
  SOCKET_SEND
};

/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a task to finish
 *  5) send back the frame size
 *  6) send back data (if any)
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_FRAME_SIZE,
  APP_SEND_RESULT
};

218/**
219 * Represents a connection that is handled via libevent. This connection
220 * essentially encapsulates a socket that has some associated libevent state.
221 */
222class TConnection {
223 private:
224
Mark Sleee02385b2007-06-09 01:21:16 +0000225 class Task;
226
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 // Server handle
228 TNonblockingServer* server_;
229
230 // Socket handle
231 int socket_;
232
233 // Libevent object
234 struct event event_;
235
236 // Libevent flags
237 short eventFlags_;
238
239 // Socket mode
240 TSocketState socketState_;
241
242 // Application state
243 TAppState appState_;
244
245 // How much data needed to read
246 uint32_t readWant_;
247
248 // Where in the read buffer are we
249 uint32_t readBufferPos_;
250
251 // Read buffer
252 uint8_t* readBuffer_;
253
254 // Read buffer size
255 uint32_t readBufferSize_;
256
257 // Write buffer
258 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000259
Mark Slee2f6404d2006-10-10 01:37:40 +0000260 // Write buffer size
261 uint32_t writeBufferSize_;
262
263 // How far through writing are we?
264 uint32_t writeBufferPos_;
265
Mark Slee92f00fb2006-10-25 01:28:17 +0000266 // Frame size
267 int32_t frameSize_;
268
Mark Sleee02385b2007-06-09 01:21:16 +0000269 // Task handle
270 int taskHandle_;
271
272 // Task event
273 struct event taskEvent_;
274
Mark Slee2f6404d2006-10-10 01:37:40 +0000275 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000276 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000277
278 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000279 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000280
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000281 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000282 boost::shared_ptr<TTransport> factoryInputTransport_;
283 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000284
285 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000286 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000287
288 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000289 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000290
Mark Slee2f6404d2006-10-10 01:37:40 +0000291 // Go into read mode
292 void setRead() {
293 setFlags(EV_READ | EV_PERSIST);
294 }
295
296 // Go into write mode
297 void setWrite() {
298 setFlags(EV_WRITE | EV_PERSIST);
299 }
300
Mark Slee402ee282007-08-23 01:43:20 +0000301 // Set socket idle
302 void setIdle() {
303 setFlags(0);
304 }
305
Mark Slee2f6404d2006-10-10 01:37:40 +0000306 // Set event flags
307 void setFlags(short eventFlags);
308
309 // Libevent handlers
310 void workSocket();
311
312 // Close this client and reset
313 void close();
314
315 public:
316
317 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000318 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000319 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000320 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000321 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000322 }
323 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000324
Mark Slee2f6404d2006-10-10 01:37:40 +0000325 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000326 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000327 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000328 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
329 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000330
Mark Slee2f6404d2006-10-10 01:37:40 +0000331 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000332 server_->incrementNumConnections();
333 }
334
335 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000336 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000337 }
338
339 // Initialize
340 void init(int socket, short eventFlags, TNonblockingServer *s);
341
342 // Transition into a new state
343 void transition();
344
345 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000346 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000347 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 ((TConnection*)v)->workSocket();
349 }
Mark Slee79b16942007-11-26 19:05:29 +0000350
Mark Sleee02385b2007-06-09 01:21:16 +0000351 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000352 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000353 assert(fd == ((TConnection*)v)->taskHandle_);
354 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
David Reiss9b209552008-04-08 06:26:05 +0000355 std::string errStr = "TConnection::taskHandler close handle failed, resource leak " + TOutput::strerror_s(errno);
356 GlobalOutput(errStr.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +0000357 }
358 ((TConnection*)v)->transition();
359 }
360
Mark Slee2f6404d2006-10-10 01:37:40 +0000361};
362
}}} // facebook::thrift::server

#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_