blob: 565486c37dc8b2d428134655e8cc9c5933dd69de [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
2#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
3
4#include "Thrift.h"
5#include "server/TServer.h"
6#include "transport/TMemoryBuffer.h"
7#include <stack>
8#include <event.h>
9
10#
11
12namespace facebook { namespace thrift { namespace server {
13
14using boost::shared_ptr;
15
16// Forward declaration of class
17class TConnection;
18
19/**
20 * This is a non-blocking server in C++ for high performance that operates a
21 * single IO thread. It assumes that all incoming requests are framed with a
22 * 4 byte length indicator and writes out responses using the same framing.
23 *
24 * It does not use the TServerTransport framework, but rather has socket
25 * operations hardcoded for use with select.
26 *
27 * @author Mark Slee <mcslee@facebook.com>
28 */
29class TNonblockingServer : public TServer {
30 private:
31
32 // Listen backlog
33 static const int LISTEN_BACKLOG = 1024;
34
35 // Server socket file descriptor
36 int serverSocket_;
37
38 // Port server runs on
39 int port_;
40
41 /**
42 * This is a stack of all the objects that have been created but that
43 * are NOT currently in use. When we close a connection, we place it on this
44 * stack so that the object can be reused later, rather than freeing the
45 * memory and reallocating a new object later.
46 */
47 std::stack<TConnection*> connectionStack_;
48
49 void handleEvent(int fd, short which);
50
51 public:
52 TNonblockingServer(shared_ptr<TProcessor> processor,
53 shared_ptr<TServerOptions> options,
54 int port) :
55 TServer(processor, options), serverSocket_(0), port_(port) {}
56
57 ~TNonblockingServer() {}
58
59 TConnection* createConnection(int socket, short flags);
60
61 void returnConnection(TConnection* connection);
62
63 static void eventHandler(int fd, short which, void* v) {
64 ((TNonblockingServer*)v)->handleEvent(fd, which);
65 }
66
67 void serve();
68};
69
70/**
71 * Two states for sockets, recv and send mode
72 */
73enum TSocketState {
74 SOCKET_RECV,
75 SOCKET_SEND
76};
77
78/**
79 * Four states for the nonblocking servr:
80 * 1) initialize
81 * 2) read 4 byte frame size
82 * 3) read frame of data
83 * 4) send back data (if any)
84 */
85enum TAppState {
86 APP_INIT,
87 APP_READ_FRAME_SIZE,
88 APP_READ_REQUEST,
89 APP_SEND_RESULT
90};
91
92/**
93 * Represents a connection that is handled via libevent. This connection
94 * essentially encapsulates a socket that has some associated libevent state.
95 */
96class TConnection {
97 private:
98
99 // Server handle
100 TNonblockingServer* server_;
101
102 // Socket handle
103 int socket_;
104
105 // Libevent object
106 struct event event_;
107
108 // Libevent flags
109 short eventFlags_;
110
111 // Socket mode
112 TSocketState socketState_;
113
114 // Application state
115 TAppState appState_;
116
117 // How much data needed to read
118 uint32_t readWant_;
119
120 // Where in the read buffer are we
121 uint32_t readBufferPos_;
122
123 // Read buffer
124 uint8_t* readBuffer_;
125
126 // Read buffer size
127 uint32_t readBufferSize_;
128
129 // Write buffer
130 uint8_t* writeBuffer_;
131
132 // Write buffer size
133 uint32_t writeBufferSize_;
134
135 // How far through writing are we?
136 uint32_t writeBufferPos_;
137
138 // Transport to read from
139 shared_ptr<TMemoryBuffer> inputTransport_;
140
141 // Transport that processor writes to
142 shared_ptr<TMemoryBuffer> outputTransport_;
143
144 // Go into read mode
145 void setRead() {
146 setFlags(EV_READ | EV_PERSIST);
147 }
148
149 // Go into write mode
150 void setWrite() {
151 setFlags(EV_WRITE | EV_PERSIST);
152 }
153
154 // Set event flags
155 void setFlags(short eventFlags);
156
157 // Libevent handlers
158 void workSocket();
159
160 // Close this client and reset
161 void close();
162
163 public:
164
165 // Constructor
166 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
167 readBuffer_ = (uint8_t*)malloc(1024);
168 if (readBuffer_ == NULL) {
169 throw new facebook::thrift::Exception("Out of memory.");
170 }
171 readBufferSize_ = 1024;
172
173 // Allocate input and output tranpsorts
174 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
175 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
176
177 init(socket, eventFlags, s);
178 }
179
180 // Initialize
181 void init(int socket, short eventFlags, TNonblockingServer *s);
182
183 // Transition into a new state
184 void transition();
185
186 // Handler wrapper
187 static void eventHandler(int fd, short which, void* v) {
188 assert(fd = ((TConnection*)v)->socket_);
189 ((TConnection*)v)->workSocket();
190 }
191};
192
193}}} // facebook::thrift::server
194
195#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_