blob: 6997c45de1c5e71c31eaf26515010e67b399f84c [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
15#include <event.h>
16
Mark Slee2f6404d2006-10-10 01:37:40 +000017namespace facebook { namespace thrift { namespace server {
18
Mark Slee5ea15f92007-03-05 22:55:59 +000019using facebook::thrift::transport::TMemoryBuffer;
20using facebook::thrift::protocol::TProtocol;
Mark Sleee02385b2007-06-09 01:21:16 +000021using facebook::thrift::concurrency::Runnable;
22using facebook::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000023
24// Forward declaration of class
25class TConnection;
26
27/**
28 * This is a non-blocking server in C++ for high performance that operates a
29 * single IO thread. It assumes that all incoming requests are framed with a
30 * 4 byte length indicator and writes out responses using the same framing.
31 *
32 * It does not use the TServerTransport framework, but rather has socket
33 * operations hardcoded for use with select.
34 *
35 * @author Mark Slee <mcslee@facebook.com>
36 */
37class TNonblockingServer : public TServer {
38 private:
39
40 // Listen backlog
41 static const int LISTEN_BACKLOG = 1024;
42
43 // Server socket file descriptor
44 int serverSocket_;
45
46 // Port server runs on
47 int port_;
48
Mark Slee92f00fb2006-10-25 01:28:17 +000049 // Whether to frame responses
50 bool frameResponses_;
51
Mark Sleee02385b2007-06-09 01:21:16 +000052 // For processing via thread pool, may be NULL
53 boost::shared_ptr<ThreadManager> threadManager_;
54
55 // Is thread pool processing?
56 bool threadPoolProcessing_;
57
Mark Slee2f6404d2006-10-10 01:37:40 +000058 /**
59 * This is a stack of all the objects that have been created but that
60 * are NOT currently in use. When we close a connection, we place it on this
61 * stack so that the object can be reused later, rather than freeing the
62 * memory and reallocating a new object later.
63 */
64 std::stack<TConnection*> connectionStack_;
65
dweatherford58985992007-06-19 23:10:19 +000066 // Pointer to optional function called after opening the listen socket and
67 // before running the event loop, along with its argument data
68 void (*preServeCallback_)(void*);
69 void* preServeCallbackArg_;
70
Mark Slee2f6404d2006-10-10 01:37:40 +000071 void handleEvent(int fd, short which);
72
73 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000074 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000075 int port) :
76 TServer(processor),
77 serverSocket_(0),
78 port_(port),
Mark Slee8eceaea2007-06-15 01:43:21 +000079 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +000080 threadPoolProcessing_(false),
81 preServeCallback_(NULL),
82 preServeCallbackArg_(NULL) {}
Mark Sleef9373392007-01-24 19:41:57 +000083
Mark Slee5ea15f92007-03-05 22:55:59 +000084 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
85 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000086 int port,
87 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 TServer(processor),
Mark Slee92f00fb2006-10-25 01:28:17 +000089 serverSocket_(0),
90 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +000091 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +000092 threadManager_(threadManager),
93 preServeCallback_(NULL),
94 preServeCallbackArg_(NULL) {
Mark Slee5ea15f92007-03-05 22:55:59 +000095 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
96 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000097 setInputProtocolFactory(protocolFactory);
98 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +000099 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000100 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000101
Mark Slee5ea15f92007-03-05 22:55:59 +0000102 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
103 boost::shared_ptr<TTransportFactory> inputTransportFactory,
104 boost::shared_ptr<TTransportFactory> outputTransportFactory,
105 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
106 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000107 int port,
108 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000109 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000110 serverSocket_(0),
111 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +0000112 frameResponses_(true),
dweatherford58985992007-06-19 23:10:19 +0000113 threadManager_(threadManager),
114 preServeCallback_(NULL),
115 preServeCallbackArg_(NULL) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000116 setInputTransportFactory(inputTransportFactory);
117 setOutputTransportFactory(outputTransportFactory);
118 setInputProtocolFactory(inputProtocolFactory);
119 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000120 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000121 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000122
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 ~TNonblockingServer() {}
124
Mark Sleee02385b2007-06-09 01:21:16 +0000125 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
126 threadManager_ = threadManager;
127 threadPoolProcessing_ = (threadManager != NULL);
128 }
129
130 bool isThreadPoolProcessing() {
131 return threadPoolProcessing_;
132 }
133
134 void addTask(boost::shared_ptr<Runnable> task) {
135 threadManager_->add(task);
136 }
137
Mark Slee92f00fb2006-10-25 01:28:17 +0000138 void setFrameResponses(bool frameResponses) {
139 frameResponses_ = frameResponses;
140 }
141
142 bool getFrameResponses() {
143 return frameResponses_;
144 }
145
Mark Slee2f6404d2006-10-10 01:37:40 +0000146 TConnection* createConnection(int socket, short flags);
147
148 void returnConnection(TConnection* connection);
149
150 static void eventHandler(int fd, short which, void* v) {
151 ((TNonblockingServer*)v)->handleEvent(fd, which);
152 }
153
154 void serve();
dweatherford58985992007-06-19 23:10:19 +0000155
156 void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) {
157 preServeCallback_ = fn_ptr;
158 preServeCallbackArg_ = arg;
159 }
160
Mark Slee2f6404d2006-10-10 01:37:40 +0000161};
162
/**
 * Direction a connection's socket is currently working in: receiving or
 * sending.
 */
enum TSocketState {
  SOCKET_RECV = 0,  // recv mode
  SOCKET_SEND = 1   // send mode
};
170
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a task (presumably while the request is processed by the
 *     thread pool -- confirm against the state machine implementation)
 *  5) send frame size (presumably only when responses are framed)
 *  6) send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,
  APP_READ_FRAME_SIZE = 1,
  APP_READ_REQUEST    = 2,
  APP_WAIT_TASK       = 3,
  APP_SEND_FRAME_SIZE = 4,
  APP_SEND_RESULT     = 5
};
186
187/**
188 * Represents a connection that is handled via libevent. This connection
189 * essentially encapsulates a socket that has some associated libevent state.
190 */
191class TConnection {
192 private:
193
Mark Sleee02385b2007-06-09 01:21:16 +0000194 class Task;
195
Mark Slee2f6404d2006-10-10 01:37:40 +0000196 // Server handle
197 TNonblockingServer* server_;
198
199 // Socket handle
200 int socket_;
201
202 // Libevent object
203 struct event event_;
204
205 // Libevent flags
206 short eventFlags_;
207
208 // Socket mode
209 TSocketState socketState_;
210
211 // Application state
212 TAppState appState_;
213
214 // How much data needed to read
215 uint32_t readWant_;
216
217 // Where in the read buffer are we
218 uint32_t readBufferPos_;
219
220 // Read buffer
221 uint8_t* readBuffer_;
222
223 // Read buffer size
224 uint32_t readBufferSize_;
225
226 // Write buffer
227 uint8_t* writeBuffer_;
228
229 // Write buffer size
230 uint32_t writeBufferSize_;
231
232 // How far through writing are we?
233 uint32_t writeBufferPos_;
234
Mark Slee92f00fb2006-10-25 01:28:17 +0000235 // Frame size
236 int32_t frameSize_;
237
Mark Sleee02385b2007-06-09 01:21:16 +0000238 // Task handle
239 int taskHandle_;
240
241 // Task event
242 struct event taskEvent_;
243
Mark Slee2f6404d2006-10-10 01:37:40 +0000244 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000245 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000246
247 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000248 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000249
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000250 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000251 boost::shared_ptr<TTransport> factoryInputTransport_;
252 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000253
254 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000255 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000256
257 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000258 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000259
Mark Slee2f6404d2006-10-10 01:37:40 +0000260 // Go into read mode
261 void setRead() {
262 setFlags(EV_READ | EV_PERSIST);
263 }
264
265 // Go into write mode
266 void setWrite() {
267 setFlags(EV_WRITE | EV_PERSIST);
268 }
269
270 // Set event flags
271 void setFlags(short eventFlags);
272
273 // Libevent handlers
274 void workSocket();
275
276 // Close this client and reset
277 void close();
278
279 public:
280
281 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000282 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000283 readBuffer_ = (uint8_t*)malloc(1024);
284 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000285 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000286 }
287 readBufferSize_ = 1024;
288
289 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000290 // these only need to be allocated once per TConnection (they don't need to be
291 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000292 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
293 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000294
Mark Slee2f6404d2006-10-10 01:37:40 +0000295 init(socket, eventFlags, s);
296 }
297
298 // Initialize
299 void init(int socket, short eventFlags, TNonblockingServer *s);
300
301 // Transition into a new state
302 void transition();
303
304 // Handler wrapper
305 static void eventHandler(int fd, short which, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000306 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000307 ((TConnection*)v)->workSocket();
308 }
Mark Sleee02385b2007-06-09 01:21:16 +0000309
310 // Handler wrapper for task block
311 static void taskHandler(int fd, short which, void* v) {
312 assert(fd == ((TConnection*)v)->taskHandle_);
313 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
314 GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
315 }
316 ((TConnection*)v)->transition();
317 }
318
Mark Slee2f6404d2006-10-10 01:37:40 +0000319};
320
321}}} // facebook::thrift::server
322
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_