blob: 89c69a00189fd0915834bcec0045e426c98c2a7b [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
David Reissd7a16f42008-02-19 22:47:29 +000015#include <cstdlib>
Mark Slee2f6404d2006-10-10 01:37:40 +000016#include <event.h>
17
Mark Slee79b16942007-11-26 19:05:29 +000018namespace facebook { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000019
Mark Slee5ea15f92007-03-05 22:55:59 +000020using facebook::thrift::transport::TMemoryBuffer;
21using facebook::thrift::protocol::TProtocol;
Mark Sleee02385b2007-06-09 01:21:16 +000022using facebook::thrift::concurrency::Runnable;
23using facebook::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000024
25// Forward declaration of class
26class TConnection;
27
28/**
29 * This is a non-blocking server in C++ for high performance that operates a
30 * single IO thread. It assumes that all incoming requests are framed with a
31 * 4 byte length indicator and writes out responses using the same framing.
32 *
33 * It does not use the TServerTransport framework, but rather has socket
34 * operations hardcoded for use with select.
35 *
36 * @author Mark Slee <mcslee@facebook.com>
37 */
38class TNonblockingServer : public TServer {
39 private:
40
41 // Listen backlog
42 static const int LISTEN_BACKLOG = 1024;
43
44 // Server socket file descriptor
45 int serverSocket_;
46
47 // Port server runs on
48 int port_;
49
Mark Slee92f00fb2006-10-25 01:28:17 +000050 // Whether to frame responses
51 bool frameResponses_;
52
Mark Sleee02385b2007-06-09 01:21:16 +000053 // For processing via thread pool, may be NULL
54 boost::shared_ptr<ThreadManager> threadManager_;
55
56 // Is thread pool processing?
57 bool threadPoolProcessing_;
58
Mark Slee79b16942007-11-26 19:05:29 +000059 // The event base for libevent
60 event_base* eventBase_;
61
62 // Event struct, for use with eventBase_
63 struct event serverEvent_;
64
Mark Slee2f6404d2006-10-10 01:37:40 +000065 /**
66 * This is a stack of all the objects that have been created but that
67 * are NOT currently in use. When we close a connection, we place it on this
68 * stack so that the object can be reused later, rather than freeing the
69 * memory and reallocating a new object later.
70 */
71 std::stack<TConnection*> connectionStack_;
72
73 void handleEvent(int fd, short which);
74
75 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000076 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000077 int port) :
78 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000079 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000080 port_(port),
Mark Slee8eceaea2007-06-15 01:43:21 +000081 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +000082 threadPoolProcessing_(false),
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000083 eventBase_(NULL) {}
Mark Sleef9373392007-01-24 19:41:57 +000084
Mark Slee79b16942007-11-26 19:05:29 +000085 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +000086 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000087 int port,
88 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000089 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000090 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +000091 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +000092 frameResponses_(true),
Mark Slee79b16942007-11-26 19:05:29 +000093 threadManager_(threadManager),
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000094 eventBase_(NULL) {
Mark Slee5ea15f92007-03-05 22:55:59 +000095 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
96 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000097 setInputProtocolFactory(protocolFactory);
98 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +000099 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000100 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000101
Mark Slee5ea15f92007-03-05 22:55:59 +0000102 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
103 boost::shared_ptr<TTransportFactory> inputTransportFactory,
104 boost::shared_ptr<TTransportFactory> outputTransportFactory,
105 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
106 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000107 int port,
108 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000109 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000110 serverSocket_(0),
111 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +0000112 frameResponses_(true),
Mark Slee5d1784a2007-12-05 23:20:54 +0000113 threadManager_(threadManager),
114 eventBase_(NULL) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000115 setInputTransportFactory(inputTransportFactory);
116 setOutputTransportFactory(outputTransportFactory);
117 setInputProtocolFactory(inputProtocolFactory);
118 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000119 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000120 }
Mark Slee79b16942007-11-26 19:05:29 +0000121
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 ~TNonblockingServer() {}
123
Mark Sleee02385b2007-06-09 01:21:16 +0000124 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
125 threadManager_ = threadManager;
126 threadPoolProcessing_ = (threadManager != NULL);
127 }
128
Mark Slee79b16942007-11-26 19:05:29 +0000129 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000130 return threadPoolProcessing_;
131 }
132
133 void addTask(boost::shared_ptr<Runnable> task) {
134 threadManager_->add(task);
135 }
136
Mark Slee92f00fb2006-10-25 01:28:17 +0000137 void setFrameResponses(bool frameResponses) {
138 frameResponses_ = frameResponses;
139 }
140
Mark Slee79b16942007-11-26 19:05:29 +0000141 bool getFrameResponses() const {
Mark Slee92f00fb2006-10-25 01:28:17 +0000142 return frameResponses_;
143 }
144
Mark Slee79b16942007-11-26 19:05:29 +0000145 event_base* getEventBase() const {
146 return eventBase_;
147 }
148
Mark Slee2f6404d2006-10-10 01:37:40 +0000149 TConnection* createConnection(int socket, short flags);
150
151 void returnConnection(TConnection* connection);
152
153 static void eventHandler(int fd, short which, void* v) {
154 ((TNonblockingServer*)v)->handleEvent(fd, which);
155 }
156
Mark Slee79b16942007-11-26 19:05:29 +0000157 void listenSocket();
158
159 void listenSocket(int fd);
160
161 void registerEvents(event_base* base);
162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 void serve();
dweatherford58985992007-06-19 23:10:19 +0000164
Mark Slee2f6404d2006-10-10 01:37:40 +0000165};
166
167/**
168 * Two states for sockets, recv and send mode
169 */
170enum TSocketState {
171 SOCKET_RECV,
172 SOCKET_SEND
173};
174
175/**
176 * Four states for the nonblocking servr:
177 * 1) initialize
178 * 2) read 4 byte frame size
179 * 3) read frame of data
180 * 4) send back data (if any)
181 */
182enum TAppState {
183 APP_INIT,
184 APP_READ_FRAME_SIZE,
185 APP_READ_REQUEST,
Mark Sleee02385b2007-06-09 01:21:16 +0000186 APP_WAIT_TASK,
Mark Slee92f00fb2006-10-25 01:28:17 +0000187 APP_SEND_FRAME_SIZE,
Mark Slee2f6404d2006-10-10 01:37:40 +0000188 APP_SEND_RESULT
189};
190
191/**
192 * Represents a connection that is handled via libevent. This connection
193 * essentially encapsulates a socket that has some associated libevent state.
194 */
195class TConnection {
196 private:
197
Mark Sleee02385b2007-06-09 01:21:16 +0000198 class Task;
199
Mark Slee2f6404d2006-10-10 01:37:40 +0000200 // Server handle
201 TNonblockingServer* server_;
202
203 // Socket handle
204 int socket_;
205
206 // Libevent object
207 struct event event_;
208
209 // Libevent flags
210 short eventFlags_;
211
212 // Socket mode
213 TSocketState socketState_;
214
215 // Application state
216 TAppState appState_;
217
218 // How much data needed to read
219 uint32_t readWant_;
220
221 // Where in the read buffer are we
222 uint32_t readBufferPos_;
223
224 // Read buffer
225 uint8_t* readBuffer_;
226
227 // Read buffer size
228 uint32_t readBufferSize_;
229
230 // Write buffer
231 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000232
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 // Write buffer size
234 uint32_t writeBufferSize_;
235
236 // How far through writing are we?
237 uint32_t writeBufferPos_;
238
Mark Slee92f00fb2006-10-25 01:28:17 +0000239 // Frame size
240 int32_t frameSize_;
241
Mark Sleee02385b2007-06-09 01:21:16 +0000242 // Task handle
243 int taskHandle_;
244
245 // Task event
246 struct event taskEvent_;
247
Mark Slee2f6404d2006-10-10 01:37:40 +0000248 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000249 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000250
251 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000252 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000253
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000254 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000255 boost::shared_ptr<TTransport> factoryInputTransport_;
256 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000257
258 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000259 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000260
261 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000262 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000263
Mark Slee2f6404d2006-10-10 01:37:40 +0000264 // Go into read mode
265 void setRead() {
266 setFlags(EV_READ | EV_PERSIST);
267 }
268
269 // Go into write mode
270 void setWrite() {
271 setFlags(EV_WRITE | EV_PERSIST);
272 }
273
Mark Slee402ee282007-08-23 01:43:20 +0000274 // Set socket idle
275 void setIdle() {
276 setFlags(0);
277 }
278
Mark Slee2f6404d2006-10-10 01:37:40 +0000279 // Set event flags
280 void setFlags(short eventFlags);
281
282 // Libevent handlers
283 void workSocket();
284
285 // Close this client and reset
286 void close();
287
288 public:
289
290 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000291 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000292 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000293 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000294 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000295 }
296 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000297
Mark Slee2f6404d2006-10-10 01:37:40 +0000298 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000299 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000300 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000301 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
302 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000303
Mark Slee2f6404d2006-10-10 01:37:40 +0000304 init(socket, eventFlags, s);
305 }
306
307 // Initialize
308 void init(int socket, short eventFlags, TNonblockingServer *s);
309
310 // Transition into a new state
311 void transition();
312
313 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000314 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000315 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000316 ((TConnection*)v)->workSocket();
317 }
Mark Slee79b16942007-11-26 19:05:29 +0000318
Mark Sleee02385b2007-06-09 01:21:16 +0000319 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000320 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000321 assert(fd == ((TConnection*)v)->taskHandle_);
322 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
323 GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
324 }
325 ((TConnection*)v)->transition();
326 }
327
Mark Slee2f6404d2006-10-10 01:37:40 +0000328};
329
330}}} // facebook::thrift::server
331
332#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_