blob: 6da9bf5557c2c56b6fb83bf8b017d87d466d2af6 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
15#include <event.h>
16
Mark Slee2f6404d2006-10-10 01:37:40 +000017namespace facebook { namespace thrift { namespace server {
18
Mark Slee5ea15f92007-03-05 22:55:59 +000019using facebook::thrift::transport::TMemoryBuffer;
20using facebook::thrift::protocol::TProtocol;
Mark Sleee02385b2007-06-09 01:21:16 +000021using facebook::thrift::concurrency::Runnable;
22using facebook::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000023
24// Forward declaration of class
25class TConnection;
26
27/**
28 * This is a non-blocking server in C++ for high performance that operates a
29 * single IO thread. It assumes that all incoming requests are framed with a
30 * 4 byte length indicator and writes out responses using the same framing.
31 *
32 * It does not use the TServerTransport framework, but rather has socket
33 * operations hardcoded for use with select.
34 *
35 * @author Mark Slee <mcslee@facebook.com>
36 */
37class TNonblockingServer : public TServer {
38 private:
39
40 // Listen backlog
41 static const int LISTEN_BACKLOG = 1024;
42
43 // Server socket file descriptor
44 int serverSocket_;
45
46 // Port server runs on
47 int port_;
48
Mark Slee92f00fb2006-10-25 01:28:17 +000049 // Whether to frame responses
50 bool frameResponses_;
51
Mark Sleee02385b2007-06-09 01:21:16 +000052 // For processing via thread pool, may be NULL
53 boost::shared_ptr<ThreadManager> threadManager_;
54
55 // Is thread pool processing?
56 bool threadPoolProcessing_;
57
Mark Slee2f6404d2006-10-10 01:37:40 +000058 /**
59 * This is a stack of all the objects that have been created but that
60 * are NOT currently in use. When we close a connection, we place it on this
61 * stack so that the object can be reused later, rather than freeing the
62 * memory and reallocating a new object later.
63 */
64 std::stack<TConnection*> connectionStack_;
65
66 void handleEvent(int fd, short which);
67
68 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000069 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000070 int port) :
71 TServer(processor),
72 serverSocket_(0),
73 port_(port),
Mark Slee8eceaea2007-06-15 01:43:21 +000074 frameResponses_(true),
75 threadPoolProcessing_(false) {}
Mark Sleef9373392007-01-24 19:41:57 +000076
Mark Slee5ea15f92007-03-05 22:55:59 +000077 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
78 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000079 int port,
80 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000081 TServer(processor),
Mark Slee92f00fb2006-10-25 01:28:17 +000082 serverSocket_(0),
83 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +000084 frameResponses_(true),
85 threadManager_(threadManager) {
Mark Slee5ea15f92007-03-05 22:55:59 +000086 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
87 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 setInputProtocolFactory(protocolFactory);
89 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +000090 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000091 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000092
Mark Slee5ea15f92007-03-05 22:55:59 +000093 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
94 boost::shared_ptr<TTransportFactory> inputTransportFactory,
95 boost::shared_ptr<TTransportFactory> outputTransportFactory,
96 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
97 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000098 int port,
99 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000100 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000101 serverSocket_(0),
102 port_(port),
Mark Sleee02385b2007-06-09 01:21:16 +0000103 frameResponses_(true),
104 threadManager_(threadManager) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000105 setInputTransportFactory(inputTransportFactory);
106 setOutputTransportFactory(outputTransportFactory);
107 setInputProtocolFactory(inputProtocolFactory);
108 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000109 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000110 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112 ~TNonblockingServer() {}
113
Mark Sleee02385b2007-06-09 01:21:16 +0000114 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
115 threadManager_ = threadManager;
116 threadPoolProcessing_ = (threadManager != NULL);
117 }
118
119 bool isThreadPoolProcessing() {
120 return threadPoolProcessing_;
121 }
122
123 void addTask(boost::shared_ptr<Runnable> task) {
124 threadManager_->add(task);
125 }
126
Mark Slee92f00fb2006-10-25 01:28:17 +0000127 void setFrameResponses(bool frameResponses) {
128 frameResponses_ = frameResponses;
129 }
130
131 bool getFrameResponses() {
132 return frameResponses_;
133 }
134
Mark Slee2f6404d2006-10-10 01:37:40 +0000135 TConnection* createConnection(int socket, short flags);
136
137 void returnConnection(TConnection* connection);
138
139 static void eventHandler(int fd, short which, void* v) {
140 ((TNonblockingServer*)v)->handleEvent(fd, which);
141 }
142
143 void serve();
144};
145
146/**
147 * Two states for sockets, recv and send mode
148 */
149enum TSocketState {
150 SOCKET_RECV,
151 SOCKET_SEND
152};
153
154/**
155 * Four states for the nonblocking servr:
156 * 1) initialize
157 * 2) read 4 byte frame size
158 * 3) read frame of data
159 * 4) send back data (if any)
160 */
161enum TAppState {
162 APP_INIT,
163 APP_READ_FRAME_SIZE,
164 APP_READ_REQUEST,
Mark Sleee02385b2007-06-09 01:21:16 +0000165 APP_WAIT_TASK,
Mark Slee92f00fb2006-10-25 01:28:17 +0000166 APP_SEND_FRAME_SIZE,
Mark Slee2f6404d2006-10-10 01:37:40 +0000167 APP_SEND_RESULT
168};
169
170/**
171 * Represents a connection that is handled via libevent. This connection
172 * essentially encapsulates a socket that has some associated libevent state.
173 */
174class TConnection {
175 private:
176
Mark Sleee02385b2007-06-09 01:21:16 +0000177 class Task;
178
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 // Server handle
180 TNonblockingServer* server_;
181
182 // Socket handle
183 int socket_;
184
185 // Libevent object
186 struct event event_;
187
188 // Libevent flags
189 short eventFlags_;
190
191 // Socket mode
192 TSocketState socketState_;
193
194 // Application state
195 TAppState appState_;
196
197 // How much data needed to read
198 uint32_t readWant_;
199
200 // Where in the read buffer are we
201 uint32_t readBufferPos_;
202
203 // Read buffer
204 uint8_t* readBuffer_;
205
206 // Read buffer size
207 uint32_t readBufferSize_;
208
209 // Write buffer
210 uint8_t* writeBuffer_;
211
212 // Write buffer size
213 uint32_t writeBufferSize_;
214
215 // How far through writing are we?
216 uint32_t writeBufferPos_;
217
Mark Slee92f00fb2006-10-25 01:28:17 +0000218 // Frame size
219 int32_t frameSize_;
220
Mark Sleee02385b2007-06-09 01:21:16 +0000221 // Task handle
222 int taskHandle_;
223
224 // Task event
225 struct event taskEvent_;
226
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000228 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000229
230 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000231 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000232
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000233 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000234 boost::shared_ptr<TTransport> factoryInputTransport_;
235 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000236
237 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000238 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000239
240 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000241 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000242
Mark Slee2f6404d2006-10-10 01:37:40 +0000243 // Go into read mode
244 void setRead() {
245 setFlags(EV_READ | EV_PERSIST);
246 }
247
248 // Go into write mode
249 void setWrite() {
250 setFlags(EV_WRITE | EV_PERSIST);
251 }
252
253 // Set event flags
254 void setFlags(short eventFlags);
255
256 // Libevent handlers
257 void workSocket();
258
259 // Close this client and reset
260 void close();
261
262 public:
263
264 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000265 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000266 readBuffer_ = (uint8_t*)malloc(1024);
267 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000268 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000269 }
270 readBufferSize_ = 1024;
271
272 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000273 // these only need to be allocated once per TConnection (they don't need to be
274 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000275 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
276 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000277
Mark Slee2f6404d2006-10-10 01:37:40 +0000278 init(socket, eventFlags, s);
279 }
280
281 // Initialize
282 void init(int socket, short eventFlags, TNonblockingServer *s);
283
284 // Transition into a new state
285 void transition();
286
287 // Handler wrapper
288 static void eventHandler(int fd, short which, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000289 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000290 ((TConnection*)v)->workSocket();
291 }
Mark Sleee02385b2007-06-09 01:21:16 +0000292
293 // Handler wrapper for task block
294 static void taskHandler(int fd, short which, void* v) {
295 assert(fd == ((TConnection*)v)->taskHandle_);
296 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
297 GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
298 }
299 ((TConnection*)v)->transition();
300 }
301
Mark Slee2f6404d2006-10-10 01:37:40 +0000302};
303
304}}} // facebook::thrift::server
305
306#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_