blob: 8506507d30e7ba9153920f156d997472951d813f [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000026#include <concurrency/ThreadManager.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000028#include <string>
29#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000030#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000031#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000032#include <event.h>
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000035
T Jake Lucianib5e62212009-01-31 22:36:20 +000036using apache::thrift::transport::TMemoryBuffer;
37using apache::thrift::protocol::TProtocol;
38using apache::thrift::concurrency::Runnable;
39using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000040
41// Forward declaration of class
42class TConnection;
43
44/**
45 * This is a non-blocking server in C++ for high performance that operates a
46 * single IO thread. It assumes that all incoming requests are framed with a
47 * 4 byte length indicator and writes out responses using the same framing.
48 *
49 * It does not use the TServerTransport framework, but rather has socket
50 * operations hardcoded for use with select.
51 *
Mark Slee2f6404d2006-10-10 01:37:40 +000052 */
53class TNonblockingServer : public TServer {
54 private:
55
56 // Listen backlog
57 static const int LISTEN_BACKLOG = 1024;
58
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000059 // Default limit on size of idle connection pool
60 static const size_t CONNECTION_STACK_LIMIT = 1024;
61
62 // Maximum size of buffer allocated to idle connection
63 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
64
Mark Slee2f6404d2006-10-10 01:37:40 +000065 // Server socket file descriptor
66 int serverSocket_;
67
68 // Port server runs on
69 int port_;
70
Mark Sleee02385b2007-06-09 01:21:16 +000071 // For processing via thread pool, may be NULL
72 boost::shared_ptr<ThreadManager> threadManager_;
73
74 // Is thread pool processing?
75 bool threadPoolProcessing_;
76
Mark Slee79b16942007-11-26 19:05:29 +000077 // The event base for libevent
78 event_base* eventBase_;
79
80 // Event struct, for use with eventBase_
81 struct event serverEvent_;
82
David Reiss1997f102008-04-29 00:29:41 +000083 // Number of TConnection object we've created
84 size_t numTConnections_;
85
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000086 // Limit for how many TConnection objects to cache
87 size_t connectionStackLimit_;
88
89 /**
90 * Max read buffer size for an idle connection. When we place an idle
91 * TConnection into connectionStack_, we insure that its read buffer is
92 * reduced to this size to insure that idle connections don't hog memory.
93 */
94 uint32_t idleBufferMemLimit_;
95
Mark Slee2f6404d2006-10-10 01:37:40 +000096 /**
97 * This is a stack of all the objects that have been created but that
98 * are NOT currently in use. When we close a connection, we place it on this
99 * stack so that the object can be reused later, rather than freeing the
100 * memory and reallocating a new object later.
101 */
102 std::stack<TConnection*> connectionStack_;
103
104 void handleEvent(int fd, short which);
105
106 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000107 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000108 int port) :
109 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000110 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000111 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000112 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000113 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000114 numTConnections_(0),
115 connectionStackLimit_(CONNECTION_STACK_LIMIT),
116 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
Mark Sleef9373392007-01-24 19:41:57 +0000117
Mark Slee79b16942007-11-26 19:05:29 +0000118 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000119 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000120 int port,
121 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000122 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000123 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000124 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000125 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000126 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000127 numTConnections_(0),
128 connectionStackLimit_(CONNECTION_STACK_LIMIT),
129 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000130 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
131 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000132 setInputProtocolFactory(protocolFactory);
133 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000134 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000135 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000136
Mark Slee5ea15f92007-03-05 22:55:59 +0000137 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
138 boost::shared_ptr<TTransportFactory> inputTransportFactory,
139 boost::shared_ptr<TTransportFactory> outputTransportFactory,
140 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
141 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000142 int port,
143 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000144 TServer(processor),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000145 serverSocket_(0),
146 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000147 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000148 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000149 numTConnections_(0),
150 connectionStackLimit_(CONNECTION_STACK_LIMIT),
151 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000152 setInputTransportFactory(inputTransportFactory);
153 setOutputTransportFactory(outputTransportFactory);
154 setInputProtocolFactory(inputProtocolFactory);
155 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000156 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000157 }
Mark Slee79b16942007-11-26 19:05:29 +0000158
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 ~TNonblockingServer() {}
160
Mark Sleee02385b2007-06-09 01:21:16 +0000161 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
162 threadManager_ = threadManager;
163 threadPoolProcessing_ = (threadManager != NULL);
164 }
165
David Reiss1997f102008-04-29 00:29:41 +0000166 boost::shared_ptr<ThreadManager> getThreadManager() {
167 return threadManager_;
168 }
169
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000170 /**
171 * Get the maximum number of unused TConnection we will hold in reserve.
172 *
173 * @return the current limit on TConnection pool size.
174 */
David Reiss260fa932009-04-02 23:51:39 +0000175 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000176 return connectionStackLimit_;
177 }
178
179 /**
180 * Set the maximum number of unused TConnection we will hold in reserve.
181 *
182 * @param sz the new limit for TConnection pool size.
183 */
David Reiss260fa932009-04-02 23:51:39 +0000184 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000185 connectionStackLimit_ = sz;
186 }
187
Mark Slee79b16942007-11-26 19:05:29 +0000188 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000189 return threadPoolProcessing_;
190 }
191
192 void addTask(boost::shared_ptr<Runnable> task) {
193 threadManager_->add(task);
194 }
195
Mark Slee79b16942007-11-26 19:05:29 +0000196 event_base* getEventBase() const {
197 return eventBase_;
198 }
199
David Reissc17fe6b2008-04-29 00:29:43 +0000200 void incrementNumConnections() {
201 ++numTConnections_;
202 }
203
204 void decrementNumConnections() {
205 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000206 }
207
208 size_t getNumConnections() {
209 return numTConnections_;
210 }
211
212 size_t getNumIdleConnections() {
213 return connectionStack_.size();
214 }
215
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000216 /**
217 * Get the maximum limit of memory allocated to idle TConnection objects.
218 *
219 * @return # bytes beyond which we will shrink buffers when idle.
220 */
221 size_t getIdleBufferMemLimit() const {
222 return idleBufferMemLimit_;
223 }
224
225 /**
226 * Set the maximum limit of memory allocated to idle TConnection objects.
227 * If a TConnection object goes idle with more than this much memory
228 * allocated to its buffer, we shrink it to this value.
229 *
230 * @param limit of bytes beyond which we will shrink buffers when idle.
231 */
232 void setIdleBufferMemLimit(size_t limit) {
233 idleBufferMemLimit_ = limit;
234 }
235
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 TConnection* createConnection(int socket, short flags);
237
238 void returnConnection(TConnection* connection);
239
240 static void eventHandler(int fd, short which, void* v) {
241 ((TNonblockingServer*)v)->handleEvent(fd, which);
242 }
243
Mark Slee79b16942007-11-26 19:05:29 +0000244 void listenSocket();
245
246 void listenSocket(int fd);
247
248 void registerEvents(event_base* base);
249
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 void serve();
251};
252
/**
 * Direction a connection's socket is currently operating in.
 */
