blob: ec024c0311c481dafc78f98e3dbd8f7d05801adc [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
2#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
3
4#include "Thrift.h"
5#include "server/TServer.h"
6#include "transport/TMemoryBuffer.h"
7#include <stack>
8#include <event.h>
9
Mark Slee2f6404d2006-10-10 01:37:40 +000010namespace facebook { namespace thrift { namespace server {
11
12using boost::shared_ptr;
13
14// Forward declaration of class
15class TConnection;
16
17/**
18 * This is a non-blocking server in C++ for high performance that operates a
19 * single IO thread. It assumes that all incoming requests are framed with a
20 * 4 byte length indicator and writes out responses using the same framing.
21 *
22 * It does not use the TServerTransport framework, but rather has socket
23 * operations hardcoded for use with select.
24 *
25 * @author Mark Slee <mcslee@facebook.com>
26 */
27class TNonblockingServer : public TServer {
28 private:
29
30 // Listen backlog
31 static const int LISTEN_BACKLOG = 1024;
32
33 // Server socket file descriptor
34 int serverSocket_;
35
36 // Port server runs on
37 int port_;
38
Mark Slee92f00fb2006-10-25 01:28:17 +000039 // Whether to frame responses
40 bool frameResponses_;
41
Mark Slee2f6404d2006-10-10 01:37:40 +000042 /**
43 * This is a stack of all the objects that have been created but that
44 * are NOT currently in use. When we close a connection, we place it on this
45 * stack so that the object can be reused later, rather than freeing the
46 * memory and reallocating a new object later.
47 */
48 std::stack<TConnection*> connectionStack_;
49
50 void handleEvent(int fd, short which);
51
52 public:
53 TNonblockingServer(shared_ptr<TProcessor> processor,
54 shared_ptr<TServerOptions> options,
55 int port) :
Mark Slee92f00fb2006-10-25 01:28:17 +000056 TServer(processor, options),
57 serverSocket_(0),
58 port_(port),
59 frameResponses_(true) {}
Mark Slee2f6404d2006-10-10 01:37:40 +000060
61 ~TNonblockingServer() {}
62
Mark Slee92f00fb2006-10-25 01:28:17 +000063 void setFrameResponses(bool frameResponses) {
64 frameResponses_ = frameResponses;
65 }
66
67 bool getFrameResponses() {
68 return frameResponses_;
69 }
70
Mark Slee2f6404d2006-10-10 01:37:40 +000071 TConnection* createConnection(int socket, short flags);
72
73 void returnConnection(TConnection* connection);
74
75 static void eventHandler(int fd, short which, void* v) {
76 ((TNonblockingServer*)v)->handleEvent(fd, which);
77 }
78
79 void serve();
80};
81
82/**
83 * Two states for sockets, recv and send mode
84 */
85enum TSocketState {
86 SOCKET_RECV,
87 SOCKET_SEND
88};
89
90/**
91 * Four states for the nonblocking servr:
92 * 1) initialize
93 * 2) read 4 byte frame size
94 * 3) read frame of data
95 * 4) send back data (if any)
96 */
97enum TAppState {
98 APP_INIT,
99 APP_READ_FRAME_SIZE,
100 APP_READ_REQUEST,
Mark Slee92f00fb2006-10-25 01:28:17 +0000101 APP_SEND_FRAME_SIZE,
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 APP_SEND_RESULT
103};
104
105/**
106 * Represents a connection that is handled via libevent. This connection
107 * essentially encapsulates a socket that has some associated libevent state.
108 */
109class TConnection {
110 private:
111
112 // Server handle
113 TNonblockingServer* server_;
114
115 // Socket handle
116 int socket_;
117
118 // Libevent object
119 struct event event_;
120
121 // Libevent flags
122 short eventFlags_;
123
124 // Socket mode
125 TSocketState socketState_;
126
127 // Application state
128 TAppState appState_;
129
130 // How much data needed to read
131 uint32_t readWant_;
132
133 // Where in the read buffer are we
134 uint32_t readBufferPos_;
135
136 // Read buffer
137 uint8_t* readBuffer_;
138
139 // Read buffer size
140 uint32_t readBufferSize_;
141
142 // Write buffer
143 uint8_t* writeBuffer_;
144
145 // Write buffer size
146 uint32_t writeBufferSize_;
147
148 // How far through writing are we?
149 uint32_t writeBufferPos_;
150
Mark Slee92f00fb2006-10-25 01:28:17 +0000151 // Frame size
152 int32_t frameSize_;
153
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 // Transport to read from
155 shared_ptr<TMemoryBuffer> inputTransport_;
156
157 // Transport that processor writes to
158 shared_ptr<TMemoryBuffer> outputTransport_;
159
160 // Go into read mode
161 void setRead() {
162 setFlags(EV_READ | EV_PERSIST);
163 }
164
165 // Go into write mode
166 void setWrite() {
167 setFlags(EV_WRITE | EV_PERSIST);
168 }
169
170 // Set event flags
171 void setFlags(short eventFlags);
172
173 // Libevent handlers
174 void workSocket();
175
176 // Close this client and reset
177 void close();
178
179 public:
180
181 // Constructor
182 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
183 readBuffer_ = (uint8_t*)malloc(1024);
184 if (readBuffer_ == NULL) {
185 throw new facebook::thrift::Exception("Out of memory.");
186 }
187 readBufferSize_ = 1024;
188
189 // Allocate input and output tranpsorts
190 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
191 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
192
193 init(socket, eventFlags, s);
194 }
195
196 // Initialize
197 void init(int socket, short eventFlags, TNonblockingServer *s);
198
199 // Transition into a new state
200 void transition();
201
202 // Handler wrapper
203 static void eventHandler(int fd, short which, void* v) {
204 assert(fd = ((TConnection*)v)->socket_);
205 ((TConnection*)v)->workSocket();
206 }
207};
208
209}}} // facebook::thrift::server
210
211#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_