blob: 2cdd897df17a177a92371bb2ed7f67165d722603 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
15#include <event.h>
16
Mark Slee79b16942007-11-26 19:05:29 +000017namespace facebook { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000018
Mark Slee5ea15f92007-03-05 22:55:59 +000019using facebook::thrift::transport::TMemoryBuffer;
20using facebook::thrift::protocol::TProtocol;
Mark Sleee02385b2007-06-09 01:21:16 +000021using facebook::thrift::concurrency::Runnable;
22using facebook::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000023
24// Forward declaration of class
25class TConnection;
26
27/**
28 * This is a non-blocking server in C++ for high performance that operates a
29 * single IO thread. It assumes that all incoming requests are framed with a
30 * 4 byte length indicator and writes out responses using the same framing.
31 *
32 * It does not use the TServerTransport framework, but rather has socket
33 * operations hardcoded for use with select.
34 *
35 * @author Mark Slee <mcslee@facebook.com>
36 */
37class TNonblockingServer : public TServer {
38 private:
39
40 // Listen backlog
41 static const int LISTEN_BACKLOG = 1024;
42
43 // Server socket file descriptor
44 int serverSocket_;
45
46 // Port server runs on
47 int port_;
48
Mark Slee92f00fb2006-10-25 01:28:17 +000049 // Whether to frame responses
50 bool frameResponses_;
51
Mark Sleee02385b2007-06-09 01:21:16 +000052 // For processing via thread pool, may be NULL
53 boost::shared_ptr<ThreadManager> threadManager_;
54
55 // Is thread pool processing?
56 bool threadPoolProcessing_;
57
Mark Slee79b16942007-11-26 19:05:29 +000058 // The event base for libevent
59 event_base* eventBase_;
60
61 // Event struct, for use with eventBase_
62 struct event serverEvent_;
63
Mark Slee2f6404d2006-10-10 01:37:40 +000064 /**
65 * This is a stack of all the objects that have been created but that
66 * are NOT currently in use. When we close a connection, we place it on this
67 * stack so that the object can be reused later, rather than freeing the
68 * memory and reallocating a new object later.
69 */
70 std::stack<TConnection*> connectionStack_;
71
dweatherford58985992007-06-19 23:10:19 +000072 // Pointer to optional function called after opening the listen socket and
Mark Slee79b16942007-11-26 19:05:29 +000073 // before running the event loop, along with its argument data.
dweatherford58985992007-06-19 23:10:19 +000074 void (*preServeCallback_)(void*);
75 void* preServeCallbackArg_;
76
Mark Slee2f6404d2006-10-10 01:37:40 +000077 void handleEvent(int fd, short which);
78
79 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000080 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000081 int port) :
82 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000083 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000084 port_(port),
Mark Slee8eceaea2007-06-15 01:43:21 +000085 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +000086 threadPoolProcessing_(false),
Mark Slee79b16942007-11-26 19:05:29 +000087 eventBase_(NULL),
dweatherford58985992007-06-19 23:10:19 +000088 preServeCallback_(NULL),
89 preServeCallbackArg_(NULL) {}
Mark Sleef9373392007-01-24 19:41:57 +000090
Mark Slee79b16942007-11-26 19:05:29 +000091 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +000092 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000093 int port,
94 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000095 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000096 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +000097 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +000098 frameResponses_(true),
Mark Slee79b16942007-11-26 19:05:29 +000099 threadManager_(threadManager),
100 eventBase_(NULL),
dweatherford58985992007-06-19 23:10:19 +0000101 preServeCallback_(NULL),
102 preServeCallbackArg_(NULL) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000103 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
104 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000105 setInputProtocolFactory(protocolFactory);
106 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000107 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000108 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000109
Mark Slee5ea15f92007-03-05 22:55:59 +0000110 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
111 boost::shared_ptr<TTransportFactory> inputTransportFactory,
112 boost::shared_ptr<TTransportFactory> outputTransportFactory,
113 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
114 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000115 int port,
116 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000117 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000118 serverSocket_(0),
119 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +0000120 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +0000121 threadManager_(threadManager),
122 preServeCallback_(NULL),
123 preServeCallbackArg_(NULL) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000124 setInputTransportFactory(inputTransportFactory);
125 setOutputTransportFactory(outputTransportFactory);
126 setInputProtocolFactory(inputProtocolFactory);
127 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000128 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000129 }
Mark Slee79b16942007-11-26 19:05:29 +0000130
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 ~TNonblockingServer() {}
132
Mark Sleee02385b2007-06-09 01:21:16 +0000133 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
134 threadManager_ = threadManager;
135 threadPoolProcessing_ = (threadManager != NULL);
136 }
137
Mark Slee79b16942007-11-26 19:05:29 +0000138 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000139 return threadPoolProcessing_;
140 }
141
142 void addTask(boost::shared_ptr<Runnable> task) {
143 threadManager_->add(task);
144 }
145
Mark Slee92f00fb2006-10-25 01:28:17 +0000146 void setFrameResponses(bool frameResponses) {
147 frameResponses_ = frameResponses;
148 }
149
Mark Slee79b16942007-11-26 19:05:29 +0000150 bool getFrameResponses() const {
Mark Slee92f00fb2006-10-25 01:28:17 +0000151 return frameResponses_;
152 }
153
Mark Slee79b16942007-11-26 19:05:29 +0000154 event_base* getEventBase() const {
155 return eventBase_;
156 }
157
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 TConnection* createConnection(int socket, short flags);
159
160 void returnConnection(TConnection* connection);
161
162 static void eventHandler(int fd, short which, void* v) {
163 ((TNonblockingServer*)v)->handleEvent(fd, which);
164 }
165
Mark Slee79b16942007-11-26 19:05:29 +0000166 void listenSocket();
167
168 void listenSocket(int fd);
169
170 void registerEvents(event_base* base);
171
Mark Slee2f6404d2006-10-10 01:37:40 +0000172 void serve();
dweatherford58985992007-06-19 23:10:19 +0000173
174 void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) {
175 preServeCallback_ = fn_ptr;
176 preServeCallbackArg_ = arg;
177 }
178
Mark Slee2f6404d2006-10-10 01:37:40 +0000179};
180
/**
 * Socket mode of a connection: either receiving or sending.
 */
