blob: 11b58b1a9e2ecb6466489a6ba18ec5f34fca4918 [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
2#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
3
Mark Slee4af6ed72006-10-25 19:02:49 +00004#include <Thrift.h>
5#include <server/TServer.h>
6#include <transport/TTransportUtils.h>
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include <stack>
8#include <event.h>
9
Mark Slee2f6404d2006-10-10 01:37:40 +000010namespace facebook { namespace thrift { namespace server {
11
12using boost::shared_ptr;
13
14// Forward declaration of class
15class TConnection;
16
17/**
18 * This is a non-blocking server in C++ for high performance that operates a
19 * single IO thread. It assumes that all incoming requests are framed with a
20 * 4 byte length indicator and writes out responses using the same framing.
21 *
22 * It does not use the TServerTransport framework, but rather has socket
23 * operations hardcoded for use with select.
24 *
25 * @author Mark Slee <mcslee@facebook.com>
26 */
27class TNonblockingServer : public TServer {
28 private:
29
30 // Listen backlog
31 static const int LISTEN_BACKLOG = 1024;
32
33 // Server socket file descriptor
34 int serverSocket_;
35
36 // Port server runs on
37 int port_;
38
Mark Slee92f00fb2006-10-25 01:28:17 +000039 // Whether to frame responses
40 bool frameResponses_;
41
Mark Slee2f6404d2006-10-10 01:37:40 +000042 /**
43 * This is a stack of all the objects that have been created but that
44 * are NOT currently in use. When we close a connection, we place it on this
45 * stack so that the object can be reused later, rather than freeing the
46 * memory and reallocating a new object later.
47 */
48 std::stack<TConnection*> connectionStack_;
49
50 void handleEvent(int fd, short which);
51
52 public:
Mark Slee4af6ed72006-10-25 19:02:49 +000053 TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
54 TServer(processor),
Mark Slee92f00fb2006-10-25 01:28:17 +000055 serverSocket_(0),
56 port_(port),
57 frameResponses_(true) {}
Mark Slee2f6404d2006-10-10 01:37:40 +000058
59 ~TNonblockingServer() {}
60
Mark Slee92f00fb2006-10-25 01:28:17 +000061 void setFrameResponses(bool frameResponses) {
62 frameResponses_ = frameResponses;
63 }
64
65 bool getFrameResponses() {
66 return frameResponses_;
67 }
68
Mark Slee2f6404d2006-10-10 01:37:40 +000069 TConnection* createConnection(int socket, short flags);
70
71 void returnConnection(TConnection* connection);
72
73 static void eventHandler(int fd, short which, void* v) {
74 ((TNonblockingServer*)v)->handleEvent(fd, which);
75 }
76
77 void serve();
78};
79
/**
 * Direction a connection's socket is currently working in: receiving
 * data or sending data.
 */
enum TSocketState {
  SOCKET_RECV = 0,  // socket is in recv mode
  SOCKET_SEND = 1   // socket is in send mode
};
87
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back frame size
 *  5) send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,  // initialize
  APP_READ_FRAME_SIZE = 1,  // read 4 byte frame size
  APP_READ_REQUEST    = 2,  // read frame of data
  APP_SEND_FRAME_SIZE = 3,  // send back frame size
  APP_SEND_RESULT     = 4   // send back data (if any)
};
102
103/**
104 * Represents a connection that is handled via libevent. This connection
105 * essentially encapsulates a socket that has some associated libevent state.
106 */
107class TConnection {
108 private:
109
110 // Server handle
111 TNonblockingServer* server_;
112
113 // Socket handle
114 int socket_;
115
116 // Libevent object
117 struct event event_;
118
119 // Libevent flags
120 short eventFlags_;
121
122 // Socket mode
123 TSocketState socketState_;
124
125 // Application state
126 TAppState appState_;
127
128 // How much data needed to read
129 uint32_t readWant_;
130
131 // Where in the read buffer are we
132 uint32_t readBufferPos_;
133
134 // Read buffer
135 uint8_t* readBuffer_;
136
137 // Read buffer size
138 uint32_t readBufferSize_;
139
140 // Write buffer
141 uint8_t* writeBuffer_;
142
143 // Write buffer size
144 uint32_t writeBufferSize_;
145
146 // How far through writing are we?
147 uint32_t writeBufferPos_;
148
Mark Slee92f00fb2006-10-25 01:28:17 +0000149 // Frame size
150 int32_t frameSize_;
151
Mark Slee2f6404d2006-10-10 01:37:40 +0000152 // Transport to read from
153 shared_ptr<TMemoryBuffer> inputTransport_;
154
155 // Transport that processor writes to
156 shared_ptr<TMemoryBuffer> outputTransport_;
157
Mark Slee4af6ed72006-10-25 19:02:49 +0000158 // Protocol encoder
159 shared_ptr<TProtocol> outputProtocol_;
160
161 // Protocol decoder
162 shared_ptr<TProtocol> inputProtocol_;
163
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 // Go into read mode
165 void setRead() {
166 setFlags(EV_READ | EV_PERSIST);
167 }
168
169 // Go into write mode
170 void setWrite() {
171 setFlags(EV_WRITE | EV_PERSIST);
172 }
173
174 // Set event flags
175 void setFlags(short eventFlags);
176
177 // Libevent handlers
178 void workSocket();
179
180 // Close this client and reset
181 void close();
182
183 public:
184
185 // Constructor
186 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
187 readBuffer_ = (uint8_t*)malloc(1024);
188 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000189 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 }
191 readBufferSize_ = 1024;
192
193 // Allocate input and output tranpsorts
194 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
195 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
196
Mark Slee4af6ed72006-10-25 19:02:49 +0000197 // Create protocol
198 std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
199 iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
200 inputProtocol_ = iop.first;
201 outputProtocol_ = iop.second;
202
Mark Slee2f6404d2006-10-10 01:37:40 +0000203 init(socket, eventFlags, s);
204 }
205
206 // Initialize
207 void init(int socket, short eventFlags, TNonblockingServer *s);
208
209 // Transition into a new state
210 void transition();
211
212 // Handler wrapper
213 static void eventHandler(int fd, short which, void* v) {
214 assert(fd = ((TConnection*)v)->socket_);
215 ((TConnection*)v)->workSocket();
216 }
217};
218
219}}} // facebook::thrift::server
220
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_