blob: abfd5fae0769bab87e6c8d57c1d191ce0b698f6d [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
8#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
9
Mark Slee4af6ed72006-10-25 19:02:49 +000010#include <Thrift.h>
11#include <server/TServer.h>
12#include <transport/TTransportUtils.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000013#include <stack>
14#include <event.h>
15
Mark Slee2f6404d2006-10-10 01:37:40 +000016namespace facebook { namespace thrift { namespace server {
17
18using boost::shared_ptr;
19
20// Forward declaration of class
21class TConnection;
22
23/**
24 * This is a non-blocking server in C++ for high performance that operates a
25 * single IO thread. It assumes that all incoming requests are framed with a
26 * 4 byte length indicator and writes out responses using the same framing.
27 *
28 * It does not use the TServerTransport framework, but rather has socket
29 * operations hardcoded for use with select.
30 *
31 * @author Mark Slee <mcslee@facebook.com>
32 */
33class TNonblockingServer : public TServer {
34 private:
35
36 // Listen backlog
37 static const int LISTEN_BACKLOG = 1024;
38
39 // Server socket file descriptor
40 int serverSocket_;
41
42 // Port server runs on
43 int port_;
44
Mark Slee92f00fb2006-10-25 01:28:17 +000045 // Whether to frame responses
46 bool frameResponses_;
47
Mark Slee2f6404d2006-10-10 01:37:40 +000048 /**
49 * This is a stack of all the objects that have been created but that
50 * are NOT currently in use. When we close a connection, we place it on this
51 * stack so that the object can be reused later, rather than freeing the
52 * memory and reallocating a new object later.
53 */
54 std::stack<TConnection*> connectionStack_;
55
56 void handleEvent(int fd, short which);
57
58 public:
Mark Sleef9373392007-01-24 19:41:57 +000059 TNonblockingServer(shared_ptr<TProcessor> processor,
60 int port) :
61 TServer(processor),
62 serverSocket_(0),
63 port_(port),
64 frameResponses_(true) {}
65
Aditya Agarwal1ea90522007-01-19 02:02:12 +000066 TNonblockingServer(shared_ptr<TProcessor> processor,
67 shared_ptr<TProtocolFactory> protocolFactory,
68 int port) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000069 TServer(processor),
Mark Slee92f00fb2006-10-25 01:28:17 +000070 serverSocket_(0),
71 port_(port),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000072 frameResponses_(true) {
73 setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
74 setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
75 setInputProtocolFactory(protocolFactory);
76 setOutputProtocolFactory(protocolFactory);
77 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000078
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000079 TNonblockingServer(shared_ptr<TProcessor> processor,
80 shared_ptr<TTransportFactory> inputTransportFactory,
81 shared_ptr<TTransportFactory> outputTransportFactory,
82 shared_ptr<TProtocolFactory> inputProtocolFactory,
83 shared_ptr<TProtocolFactory> outputProtocolFactory,
Aditya Agarwal1ea90522007-01-19 02:02:12 +000084 int port) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000085 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +000086 serverSocket_(0),
87 port_(port),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 frameResponses_(true) {
89 setInputTransportFactory(inputTransportFactory);
90 setOutputTransportFactory(outputTransportFactory);
91 setInputProtocolFactory(inputProtocolFactory);
92 setOutputProtocolFactory(outputProtocolFactory);
93 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +000094
Mark Slee2f6404d2006-10-10 01:37:40 +000095 ~TNonblockingServer() {}
96
Mark Slee92f00fb2006-10-25 01:28:17 +000097 void setFrameResponses(bool frameResponses) {
98 frameResponses_ = frameResponses;
99 }
100
101 bool getFrameResponses() {
102 return frameResponses_;
103 }
104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 TConnection* createConnection(int socket, short flags);
106
107 void returnConnection(TConnection* connection);
108
109 static void eventHandler(int fd, short which, void* v) {
110 ((TNonblockingServer*)v)->handleEvent(fd, which);
111 }
112
113 void serve();
114};
115
116/**
117 * Two states for sockets, recv and send mode
118 */
enum TSocketState {
  SOCKET_RECV = 0,  // socket is in recv mode
  SOCKET_SEND = 1   // socket is in send mode
};
123
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back the frame size of the response
 *  5) send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,  // initialize
  APP_READ_FRAME_SIZE = 1,  // read the 4 byte frame size
  APP_READ_REQUEST    = 2,  // read the frame of data
  APP_SEND_FRAME_SIZE = 3,  // send the frame size of the result
  APP_SEND_RESULT     = 4   // send back data (if any)
};
138
139/**
140 * Represents a connection that is handled via libevent. This connection
141 * essentially encapsulates a socket that has some associated libevent state.
142 */
143class TConnection {
144 private:
145
146 // Server handle
147 TNonblockingServer* server_;
148
149 // Socket handle
150 int socket_;
151
152 // Libevent object
153 struct event event_;
154
155 // Libevent flags
156 short eventFlags_;
157
158 // Socket mode
159 TSocketState socketState_;
160
161 // Application state
162 TAppState appState_;
163
164 // How much data needed to read
165 uint32_t readWant_;
166
167 // Where in the read buffer are we
168 uint32_t readBufferPos_;
169
170 // Read buffer
171 uint8_t* readBuffer_;
172
173 // Read buffer size
174 uint32_t readBufferSize_;
175
176 // Write buffer
177 uint8_t* writeBuffer_;
178
179 // Write buffer size
180 uint32_t writeBufferSize_;
181
182 // How far through writing are we?
183 uint32_t writeBufferPos_;
184
Mark Slee92f00fb2006-10-25 01:28:17 +0000185 // Frame size
186 int32_t frameSize_;
187
Mark Slee2f6404d2006-10-10 01:37:40 +0000188 // Transport to read from
189 shared_ptr<TMemoryBuffer> inputTransport_;
190
191 // Transport that processor writes to
192 shared_ptr<TMemoryBuffer> outputTransport_;
193
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000194 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
195 shared_ptr<TTransport> factoryInputTransport_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000196 shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000197
198 // Protocol decoder
199 shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000200
201 // Protocol encoder
202 shared_ptr<TProtocol> outputProtocol_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000203
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 // Go into read mode
205 void setRead() {
206 setFlags(EV_READ | EV_PERSIST);
207 }
208
209 // Go into write mode
210 void setWrite() {
211 setFlags(EV_WRITE | EV_PERSIST);
212 }
213
214 // Set event flags
215 void setFlags(short eventFlags);
216
217 // Libevent handlers
218 void workSocket();
219
220 // Close this client and reset
221 void close();
222
223 public:
224
225 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000226 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 readBuffer_ = (uint8_t*)malloc(1024);
228 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000229 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 }
231 readBufferSize_ = 1024;
232
233 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000234 // these only need to be allocated once per TConnection (they don't need to be
235 // reallocated on init() call)
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
237 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000238
Mark Slee2f6404d2006-10-10 01:37:40 +0000239 init(socket, eventFlags, s);
240 }
241
242 // Initialize
243 void init(int socket, short eventFlags, TNonblockingServer *s);
244
245 // Transition into a new state
246 void transition();
247
248 // Handler wrapper
249 static void eventHandler(int fd, short which, void* v) {
250 assert(fd = ((TConnection*)v)->socket_);
251 ((TConnection*)v)->workSocket();
252 }
253};
254
255}}} // facebook::thrift::server
256
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_