blob: 08ecec6e8ef4206618d168846c53a96eed8c54c7 [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
2#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
3
Mark Slee4af6ed72006-10-25 19:02:49 +00004#include <Thrift.h>
5#include <server/TServer.h>
6#include <transport/TTransportUtils.h>
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include <stack>
8#include <event.h>
9
Mark Slee2f6404d2006-10-10 01:37:40 +000010namespace facebook { namespace thrift { namespace server {
11
12using boost::shared_ptr;
13
14// Forward declaration of class
15class TConnection;
16
17/**
18 * This is a non-blocking server in C++ for high performance that operates a
19 * single IO thread. It assumes that all incoming requests are framed with a
20 * 4 byte length indicator and writes out responses using the same framing.
21 *
22 * It does not use the TServerTransport framework, but rather has socket
23 * operations hardcoded for use with select.
24 *
25 * @author Mark Slee <mcslee@facebook.com>
26 */
27class TNonblockingServer : public TServer {
28 private:
29
30 // Listen backlog
31 static const int LISTEN_BACKLOG = 1024;
32
33 // Server socket file descriptor
34 int serverSocket_;
35
36 // Port server runs on
37 int port_;
38
Mark Slee92f00fb2006-10-25 01:28:17 +000039 // Whether to frame responses
40 bool frameResponses_;
41
Mark Slee2f6404d2006-10-10 01:37:40 +000042 /**
43 * This is a stack of all the objects that have been created but that
44 * are NOT currently in use. When we close a connection, we place it on this
45 * stack so that the object can be reused later, rather than freeing the
46 * memory and reallocating a new object later.
47 */
48 std::stack<TConnection*> connectionStack_;
49
50 void handleEvent(int fd, short which);
51
52 public:
Mark Sleef9373392007-01-24 19:41:57 +000053 TNonblockingServer(shared_ptr<TProcessor> processor,
54 int port) :
55 TServer(processor),
56 serverSocket_(0),
57 port_(port),
58 frameResponses_(true) {}
59
Aditya Agarwal1ea90522007-01-19 02:02:12 +000060 TNonblockingServer(shared_ptr<TProcessor> processor,
61 shared_ptr<TProtocolFactory> protocolFactory,
62 int port) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000063 TServer(processor),
Mark Slee92f00fb2006-10-25 01:28:17 +000064 serverSocket_(0),
65 port_(port),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000066 frameResponses_(true) {
67 setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
68 setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
69 setInputProtocolFactory(protocolFactory);
70 setOutputProtocolFactory(protocolFactory);
71 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000072
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000073 TNonblockingServer(shared_ptr<TProcessor> processor,
74 shared_ptr<TTransportFactory> inputTransportFactory,
75 shared_ptr<TTransportFactory> outputTransportFactory,
76 shared_ptr<TProtocolFactory> inputProtocolFactory,
77 shared_ptr<TProtocolFactory> outputProtocolFactory,
Aditya Agarwal1ea90522007-01-19 02:02:12 +000078 int port) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000079 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +000080 serverSocket_(0),
81 port_(port),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000082 frameResponses_(true) {
83 setInputTransportFactory(inputTransportFactory);
84 setOutputTransportFactory(outputTransportFactory);
85 setInputProtocolFactory(inputProtocolFactory);
86 setOutputProtocolFactory(outputProtocolFactory);
87 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000088
Mark Slee2f6404d2006-10-10 01:37:40 +000089 ~TNonblockingServer() {}
90
Mark Slee92f00fb2006-10-25 01:28:17 +000091 void setFrameResponses(bool frameResponses) {
92 frameResponses_ = frameResponses;
93 }
94
95 bool getFrameResponses() {
96 return frameResponses_;
97 }
98
Mark Slee2f6404d2006-10-10 01:37:40 +000099 TConnection* createConnection(int socket, short flags);
100
101 void returnConnection(TConnection* connection);
102
103 static void eventHandler(int fd, short which, void* v) {
104 ((TNonblockingServer*)v)->handleEvent(fd, which);
105 }
106
107 void serve();
108};
109
/**
 * Two states for sockets, recv and send mode
 */
enum TSocketState {
  // Socket is in receive mode
  SOCKET_RECV,
  // Socket is in send mode
  SOCKET_SEND
};
117
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back the frame size (presumably only when responses are framed --
 *     confirm against the implementation)
 *  5) send back data (if any)
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_SEND_FRAME_SIZE,
  APP_SEND_RESULT
};
132
133/**
134 * Represents a connection that is handled via libevent. This connection
135 * essentially encapsulates a socket that has some associated libevent state.
136 */
137class TConnection {
138 private:
139
140 // Server handle
141 TNonblockingServer* server_;
142
143 // Socket handle
144 int socket_;
145
146 // Libevent object
147 struct event event_;
148
149 // Libevent flags
150 short eventFlags_;
151
152 // Socket mode
153 TSocketState socketState_;
154
155 // Application state
156 TAppState appState_;
157
158 // How much data needed to read
159 uint32_t readWant_;
160
161 // Where in the read buffer are we
162 uint32_t readBufferPos_;
163
164 // Read buffer
165 uint8_t* readBuffer_;
166
167 // Read buffer size
168 uint32_t readBufferSize_;
169
170 // Write buffer
171 uint8_t* writeBuffer_;
172
173 // Write buffer size
174 uint32_t writeBufferSize_;
175
176 // How far through writing are we?
177 uint32_t writeBufferPos_;
178
Mark Slee92f00fb2006-10-25 01:28:17 +0000179 // Frame size
180 int32_t frameSize_;
181
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 // Transport to read from
183 shared_ptr<TMemoryBuffer> inputTransport_;
184
185 // Transport that processor writes to
186 shared_ptr<TMemoryBuffer> outputTransport_;
187
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000188 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
189 shared_ptr<TTransport> factoryInputTransport_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000190 shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000191
192 // Protocol decoder
193 shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000194
195 // Protocol encoder
196 shared_ptr<TProtocol> outputProtocol_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000197
Mark Slee2f6404d2006-10-10 01:37:40 +0000198 // Go into read mode
199 void setRead() {
200 setFlags(EV_READ | EV_PERSIST);
201 }
202
203 // Go into write mode
204 void setWrite() {
205 setFlags(EV_WRITE | EV_PERSIST);
206 }
207
208 // Set event flags
209 void setFlags(short eventFlags);
210
211 // Libevent handlers
212 void workSocket();
213
214 // Close this client and reset
215 void close();
216
217 public:
218
219 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000220 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 readBuffer_ = (uint8_t*)malloc(1024);
222 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000223 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 }
225 readBufferSize_ = 1024;
226
227 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000228 // these only need to be allocated once per TConnection (they don't need to be
229 // reallocated on init() call)
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
231 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000232
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 init(socket, eventFlags, s);
234 }
235
236 // Initialize
237 void init(int socket, short eventFlags, TNonblockingServer *s);
238
239 // Transition into a new state
240 void transition();
241
242 // Handler wrapper
243 static void eventHandler(int fd, short which, void* v) {
244 assert(fd = ((TConnection*)v)->socket_);
245 ((TConnection*)v)->workSocket();
246 }
247};
248
249}}} // facebook::thrift::server
250
251#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_