blob: 4ea3fa315f86b59378462b435a01e675f1963c44 [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
2#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
3
Mark Slee4af6ed72006-10-25 19:02:49 +00004#include <Thrift.h>
5#include <server/TServer.h>
6#include <transport/TTransportUtils.h>
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include <stack>
8#include <event.h>
9
Mark Slee2f6404d2006-10-10 01:37:40 +000010namespace facebook { namespace thrift { namespace server {
11
12using boost::shared_ptr;
13
14// Forward declaration of class
15class TConnection;
16
17/**
18 * This is a non-blocking server in C++ for high performance that operates a
19 * single IO thread. It assumes that all incoming requests are framed with a
20 * 4 byte length indicator and writes out responses using the same framing.
21 *
22 * It does not use the TServerTransport framework, but rather has socket
23 * operations hardcoded for use with select.
24 *
25 * @author Mark Slee <mcslee@facebook.com>
26 */
27class TNonblockingServer : public TServer {
28 private:
29
30 // Listen backlog
31 static const int LISTEN_BACKLOG = 1024;
32
33 // Server socket file descriptor
34 int serverSocket_;
35
36 // Port server runs on
37 int port_;
38
Mark Slee92f00fb2006-10-25 01:28:17 +000039 // Whether to frame responses
40 bool frameResponses_;
41
Mark Slee2f6404d2006-10-10 01:37:40 +000042 /**
43 * This is a stack of all the objects that have been created but that
44 * are NOT currently in use. When we close a connection, we place it on this
45 * stack so that the object can be reused later, rather than freeing the
46 * memory and reallocating a new object later.
47 */
48 std::stack<TConnection*> connectionStack_;
49
50 void handleEvent(int fd, short which);
51
52 public:
Mark Sleef9373392007-01-24 19:41:57 +000053 TNonblockingServer(shared_ptr<TProcessor> processor,
54 int port) :
55 TServer(processor),
56 serverSocket_(0),
57 port_(port),
58 frameResponses_(true) {}
59
Aditya Agarwal1ea90522007-01-19 02:02:12 +000060 TNonblockingServer(shared_ptr<TProcessor> processor,
61 shared_ptr<TProtocolFactory> protocolFactory,
62 int port) :
63 TServer(processor, protocolFactory),
Mark Slee92f00fb2006-10-25 01:28:17 +000064 serverSocket_(0),
65 port_(port),
66 frameResponses_(true) {}
Aditya Agarwal1ea90522007-01-19 02:02:12 +000067
68 TNonblockingServer(shared_ptr<TProcessor> processor,
69 shared_ptr<TProtocolFactory> protocolFactory,
70 shared_ptr<TTransportFactory> transportFactory,
71 int port) :
72 TServer(processor, protocolFactory, transportFactory),
73 serverSocket_(0),
74 port_(port),
75 frameResponses_(true) {}
76
Mark Slee2f6404d2006-10-10 01:37:40 +000077 ~TNonblockingServer() {}
78
Mark Slee92f00fb2006-10-25 01:28:17 +000079 void setFrameResponses(bool frameResponses) {
80 frameResponses_ = frameResponses;
81 }
82
83 bool getFrameResponses() {
84 return frameResponses_;
85 }
86
Mark Slee2f6404d2006-10-10 01:37:40 +000087 TConnection* createConnection(int socket, short flags);
88
89 void returnConnection(TConnection* connection);
90
91 static void eventHandler(int fd, short which, void* v) {
92 ((TNonblockingServer*)v)->handleEvent(fd, which);
93 }
94
95 void serve();
96};
97
98/**
99 * Two states for sockets, recv and send mode
100 */
enum TSocketState {
  SOCKET_RECV = 0,  // socket is in receive mode
  SOCKET_SEND = 1   // socket is in send mode
};
105
106/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back frame size
 *  5) send back data (if any)
112 */
enum TAppState {
  APP_INIT            = 0,  // initialize
  APP_READ_FRAME_SIZE = 1,  // read the 4 byte frame size
  APP_READ_REQUEST    = 2,  // read the frame of data
  APP_SEND_FRAME_SIZE = 3,  // send the frame size
  APP_SEND_RESULT     = 4   // send back data (if any)
};
120
121/**
122 * Represents a connection that is handled via libevent. This connection
123 * essentially encapsulates a socket that has some associated libevent state.
124 */
125class TConnection {
126 private:
127
128 // Server handle
129 TNonblockingServer* server_;
130
131 // Socket handle
132 int socket_;
133
134 // Libevent object
135 struct event event_;
136
137 // Libevent flags
138 short eventFlags_;
139
140 // Socket mode
141 TSocketState socketState_;
142
143 // Application state
144 TAppState appState_;
145
146 // How much data needed to read
147 uint32_t readWant_;
148
149 // Where in the read buffer are we
150 uint32_t readBufferPos_;
151
152 // Read buffer
153 uint8_t* readBuffer_;
154
155 // Read buffer size
156 uint32_t readBufferSize_;
157
158 // Write buffer
159 uint8_t* writeBuffer_;
160
161 // Write buffer size
162 uint32_t writeBufferSize_;
163
164 // How far through writing are we?
165 uint32_t writeBufferPos_;
166
Mark Slee92f00fb2006-10-25 01:28:17 +0000167 // Frame size
168 int32_t frameSize_;
169
Mark Slee2f6404d2006-10-10 01:37:40 +0000170 // Transport to read from
171 shared_ptr<TMemoryBuffer> inputTransport_;
172
173 // Transport that processor writes to
174 shared_ptr<TMemoryBuffer> outputTransport_;
175
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000176 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
177 shared_ptr<TTransport> factoryInputTransport_;
178 // shared_ptr<TTransport> factoryOutputTransport_;
179
Mark Slee4af6ed72006-10-25 19:02:49 +0000180 // Protocol encoder
181 shared_ptr<TProtocol> outputProtocol_;
182
183 // Protocol decoder
184 shared_ptr<TProtocol> inputProtocol_;
185
Mark Slee2f6404d2006-10-10 01:37:40 +0000186 // Go into read mode
187 void setRead() {
188 setFlags(EV_READ | EV_PERSIST);
189 }
190
191 // Go into write mode
192 void setWrite() {
193 setFlags(EV_WRITE | EV_PERSIST);
194 }
195
196 // Set event flags
197 void setFlags(short eventFlags);
198
199 // Libevent handlers
200 void workSocket();
201
202 // Close this client and reset
203 void close();
204
205 public:
206
207 // Constructor
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000208 TConnection(int socket, short eventFlags, TNonblockingServer *s,
209 shared_ptr<TTransportFactory> transportFactory) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 readBuffer_ = (uint8_t*)malloc(1024);
211 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000212 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 }
214 readBufferSize_ = 1024;
215
216 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000217 // these only need to be allocated once per TConnection (they don't need to be
218 // reallocated on init() call)
Mark Slee2f6404d2006-10-10 01:37:40 +0000219 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
220 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000221
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 init(socket, eventFlags, s);
223 }
224
225 // Initialize
226 void init(int socket, short eventFlags, TNonblockingServer *s);
227
228 // Transition into a new state
229 void transition();
230
231 // Handler wrapper
232 static void eventHandler(int fd, short which, void* v) {
233 assert(fd = ((TConnection*)v)->socket_);
234 ((TConnection*)v)->workSocket();
235 }
236};
237
238}}} // facebook::thrift::server
239
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_