blob: cf024ed573c927f33640813089c7dc2f3f62c442 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000012#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000013#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000015#include <string>
16#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000017#include <cstdlib>
Mark Slee2f6404d2006-10-10 01:37:40 +000018#include <event.h>
19
T Jake Lucianib5e62212009-01-31 22:36:20 +000020namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000021
T Jake Lucianib5e62212009-01-31 22:36:20 +000022using apache::thrift::transport::TMemoryBuffer;
23using apache::thrift::protocol::TProtocol;
24using apache::thrift::concurrency::Runnable;
25using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000026
27// Forward declaration of class
28class TConnection;
29
30/**
31 * This is a non-blocking server in C++ for high performance that operates a
32 * single IO thread. It assumes that all incoming requests are framed with a
33 * 4 byte length indicator and writes out responses using the same framing.
34 *
35 * It does not use the TServerTransport framework, but rather has socket
36 * operations hardcoded for use with select.
37 *
38 * @author Mark Slee <mcslee@facebook.com>
39 */
40class TNonblockingServer : public TServer {
41 private:
42
43 // Listen backlog
44 static const int LISTEN_BACKLOG = 1024;
45
46 // Server socket file descriptor
47 int serverSocket_;
48
49 // Port server runs on
50 int port_;
51
Mark Sleee02385b2007-06-09 01:21:16 +000052 // For processing via thread pool, may be NULL
53 boost::shared_ptr<ThreadManager> threadManager_;
54
55 // Is thread pool processing?
56 bool threadPoolProcessing_;
57
Mark Slee79b16942007-11-26 19:05:29 +000058 // The event base for libevent
59 event_base* eventBase_;
60
61 // Event struct, for use with eventBase_
62 struct event serverEvent_;
63
David Reiss1997f102008-04-29 00:29:41 +000064 // Number of TConnection object we've created
65 size_t numTConnections_;
66
Mark Slee2f6404d2006-10-10 01:37:40 +000067 /**
68 * This is a stack of all the objects that have been created but that
69 * are NOT currently in use. When we close a connection, we place it on this
70 * stack so that the object can be reused later, rather than freeing the
71 * memory and reallocating a new object later.
72 */
73 std::stack<TConnection*> connectionStack_;
74
75 void handleEvent(int fd, short which);
76
77 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000078 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000079 int port) :
80 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000081 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +000082 port_(port),
dweatherford58985992007-06-19 23:10:19 +000083 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +000084 eventBase_(NULL),
85 numTConnections_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +000086
Mark Slee79b16942007-11-26 19:05:29 +000087 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +000088 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +000089 int port,
90 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000091 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +000092 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +000093 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +000094 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +000095 eventBase_(NULL),
96 numTConnections_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +000097 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
98 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000099 setInputProtocolFactory(protocolFactory);
100 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000101 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000102 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000103
Mark Slee5ea15f92007-03-05 22:55:59 +0000104 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
105 boost::shared_ptr<TTransportFactory> inputTransportFactory,
106 boost::shared_ptr<TTransportFactory> outputTransportFactory,
107 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
108 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000109 int port,
110 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000111 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000112 serverSocket_(0),
113 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000114 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000115 eventBase_(NULL),
116 numTConnections_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000117 setInputTransportFactory(inputTransportFactory);
118 setOutputTransportFactory(outputTransportFactory);
119 setInputProtocolFactory(inputProtocolFactory);
120 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000121 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000122 }
Mark Slee79b16942007-11-26 19:05:29 +0000123
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 ~TNonblockingServer() {}
125
Mark Sleee02385b2007-06-09 01:21:16 +0000126 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
127 threadManager_ = threadManager;
128 threadPoolProcessing_ = (threadManager != NULL);
129 }
130
David Reiss1997f102008-04-29 00:29:41 +0000131 boost::shared_ptr<ThreadManager> getThreadManager() {
132 return threadManager_;
133 }
134
Mark Slee79b16942007-11-26 19:05:29 +0000135 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000136 return threadPoolProcessing_;
137 }
138
139 void addTask(boost::shared_ptr<Runnable> task) {
140 threadManager_->add(task);
141 }
142
Mark Slee79b16942007-11-26 19:05:29 +0000143 event_base* getEventBase() const {
144 return eventBase_;
145 }
146
David Reissc17fe6b2008-04-29 00:29:43 +0000147 void incrementNumConnections() {
148 ++numTConnections_;
149 }
150
151 void decrementNumConnections() {
152 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000153 }
154
155 size_t getNumConnections() {
156 return numTConnections_;
157 }
158
159 size_t getNumIdleConnections() {
160 return connectionStack_.size();
161 }
162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 TConnection* createConnection(int socket, short flags);
164
165 void returnConnection(TConnection* connection);
166
167 static void eventHandler(int fd, short which, void* v) {
168 ((TNonblockingServer*)v)->handleEvent(fd, which);
169 }
170
Mark Slee79b16942007-11-26 19:05:29 +0000171 void listenSocket();
172
173 void listenSocket(int fd);
174
175 void registerEvents(event_base* base);
176
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 void serve();
178};
179
180/**
181 * Two states for sockets, recv and send mode
182 */
// Direction a connection's socket is currently being driven in.
enum TSocketState {
  SOCKET_RECV = 0,  // receive mode
  SOCKET_SEND = 1   // send mode
};
187
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a task (APP_WAIT_TASK)
 *  5) send back data (if any)
 */