enum TSocketState {
  SOCKET_RECV = 0,  // recv mode
  SOCKET_SEND = 1   // send mode
};
188
/**
 * Application states for a connection in the nonblocking server:
 *  - APP_INIT:            initialize
 *  - APP_READ_FRAME_SIZE: read 4 byte frame size
 *  - APP_READ_REQUEST:    read frame of data
 *  - APP_WAIT_TASK:       presumably waiting on a thread pool task
 *  - APP_SEND_FRAME_SIZE: presumably sending the response frame size
 *  - APP_SEND_RESULT:     send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,
  APP_READ_FRAME_SIZE = 1,
  APP_READ_REQUEST    = 2,
  APP_WAIT_TASK       = 3,
  APP_SEND_FRAME_SIZE = 4,
  APP_SEND_RESULT     = 5
};
204
205/**
206 * Represents a connection that is handled via libevent. This connection
207 * essentially encapsulates a socket that has some associated libevent state.
208 */
209class TConnection {
210 private:
211
Mark Sleee02385b2007-06-09 01:21:16 +0000212 class Task;
213
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 // Server handle
215 TNonblockingServer* server_;
216
217 // Socket handle
218 int socket_;
219
220 // Libevent object
221 struct event event_;
222
223 // Libevent flags
224 short eventFlags_;
225
226 // Socket mode
227 TSocketState socketState_;
228
229 // Application state
230 TAppState appState_;
231
232 // How much data needed to read
233 uint32_t readWant_;
234
235 // Where in the read buffer are we
236 uint32_t readBufferPos_;
237
238 // Read buffer
239 uint8_t* readBuffer_;
240
241 // Read buffer size
242 uint32_t readBufferSize_;
243
244 // Write buffer
245 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000246
Mark Slee2f6404d2006-10-10 01:37:40 +0000247 // Write buffer size
248 uint32_t writeBufferSize_;
249
250 // How far through writing are we?
251 uint32_t writeBufferPos_;
252
Mark Slee92f00fb2006-10-25 01:28:17 +0000253 // Frame size
254 int32_t frameSize_;
255
Mark Sleee02385b2007-06-09 01:21:16 +0000256 // Task handle
257 int taskHandle_;
258
259 // Task event
260 struct event taskEvent_;
261
Mark Slee2f6404d2006-10-10 01:37:40 +0000262 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000263 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000264
265 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000266 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000267
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000268 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000269 boost::shared_ptr<TTransport> factoryInputTransport_;
270 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000271
272 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000273 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000274
275 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000276 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000277
Mark Slee2f6404d2006-10-10 01:37:40 +0000278 // Go into read mode
279 void setRead() {
280 setFlags(EV_READ | EV_PERSIST);
281 }
282
283 // Go into write mode
284 void setWrite() {
285 setFlags(EV_WRITE | EV_PERSIST);
286 }
287
Mark Slee402ee282007-08-23 01:43:20 +0000288 // Set socket idle
289 void setIdle() {
290 setFlags(0);
291 }
292
Mark Slee2f6404d2006-10-10 01:37:40 +0000293 // Set event flags
294 void setFlags(short eventFlags);
295
296 // Libevent handlers
297 void workSocket();
298
299 // Close this client and reset
300 void close();
301
302 public:
303
304 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000305 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000306 readBuffer_ = (uint8_t*)malloc(1024);
307 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000308 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000309 }
310 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000311
Mark Slee2f6404d2006-10-10 01:37:40 +0000312 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000313 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000314 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000315 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
316 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000317
Mark Slee2f6404d2006-10-10 01:37:40 +0000318 init(socket, eventFlags, s);
319 }
320
321 // Initialize
322 void init(int socket, short eventFlags, TNonblockingServer *s);
323
324 // Transition into a new state
325 void transition();
326
327 // Handler wrapper
328 static void eventHandler(int fd, short which, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000329 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000330 ((TConnection*)v)->workSocket();
331 }
Mark Slee79b16942007-11-26 19:05:29 +0000332
Mark Sleee02385b2007-06-09 01:21:16 +0000333 // Handler wrapper for task block
334 static void taskHandler(int fd, short which, void* v) {
335 assert(fd == ((TConnection*)v)->taskHandle_);
336 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
337 GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
338 }
339 ((TConnection*)v)->transition();
340 }
341
Mark Slee2f6404d2006-10-10 01:37:40 +0000342};
343
344}}} // facebook::thrift::server
345
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_