blob: e4f834682849f57ef2531d6d088ab116c8a6aa20 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000013#include <stack>
14#include <event.h>
15
Mark Slee2f6404d2006-10-10 01:37:40 +000016namespace facebook { namespace thrift { namespace server {
17
Mark Slee5ea15f92007-03-05 22:55:59 +000018using facebook::thrift::transport::TMemoryBuffer;
19using facebook::thrift::protocol::TProtocol;
Mark Slee2f6404d2006-10-10 01:37:40 +000020
21// Forward declaration of class
22class TConnection;
23
24/**
25 * This is a non-blocking server in C++ for high performance that operates a
26 * single IO thread. It assumes that all incoming requests are framed with a
27 * 4 byte length indicator and writes out responses using the same framing.
28 *
29 * It does not use the TServerTransport framework, but rather has socket
30 * operations hardcoded for use with select.
31 *
32 * @author Mark Slee <mcslee@facebook.com>
33 */
34class TNonblockingServer : public TServer {
35 private:
36
37 // Listen backlog
38 static const int LISTEN_BACKLOG = 1024;
39
40 // Server socket file descriptor
41 int serverSocket_;
42
43 // Port server runs on
44 int port_;
45
Mark Slee92f00fb2006-10-25 01:28:17 +000046 // Whether to frame responses
47 bool frameResponses_;
48
Mark Slee2f6404d2006-10-10 01:37:40 +000049 /**
50 * This is a stack of all the objects that have been created but that
51 * are NOT currently in use. When we close a connection, we place it on this
52 * stack so that the object can be reused later, rather than freeing the
53 * memory and reallocating a new object later.
54 */
55 std::stack<TConnection*> connectionStack_;
56
57 void handleEvent(int fd, short which);
58
59 public:
Mark Slee5ea15f92007-03-05 22:55:59 +000060 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +000061 int port) :
62 TServer(processor),
63 serverSocket_(0),
64 port_(port),
65 frameResponses_(true) {}
66
Mark Slee5ea15f92007-03-05 22:55:59 +000067 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
68 boost::shared_ptr<TProtocolFactory> protocolFactory,
Aditya Agarwal1ea90522007-01-19 02:02:12 +000069 int port) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000070 TServer(processor),
Mark Slee92f00fb2006-10-25 01:28:17 +000071 serverSocket_(0),
72 port_(port),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000073 frameResponses_(true) {
Mark Slee5ea15f92007-03-05 22:55:59 +000074 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
75 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000076 setInputProtocolFactory(protocolFactory);
77 setOutputProtocolFactory(protocolFactory);
78 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000079
Mark Slee5ea15f92007-03-05 22:55:59 +000080 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
81 boost::shared_ptr<TTransportFactory> inputTransportFactory,
82 boost::shared_ptr<TTransportFactory> outputTransportFactory,
83 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
84 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Aditya Agarwal1ea90522007-01-19 02:02:12 +000085 int port) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000086 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +000087 serverSocket_(0),
88 port_(port),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000089 frameResponses_(true) {
90 setInputTransportFactory(inputTransportFactory);
91 setOutputTransportFactory(outputTransportFactory);
92 setInputProtocolFactory(inputProtocolFactory);
93 setOutputProtocolFactory(outputProtocolFactory);
94 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000095
Mark Slee2f6404d2006-10-10 01:37:40 +000096 ~TNonblockingServer() {}
97
Mark Slee92f00fb2006-10-25 01:28:17 +000098 void setFrameResponses(bool frameResponses) {
99 frameResponses_ = frameResponses;
100 }
101
102 bool getFrameResponses() {
103 return frameResponses_;
104 }
105
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 TConnection* createConnection(int socket, short flags);
107
108 void returnConnection(TConnection* connection);
109
110 static void eventHandler(int fd, short which, void* v) {
111 ((TNonblockingServer*)v)->handleEvent(fd, which);
112 }
113
114 void serve();
115};
116
/**
 * Two states for sockets, recv and send mode
 */
enum TSocketState {
  SOCKET_RECV,  // socket is in receive mode
  SOCKET_SEND   // socket is in send mode
};
124
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send frame size (presumably only relevant when responses are framed --
 *     confirm against the state machine in the implementation file)
 *  5) send back data (if any)
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_SEND_FRAME_SIZE,
  APP_SEND_RESULT
};
139
140/**
141 * Represents a connection that is handled via libevent. This connection
142 * essentially encapsulates a socket that has some associated libevent state.
143 */
144class TConnection {
145 private:
146
147 // Server handle
148 TNonblockingServer* server_;
149
150 // Socket handle
151 int socket_;
152
153 // Libevent object
154 struct event event_;
155
156 // Libevent flags
157 short eventFlags_;
158
159 // Socket mode
160 TSocketState socketState_;
161
162 // Application state
163 TAppState appState_;
164
165 // How much data needed to read
166 uint32_t readWant_;
167
168 // Where in the read buffer are we
169 uint32_t readBufferPos_;
170
171 // Read buffer
172 uint8_t* readBuffer_;
173
174 // Read buffer size
175 uint32_t readBufferSize_;
176
177 // Write buffer
178 uint8_t* writeBuffer_;
179
180 // Write buffer size
181 uint32_t writeBufferSize_;
182
183 // How far through writing are we?
184 uint32_t writeBufferPos_;
185
Mark Slee92f00fb2006-10-25 01:28:17 +0000186 // Frame size
187 int32_t frameSize_;
188
Mark Slee2f6404d2006-10-10 01:37:40 +0000189 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000190 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000191
192 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000193 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000194
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000195 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000196 boost::shared_ptr<TTransport> factoryInputTransport_;
197 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000198
199 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000200 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000201
202 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000203 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000204
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 // Go into read mode
206 void setRead() {
207 setFlags(EV_READ | EV_PERSIST);
208 }
209
210 // Go into write mode
211 void setWrite() {
212 setFlags(EV_WRITE | EV_PERSIST);
213 }
214
215 // Set event flags
216 void setFlags(short eventFlags);
217
218 // Libevent handlers
219 void workSocket();
220
221 // Close this client and reset
222 void close();
223
224 public:
225
226 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000227 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000228 readBuffer_ = (uint8_t*)malloc(1024);
229 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000230 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000231 }
232 readBufferSize_ = 1024;
233
234 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000235 // these only need to be allocated once per TConnection (they don't need to be
236 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000237 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
238 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000239
Mark Slee2f6404d2006-10-10 01:37:40 +0000240 init(socket, eventFlags, s);
241 }
242
243 // Initialize
244 void init(int socket, short eventFlags, TNonblockingServer *s);
245
246 // Transition into a new state
247 void transition();
248
249 // Handler wrapper
250 static void eventHandler(int fd, short which, void* v) {
251 assert(fd = ((TConnection*)v)->socket_);
252 ((TConnection*)v)->workSocket();
253 }
254};
255
256}}} // facebook::thrift::server
257
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_