blob: df3103f70fee11efb5968a9b5854ce9b62c03c36 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
15#include <event.h>
16
Mark Slee79b16942007-11-26 19:05:29 +000017namespace facebook { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000018
Mark Slee5ea15f92007-03-05 22:55:59 +000019using facebook::thrift::transport::TMemoryBuffer;
20using facebook::thrift::protocol::TProtocol;
Mark Sleee02385b2007-06-09 01:21:16 +000021using facebook::thrift::concurrency::Runnable;
22using facebook::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000023
24// Forward declaration of class
25class TConnection;
26
27/**
28 * This is a non-blocking server in C++ for high performance that operates a
29 * single IO thread. It assumes that all incoming requests are framed with a
30 * 4 byte length indicator and writes out responses using the same framing.
31 *
32 * It does not use the TServerTransport framework, but rather has socket
33 * operations hardcoded for use with select.
34 *
35 * @author Mark Slee <mcslee@facebook.com>
36 */
37class TNonblockingServer : public TServer {
38 private:
39
40 // Listen backlog
41 static const int LISTEN_BACKLOG = 1024;
42
43 // Server socket file descriptor
44 int serverSocket_;
45
46 // Port server runs on
47 int port_;
48
Mark Slee92f00fb2006-10-25 01:28:17 +000049 // Whether to frame responses
50 bool frameResponses_;
51
Mark Sleee02385b2007-06-09 01:21:16 +000052 // For processing via thread pool, may be NULL
53 boost::shared_ptr<ThreadManager> threadManager_;
54
55 // Is thread pool processing?
56 bool threadPoolProcessing_;
57
Mark Slee79b16942007-11-26 19:05:29 +000058 // The event base for libevent
59 event_base* eventBase_;
60
61 // Event struct, for use with eventBase_
62 struct event serverEvent_;
63
Mark Slee2f6404d2006-10-10 01:37:40 +000064 /**
65 * This is a stack of all the objects that have been created but that
66 * are NOT currently in use. When we close a connection, we place it on this
67 * stack so that the object can be reused later, rather than freeing the
68 * memory and reallocating a new object later.
69 */
70 std::stack<TConnection*> connectionStack_;
71
72 void handleEvent(int fd, short which);
73
74 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000075 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000076 int port) :
77 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000078 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000079 port_(port),
Mark Slee8eceaea2007-06-15 01:43:21 +000080 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +000081 threadPoolProcessing_(false),
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000082 eventBase_(NULL) {}
Mark Sleef9373392007-01-24 19:41:57 +000083
Mark Slee79b16942007-11-26 19:05:29 +000084 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +000085 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000086 int port,
87 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000089 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +000090 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +000091 frameResponses_(true),
Mark Slee79b16942007-11-26 19:05:29 +000092 threadManager_(threadManager),
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000093 eventBase_(NULL) {
Mark Slee5ea15f92007-03-05 22:55:59 +000094 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
95 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000096 setInputProtocolFactory(protocolFactory);
97 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +000098 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000099 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000100
Mark Slee5ea15f92007-03-05 22:55:59 +0000101 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
102 boost::shared_ptr<TTransportFactory> inputTransportFactory,
103 boost::shared_ptr<TTransportFactory> outputTransportFactory,
104 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
105 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000106 int port,
107 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000108 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000109 serverSocket_(0),
110 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +0000111 frameResponses_(true),
Mark Slee5d1784a2007-12-05 23:20:54 +0000112 threadManager_(threadManager),
113 eventBase_(NULL) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000114 setInputTransportFactory(inputTransportFactory);
115 setOutputTransportFactory(outputTransportFactory);
116 setInputProtocolFactory(inputProtocolFactory);
117 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000118 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000119 }
Mark Slee79b16942007-11-26 19:05:29 +0000120
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 ~TNonblockingServer() {}
122
Mark Sleee02385b2007-06-09 01:21:16 +0000123 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
124 threadManager_ = threadManager;
125 threadPoolProcessing_ = (threadManager != NULL);
126 }
127
Mark Slee79b16942007-11-26 19:05:29 +0000128 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000129 return threadPoolProcessing_;
130 }
131
132 void addTask(boost::shared_ptr<Runnable> task) {
133 threadManager_->add(task);
134 }
135
Mark Slee92f00fb2006-10-25 01:28:17 +0000136 void setFrameResponses(bool frameResponses) {
137 frameResponses_ = frameResponses;
138 }
139
Mark Slee79b16942007-11-26 19:05:29 +0000140 bool getFrameResponses() const {
Mark Slee92f00fb2006-10-25 01:28:17 +0000141 return frameResponses_;
142 }
143
Mark Slee79b16942007-11-26 19:05:29 +0000144 event_base* getEventBase() const {
145 return eventBase_;
146 }
147
Mark Slee2f6404d2006-10-10 01:37:40 +0000148 TConnection* createConnection(int socket, short flags);
149
150 void returnConnection(TConnection* connection);
151
152 static void eventHandler(int fd, short which, void* v) {
153 ((TNonblockingServer*)v)->handleEvent(fd, which);
154 }
155
Mark Slee79b16942007-11-26 19:05:29 +0000156 void listenSocket();
157
158 void listenSocket(int fd);
159
160 void registerEvents(event_base* base);
161
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 void serve();
dweatherford58985992007-06-19 23:10:19 +0000163
Mark Slee2f6404d2006-10-10 01:37:40 +0000164};
165
166/**
167 * Two states for sockets, recv and send mode
168 */
169enum TSocketState {
170 SOCKET_RECV,
171 SOCKET_SEND
172};
173
174/**
175 * Four states for the nonblocking servr:
176 * 1) initialize
177 * 2) read 4 byte frame size
178 * 3) read frame of data
179 * 4) send back data (if any)
180 */
181enum TAppState {
182 APP_INIT,
183 APP_READ_FRAME_SIZE,
184 APP_READ_REQUEST,
Mark Sleee02385b2007-06-09 01:21:16 +0000185 APP_WAIT_TASK,
Mark Slee92f00fb2006-10-25 01:28:17 +0000186 APP_SEND_FRAME_SIZE,
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 APP_SEND_RESULT
188};
189
190/**
191 * Represents a connection that is handled via libevent. This connection
192 * essentially encapsulates a socket that has some associated libevent state.
193 */
194class TConnection {
195 private:
196
Mark Sleee02385b2007-06-09 01:21:16 +0000197 class Task;
198
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 // Server handle
200 TNonblockingServer* server_;
201
202 // Socket handle
203 int socket_;
204
205 // Libevent object
206 struct event event_;
207
208 // Libevent flags
209 short eventFlags_;
210
211 // Socket mode
212 TSocketState socketState_;
213
214 // Application state
215 TAppState appState_;
216
217 // How much data needed to read
218 uint32_t readWant_;
219
220 // Where in the read buffer are we
221 uint32_t readBufferPos_;
222
223 // Read buffer
224 uint8_t* readBuffer_;
225
226 // Read buffer size
227 uint32_t readBufferSize_;
228
229 // Write buffer
230 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000231
Mark Slee2f6404d2006-10-10 01:37:40 +0000232 // Write buffer size
233 uint32_t writeBufferSize_;
234
235 // How far through writing are we?
236 uint32_t writeBufferPos_;
237
Mark Slee92f00fb2006-10-25 01:28:17 +0000238 // Frame size
239 int32_t frameSize_;
240
Mark Sleee02385b2007-06-09 01:21:16 +0000241 // Task handle
242 int taskHandle_;
243
244 // Task event
245 struct event taskEvent_;
246
Mark Slee2f6404d2006-10-10 01:37:40 +0000247 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000248 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000249
250 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000251 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000252
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000253 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000254 boost::shared_ptr<TTransport> factoryInputTransport_;
255 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000256
257 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000258 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000259
260 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000261 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000262
Mark Slee2f6404d2006-10-10 01:37:40 +0000263 // Go into read mode
264 void setRead() {
265 setFlags(EV_READ | EV_PERSIST);
266 }
267
268 // Go into write mode
269 void setWrite() {
270 setFlags(EV_WRITE | EV_PERSIST);
271 }
272
Mark Slee402ee282007-08-23 01:43:20 +0000273 // Set socket idle
274 void setIdle() {
275 setFlags(0);
276 }
277
Mark Slee2f6404d2006-10-10 01:37:40 +0000278 // Set event flags
279 void setFlags(short eventFlags);
280
281 // Libevent handlers
282 void workSocket();
283
284 // Close this client and reset
285 void close();
286
287 public:
288
289 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000290 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000291 readBuffer_ = (uint8_t*)malloc(1024);
292 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000293 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000294 }
295 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000296
Mark Slee2f6404d2006-10-10 01:37:40 +0000297 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000298 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000299 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000300 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
301 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000302
Mark Slee2f6404d2006-10-10 01:37:40 +0000303 init(socket, eventFlags, s);
304 }
305
306 // Initialize
307 void init(int socket, short eventFlags, TNonblockingServer *s);
308
309 // Transition into a new state
310 void transition();
311
312 // Handler wrapper
313 static void eventHandler(int fd, short which, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000314 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000315 ((TConnection*)v)->workSocket();
316 }
Mark Slee79b16942007-11-26 19:05:29 +0000317
Mark Sleee02385b2007-06-09 01:21:16 +0000318 // Handler wrapper for task block
319 static void taskHandler(int fd, short which, void* v) {
320 assert(fd == ((TConnection*)v)->taskHandle_);
321 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
322 GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
323 }
324 ((TConnection*)v)->transition();
325 }
326
Mark Slee2f6404d2006-10-10 01:37:40 +0000327};
328
329}}} // facebook::thrift::server
330
331#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_