blob: 1684b64a04587bf37fcad770574edceba51c3d4a [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000026#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000028#include <string>
29#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000030#include <cstdlib>
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <event.h>
32
T Jake Lucianib5e62212009-01-31 22:36:20 +000033namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000034
T Jake Lucianib5e62212009-01-31 22:36:20 +000035using apache::thrift::transport::TMemoryBuffer;
36using apache::thrift::protocol::TProtocol;
37using apache::thrift::concurrency::Runnable;
38using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000039
40// Forward declaration of class
41class TConnection;
42
43/**
44 * This is a non-blocking server in C++ for high performance that operates a
45 * single IO thread. It assumes that all incoming requests are framed with a
46 * 4 byte length indicator and writes out responses using the same framing.
47 *
48 * It does not use the TServerTransport framework, but rather has socket
49 * operations hardcoded for use with select.
50 *
Mark Slee2f6404d2006-10-10 01:37:40 +000051 */
52class TNonblockingServer : public TServer {
53 private:
54
55 // Listen backlog
56 static const int LISTEN_BACKLOG = 1024;
57
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000058 // Default limit on size of idle connection pool
59 static const size_t CONNECTION_STACK_LIMIT = 1024;
60
61 // Maximum size of buffer allocated to idle connection
62 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
63
Mark Slee2f6404d2006-10-10 01:37:40 +000064 // Server socket file descriptor
65 int serverSocket_;
66
67 // Port server runs on
68 int port_;
69
Mark Sleee02385b2007-06-09 01:21:16 +000070 // For processing via thread pool, may be NULL
71 boost::shared_ptr<ThreadManager> threadManager_;
72
73 // Is thread pool processing?
74 bool threadPoolProcessing_;
75
Mark Slee79b16942007-11-26 19:05:29 +000076 // The event base for libevent
77 event_base* eventBase_;
78
79 // Event struct, for use with eventBase_
80 struct event serverEvent_;
81
David Reiss1997f102008-04-29 00:29:41 +000082 // Number of TConnection object we've created
83 size_t numTConnections_;
84
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000085 // Limit for how many TConnection objects to cache
86 size_t connectionStackLimit_;
87
88 /**
89 * Max read buffer size for an idle connection. When we place an idle
90 * TConnection into connectionStack_, we insure that its read buffer is
91 * reduced to this size to insure that idle connections don't hog memory.
92 */
93 uint32_t idleBufferMemLimit_;
94
Mark Slee2f6404d2006-10-10 01:37:40 +000095 /**
96 * This is a stack of all the objects that have been created but that
97 * are NOT currently in use. When we close a connection, we place it on this
98 * stack so that the object can be reused later, rather than freeing the
99 * memory and reallocating a new object later.
100 */
101 std::stack<TConnection*> connectionStack_;
102
103 void handleEvent(int fd, short which);
104
105 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000106 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000107 int port) :
108 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000109 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000110 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000111 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000112 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000113 numTConnections_(0),
114 connectionStackLimit_(CONNECTION_STACK_LIMIT),
115 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
Mark Sleef9373392007-01-24 19:41:57 +0000116
Mark Slee79b16942007-11-26 19:05:29 +0000117 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000118 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000119 int port,
120 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000121 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000122 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000123 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000124 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000125 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000126 numTConnections_(0),
127 connectionStackLimit_(CONNECTION_STACK_LIMIT),
128 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000129 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
130 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000131 setInputProtocolFactory(protocolFactory);
132 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000133 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000134 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000135
Mark Slee5ea15f92007-03-05 22:55:59 +0000136 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
137 boost::shared_ptr<TTransportFactory> inputTransportFactory,
138 boost::shared_ptr<TTransportFactory> outputTransportFactory,
139 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
140 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000141 int port,
142 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000143 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000144 serverSocket_(0),
145 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000146 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000147 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000148 numTConnections_(0),
149 connectionStackLimit_(CONNECTION_STACK_LIMIT),
150 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000151 setInputTransportFactory(inputTransportFactory);
152 setOutputTransportFactory(outputTransportFactory);
153 setInputProtocolFactory(inputProtocolFactory);
154 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000155 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000156 }
Mark Slee79b16942007-11-26 19:05:29 +0000157
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 ~TNonblockingServer() {}
159
Mark Sleee02385b2007-06-09 01:21:16 +0000160 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
161 threadManager_ = threadManager;
162 threadPoolProcessing_ = (threadManager != NULL);
163 }
164
David Reiss1997f102008-04-29 00:29:41 +0000165 boost::shared_ptr<ThreadManager> getThreadManager() {
166 return threadManager_;
167 }
168
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000169 /**
170 * Get the maximum number of unused TConnection we will hold in reserve.
171 *
172 * @return the current limit on TConnection pool size.
173 */
David Reiss260fa932009-04-02 23:51:39 +0000174 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000175 return connectionStackLimit_;
176 }
177
178 /**
179 * Set the maximum number of unused TConnection we will hold in reserve.
180 *
181 * @param sz the new limit for TConnection pool size.
182 */
David Reiss260fa932009-04-02 23:51:39 +0000183 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000184 connectionStackLimit_ = sz;
185 }
186
Mark Slee79b16942007-11-26 19:05:29 +0000187 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000188 return threadPoolProcessing_;
189 }
190
191 void addTask(boost::shared_ptr<Runnable> task) {
192 threadManager_->add(task);
193 }
194
Mark Slee79b16942007-11-26 19:05:29 +0000195 event_base* getEventBase() const {
196 return eventBase_;
197 }
198
David Reissc17fe6b2008-04-29 00:29:43 +0000199 void incrementNumConnections() {
200 ++numTConnections_;
201 }
202
203 void decrementNumConnections() {
204 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000205 }
206
207 size_t getNumConnections() {
208 return numTConnections_;
209 }
210
211 size_t getNumIdleConnections() {
212 return connectionStack_.size();
213 }
214
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000215 /**
216 * Get the maximum limit of memory allocated to idle TConnection objects.
217 *
218 * @return # bytes beyond which we will shrink buffers when idle.
219 */
220 size_t getIdleBufferMemLimit() const {
221 return idleBufferMemLimit_;
222 }
223
224 /**
225 * Set the maximum limit of memory allocated to idle TConnection objects.
226 * If a TConnection object goes idle with more than this much memory
227 * allocated to its buffer, we shrink it to this value.
228 *
229 * @param limit of bytes beyond which we will shrink buffers when idle.
230 */
231 void setIdleBufferMemLimit(size_t limit) {
232 idleBufferMemLimit_ = limit;
233 }
234
Mark Slee2f6404d2006-10-10 01:37:40 +0000235 TConnection* createConnection(int socket, short flags);
236
237 void returnConnection(TConnection* connection);
238
239 static void eventHandler(int fd, short which, void* v) {
240 ((TNonblockingServer*)v)->handleEvent(fd, which);
241 }
242
Mark Slee79b16942007-11-26 19:05:29 +0000243 void listenSocket();
244
245 void listenSocket(int fd);
246
247 void registerEvents(event_base* base);
248
Mark Slee2f6404d2006-10-10 01:37:40 +0000249 void serve();
250};
251
252/**
253 * Two states for sockets, recv and send mode
254 */
255enum TSocketState {
256 SOCKET_RECV,
257 SOCKET_SEND
258};
259
260/**
261 * Four states for the nonblocking servr:
262 * 1) initialize
263 * 2) read 4 byte frame size
264 * 3) read frame of data
265 * 4) send back data (if any)
266 */
267enum TAppState {
268 APP_INIT,
269 APP_READ_FRAME_SIZE,
270 APP_READ_REQUEST,
Mark Sleee02385b2007-06-09 01:21:16 +0000271 APP_WAIT_TASK,
Mark Slee2f6404d2006-10-10 01:37:40 +0000272 APP_SEND_RESULT
273};
274
275/**
276 * Represents a connection that is handled via libevent. This connection
277 * essentially encapsulates a socket that has some associated libevent state.
278 */
279class TConnection {
280 private:
281
Mark Sleee02385b2007-06-09 01:21:16 +0000282 class Task;
283
Mark Slee2f6404d2006-10-10 01:37:40 +0000284 // Server handle
285 TNonblockingServer* server_;
286
287 // Socket handle
288 int socket_;
289
290 // Libevent object
291 struct event event_;
292
293 // Libevent flags
294 short eventFlags_;
295
296 // Socket mode
297 TSocketState socketState_;
298
299 // Application state
300 TAppState appState_;
301
302 // How much data needed to read
303 uint32_t readWant_;
304
305 // Where in the read buffer are we
306 uint32_t readBufferPos_;
307
308 // Read buffer
309 uint8_t* readBuffer_;
310
311 // Read buffer size
312 uint32_t readBufferSize_;
313
314 // Write buffer
315 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000316
Mark Slee2f6404d2006-10-10 01:37:40 +0000317 // Write buffer size
318 uint32_t writeBufferSize_;
319
320 // How far through writing are we?
321 uint32_t writeBufferPos_;
322
Kevin Clark5ace1782009-03-04 21:10:58 +0000323 // How many times have we read since our last buffer reset?
324 uint32_t numReadsSinceReset_;
325
326 // How many times have we written since our last buffer reset?
327 uint32_t numWritesSinceReset_;
328
Mark Sleee02385b2007-06-09 01:21:16 +0000329 // Task handle
330 int taskHandle_;
331
332 // Task event
333 struct event taskEvent_;
334
Mark Slee2f6404d2006-10-10 01:37:40 +0000335 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000336 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000337
338 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000339 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000340
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000341 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000342 boost::shared_ptr<TTransport> factoryInputTransport_;
343 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000344
345 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000346 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000347
348 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000349 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000350
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 // Go into read mode
352 void setRead() {
353 setFlags(EV_READ | EV_PERSIST);
354 }
355
356 // Go into write mode
357 void setWrite() {
358 setFlags(EV_WRITE | EV_PERSIST);
359 }
360
Mark Slee402ee282007-08-23 01:43:20 +0000361 // Set socket idle
362 void setIdle() {
363 setFlags(0);
364 }
365
Mark Slee2f6404d2006-10-10 01:37:40 +0000366 // Set event flags
367 void setFlags(short eventFlags);
368
369 // Libevent handlers
370 void workSocket();
371
372 // Close this client and reset
373 void close();
374
375 public:
376
377 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000378 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000379 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000380 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000381 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000382 }
383 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000384
Kevin Clark5ace1782009-03-04 21:10:58 +0000385 numReadsSinceReset_ = 0;
386 numWritesSinceReset_ = 0;
387
Mark Slee2f6404d2006-10-10 01:37:40 +0000388 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000389 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000390 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000391 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
392 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000393
Mark Slee2f6404d2006-10-10 01:37:40 +0000394 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000395 server_->incrementNumConnections();
396 }
397
398 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000399 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000400 }
401
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000402 /**
403 * Check read buffer against a given limit and shrink it if exceeded.
404 *
405 * @param limit we limit buffer size to.
406 */
407 void checkIdleBufferMemLimit(uint32_t limit);
408
Mark Slee2f6404d2006-10-10 01:37:40 +0000409 // Initialize
410 void init(int socket, short eventFlags, TNonblockingServer *s);
411
412 // Transition into a new state
413 void transition();
414
415 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000416 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000417 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000418 ((TConnection*)v)->workSocket();
419 }
Mark Slee79b16942007-11-26 19:05:29 +0000420
Mark Sleee02385b2007-06-09 01:21:16 +0000421 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000422 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000423 assert(fd == ((TConnection*)v)->taskHandle_);
424 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +0000425 GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000426 }
427 ((TConnection*)v)->transition();
428 }
429
Mark Slee2f6404d2006-10-10 01:37:40 +0000430};
431
T Jake Lucianib5e62212009-01-31 22:36:20 +0000432}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000433
434#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_