enum TSocketState {
  SOCKET_RECV = 0,  // receiving data
  SOCKET_SEND = 1   // sending data
};
260
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a task
 *     NOTE(review): presumably a request handed to the thread pool; confirm
 *     against TConnection::transition()
 *  5) send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,
  APP_READ_FRAME_SIZE = 1,
  APP_READ_REQUEST    = 2,
  APP_WAIT_TASK       = 3,
  APP_SEND_RESULT     = 4
};
275
276/**
277 * Represents a connection that is handled via libevent. This connection
278 * essentially encapsulates a socket that has some associated libevent state.
279 */
280class TConnection {
281 private:
282
Mark Sleee02385b2007-06-09 01:21:16 +0000283 class Task;
284
Mark Slee2f6404d2006-10-10 01:37:40 +0000285 // Server handle
286 TNonblockingServer* server_;
287
288 // Socket handle
289 int socket_;
290
291 // Libevent object
292 struct event event_;
293
294 // Libevent flags
295 short eventFlags_;
296
297 // Socket mode
298 TSocketState socketState_;
299
300 // Application state
301 TAppState appState_;
302
303 // How much data needed to read
304 uint32_t readWant_;
305
306 // Where in the read buffer are we
307 uint32_t readBufferPos_;
308
309 // Read buffer
310 uint8_t* readBuffer_;
311
312 // Read buffer size
313 uint32_t readBufferSize_;
314
315 // Write buffer
316 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000317
Mark Slee2f6404d2006-10-10 01:37:40 +0000318 // Write buffer size
319 uint32_t writeBufferSize_;
320
321 // How far through writing are we?
322 uint32_t writeBufferPos_;
323
Kevin Clark5ace1782009-03-04 21:10:58 +0000324 // How many times have we read since our last buffer reset?
325 uint32_t numReadsSinceReset_;
326
327 // How many times have we written since our last buffer reset?
328 uint32_t numWritesSinceReset_;
329
Mark Sleee02385b2007-06-09 01:21:16 +0000330 // Task handle
331 int taskHandle_;
332
333 // Task event
334 struct event taskEvent_;
335
Mark Slee2f6404d2006-10-10 01:37:40 +0000336 // Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000337 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000338
339 // Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000340 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000341
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000342 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000343 boost::shared_ptr<TTransport> factoryInputTransport_;
344 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000345
346 // Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000347 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000348
349 // Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000350 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000351
Mark Slee2f6404d2006-10-10 01:37:40 +0000352 // Go into read mode
353 void setRead() {
354 setFlags(EV_READ | EV_PERSIST);
355 }
356
357 // Go into write mode
358 void setWrite() {
359 setFlags(EV_WRITE | EV_PERSIST);
360 }
361
Mark Slee402ee282007-08-23 01:43:20 +0000362 // Set socket idle
363 void setIdle() {
364 setFlags(0);
365 }
366
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 // Set event flags
368 void setFlags(short eventFlags);
369
370 // Libevent handlers
371 void workSocket();
372
373 // Close this client and reset
374 void close();
375
376 public:
377
378 // Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000379 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reissd7a16f42008-02-19 22:47:29 +0000380 readBuffer_ = (uint8_t*)std::malloc(1024);
Mark Slee2f6404d2006-10-10 01:37:40 +0000381 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000382 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 }
384 readBufferSize_ = 1024;
Mark Slee79b16942007-11-26 19:05:29 +0000385
Kevin Clark5ace1782009-03-04 21:10:58 +0000386 numReadsSinceReset_ = 0;
387 numWritesSinceReset_ = 0;
388
Mark Slee2f6404d2006-10-10 01:37:40 +0000389 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000390 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000391 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000392 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
393 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000394
Mark Slee2f6404d2006-10-10 01:37:40 +0000395 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000396 server_->incrementNumConnections();
397 }
398
399 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000400 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000401 }
402
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000403 /**
404 * Check read buffer against a given limit and shrink it if exceeded.
405 *
406 * @param limit we limit buffer size to.
407 */
408 void checkIdleBufferMemLimit(uint32_t limit);
409
Mark Slee2f6404d2006-10-10 01:37:40 +0000410 // Initialize
411 void init(int socket, short eventFlags, TNonblockingServer *s);
412
413 // Transition into a new state
414 void transition();
415
416 // Handler wrapper
Mark Sleea8de4892008-02-09 00:02:26 +0000417 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000418 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000419 ((TConnection*)v)->workSocket();
420 }
Mark Slee79b16942007-11-26 19:05:29 +0000421
Mark Sleee02385b2007-06-09 01:21:16 +0000422 // Handler wrapper for task block
Mark Sleea8de4892008-02-09 00:02:26 +0000423 static void taskHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000424 assert(fd == ((TConnection*)v)->taskHandle_);
425 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +0000426 GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000427 }
428 ((TConnection*)v)->transition();
429 }
430
Mark Slee2f6404d2006-10-10 01:37:40 +0000431};
432
T Jake Lucianib5e62212009-01-31 22:36:20 +0000433}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000434
#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_