// Application-level state of a connection, in declaration order.
enum TAppState {
  APP_INIT            = 0,  // initialize
  APP_READ_FRAME_SIZE = 1,  // read 4 byte frame size
  APP_READ_REQUEST    = 2,  // read frame of data
  APP_WAIT_TASK       = 3,  // waiting on a task
  APP_SEND_RESULT     = 4   // send back data (if any)
};
202
203/**
204 * Represents a connection that is handled via libevent. This connection
205 * essentially encapsulates a socket that has some associated libevent state.
206 */
207class TConnection {
208 private:
209
Mark Sleee02385b2007-06-09 01:21:16 +0000210 class Task;
211
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 // Server handle
213 TNonblockingServer* server_;
214
215 // Socket handle
216 int socket_;
217
218 // Libevent object
219 struct event event_;
220
221 // Libevent flags
222 short eventFlags_;
223
224 // Socket mode
225 TSocketState socketState_;
226
227 // Application state
228 TAppState appState_;
229
230 // How much data needed to read
231 uint32_t readWant_;
232
233 // Where in the read buffer are we
234 uint32_t readBufferPos_;
235
236 // Read buffer
237 uint8_t* readBuffer_;
238
239 // Read buffer size
240 uint32_t readBufferSize_;
241
242 // Write buffer
243 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000244
Mark Slee2f6404d2006-10-10 01:37:40 +0000245 // Write buffer size
246 uint32_t writeBufferSize_;
247
248 // How far through writing are we?
249 uint32_t writeBufferPos_;
250
Mark Sleee02385b2007-06-09 01:21:16 +0000251 // Task handle
252 int taskHandle_;
253
254 // Task event
255 struct event taskEvent_;
256
Mark Slee2f6404d2006-10-10 01:37:40 +0000257 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000258 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000259
260 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000261 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000262
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000263 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000264 boost::shared_ptr<TTransport> factoryInputTransport_;
265 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000266
267 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000268 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000269
270 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000271 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000272
Mark Slee2f6404d2006-10-10 01:37:40 +0000273 // Go into read mode
274 void setRead() {
275 setFlags(EV_READ | EV_PERSIST);
276 }
277
278 // Go into write mode
279 void setWrite() {
280 setFlags(EV_WRITE | EV_PERSIST);
281 }
282
Mark Slee402ee282007-08-23 01:43:20 +0000283 // Set socket idle
284 void setIdle() {
285 setFlags(0);
286 }
287
Mark Slee2f6404d2006-10-10 01:37:40 +0000288 // Set event flags
289 void setFlags(short eventFlags);
290
291 // Libevent handlers
292 void workSocket();
293
294 // Close this client and reset
295 void close();
296
297 public:
298
299 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000300 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000301 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000302 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000303 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000304 }
305 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000306
Mark Slee2f6404d2006-10-10 01:37:40 +0000307 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000308 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000309 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000310 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
311 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000312
Mark Slee2f6404d2006-10-10 01:37:40 +0000313 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000314 server_->incrementNumConnections();
315 }
316
317 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000318 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000319 }
320
321 // Initialize
322 void init(int socket, short eventFlags, TNonblockingServer *s);
323
324 // Transition into a new state
325 void transition();
326
327 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000328 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000329 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000330 ((TConnection*)v)->workSocket();
331 }
Mark Slee79b16942007-11-26 19:05:29 +0000332
Mark Sleee02385b2007-06-09 01:21:16 +0000333 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000334 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000335 assert(fd == ((TConnection*)v)->taskHandle_);
336 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +0000337 GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000338 }
339 ((TConnection*)v)->transition();
340 }
341
Mark Slee2f6404d2006-10-10 01:37:40 +0000342};
343
T Jake Lucianib5e62212009-01-31 22:36:20 +0000344}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000345
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_