blob: 34a0d88d279ea329f35124abd231f423ec820e63 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
#include "TNonblockingServer.h"
#include <concurrency/Exception.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
18
T Jake Lucianib5e62212009-01-31 22:36:20 +000019namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000020
T Jake Lucianib5e62212009-01-31 22:36:20 +000021using namespace apache::thrift::protocol;
22using namespace apache::thrift::transport;
23using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000024using namespace std;
25
26class TConnection::Task: public Runnable {
27 public:
28 Task(boost::shared_ptr<TProcessor> processor,
29 boost::shared_ptr<TProtocol> input,
30 boost::shared_ptr<TProtocol> output,
31 int taskHandle) :
32 processor_(processor),
33 input_(input),
34 output_(output),
35 taskHandle_(taskHandle) {}
36
37 void run() {
38 try {
39 while (processor_->process(input_, output_)) {
40 if (!input_->getTransport()->peek()) {
41 break;
42 }
43 }
44 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000045 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000046 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000047 cerr << "TNonblockingServer exception: " << x.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000048 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000049 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000050 }
Mark Slee79b16942007-11-26 19:05:29 +000051
Mark Sleee02385b2007-06-09 01:21:16 +000052 // Signal completion back to the libevent thread via a socketpair
53 int8_t b = 0;
54 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
David Reiss01e55c12008-07-13 22:18:51 +000055 GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000056 }
57 if (-1 == ::close(taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +000058 GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000059 }
60 }
61
62 private:
63 boost::shared_ptr<TProcessor> processor_;
64 boost::shared_ptr<TProtocol> input_;
65 boost::shared_ptr<TProtocol> output_;
66 int taskHandle_;
67};
Mark Slee5ea15f92007-03-05 22:55:59 +000068
69void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000070 socket_ = socket;
71 server_ = s;
72 appState_ = APP_INIT;
73 eventFlags_ = 0;
74
75 readBufferPos_ = 0;
76 readWant_ = 0;
77
78 writeBuffer_ = NULL;
79 writeBufferSize_ = 0;
80 writeBufferPos_ = 0;
81
82 socketState_ = SOCKET_RECV;
83 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +000084
Mark Sleee02385b2007-06-09 01:21:16 +000085 taskHandle_ = -1;
86
Mark Slee2f6404d2006-10-10 01:37:40 +000087 // Set flags, which also registers the event
88 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000089
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000090 // get input/transports
91 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
92 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000093
94 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000095 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
96 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000097}
98
99void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000100 int flags=0, got=0, left=0, sent=0;
101 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000102
103 switch (socketState_) {
104 case SOCKET_RECV:
105 // It is an error to be in this state if we already have all the data
106 assert(readBufferPos_ < readWant_);
107
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000109 if (readWant_ > readBufferSize_) {
110 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000111 readBufferSize_ *= 2;
112 }
David Reissd7a16f42008-02-19 22:47:29 +0000113 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000115 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000116 close();
117 return;
118 }
119 }
120
121 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000122 fetch = readWant_ - readBufferPos_;
123 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000124
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 if (got > 0) {
126 // Move along in the buffer
127 readBufferPos_ += got;
128
129 // Check that we did not overdo it
130 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000131
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 // We are done reading, move onto the next state
133 if (readBufferPos_ == readWant_) {
134 transition();
135 }
136 return;
137 } else if (got == -1) {
138 // Blocking errors are okay, just move on
139 if (errno == EAGAIN || errno == EWOULDBLOCK) {
140 return;
141 }
142
143 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000144 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000145 }
146 }
147
148 // Whenever we get down here it means a remote disconnect
149 close();
Mark Slee79b16942007-11-26 19:05:29 +0000150
Mark Slee2f6404d2006-10-10 01:37:40 +0000151 return;
152
153 case SOCKET_SEND:
154 // Should never have position past size
155 assert(writeBufferPos_ <= writeBufferSize_);
156
157 // If there is no data to send, then let us move on
158 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000159 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 transition();
161 return;
162 }
163
164 flags = 0;
165 #ifdef MSG_NOSIGNAL
166 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
167 // check for the EPIPE return condition and close the socket in that case
168 flags |= MSG_NOSIGNAL;
169 #endif // ifdef MSG_NOSIGNAL
170
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000171 left = writeBufferSize_ - writeBufferPos_;
172 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000173
174 if (sent <= 0) {
175 // Blocking errors are okay, just move on
176 if (errno == EAGAIN || errno == EWOULDBLOCK) {
177 return;
178 }
179 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000180 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000181 }
182 close();
183 return;
184 }
185
186 writeBufferPos_ += sent;
187
188 // Did we overdo it?
189 assert(writeBufferPos_ <= writeBufferSize_);
190
Mark Slee79b16942007-11-26 19:05:29 +0000191 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000192 if (writeBufferPos_ == writeBufferSize_) {
193 transition();
194 }
195
196 return;
197
198 default:
David Reiss01e55c12008-07-13 22:18:51 +0000199 GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000200 assert(0);
201 }
202}
203
204/**
205 * This is called when the application transitions from one state into
206 * another. This means that it has finished writing the data that it needed
207 * to, or finished receiving the data that it needed to.
208 */
209void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000210
211 int sz = 0;
212
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 // Switch upon the state that we are currently in and move to a new state
214 switch (appState_) {
215
216 case APP_READ_REQUEST:
217 // We are done reading the request, package the read buffer into transport
218 // and get back some data from the dispatch function
Kevin Clark5ace1782009-03-04 21:10:58 +0000219 // If we've used these transport buffers enough times, reset them to avoid bloating
220
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
Kevin Clark5ace1782009-03-04 21:10:58 +0000222 ++numReadsSinceReset_;
223 if (numWritesSinceReset_ < 512) {
224 outputTransport_->resetBuffer();
225 } else {
226 // reset the capacity of the output transport if we used it enough times that it might be bloated
227 try {
228 outputTransport_->resetBuffer(true);
229 numWritesSinceReset_ = 0;
230 } catch (TTransportException &ttx) {
231 GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
232 close();
233 return;
234 }
235 }
236
David Reiss52cb7a72008-06-30 21:40:35 +0000237 // Prepend four bytes of blank space to the buffer so we can
238 // write the frame size there later.
239 outputTransport_->getWritePtr(4);
240 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000241
Mark Sleee02385b2007-06-09 01:21:16 +0000242 if (server_->isThreadPoolProcessing()) {
243 // We are setting up a Task to do this work and we will wait on it
244 int sv[2];
245 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
David Reiss01e55c12008-07-13 22:18:51 +0000246 GlobalOutput.perror("TConnection::socketpair() failed ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000247 // Now we will fall through to the APP_WAIT_TASK block with no response
248 } else {
249 // Create task and dispatch to the thread manager
250 boost::shared_ptr<Runnable> task =
251 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
252 inputProtocol_,
253 outputProtocol_,
254 sv[1]));
Mark Slee79b16942007-11-26 19:05:29 +0000255 // The application is now waiting on the task to finish
Mark Sleee02385b2007-06-09 01:21:16 +0000256 appState_ = APP_WAIT_TASK;
Mark Slee79b16942007-11-26 19:05:29 +0000257
258 // Create an event to be notified when the task finishes
Mark Sleee02385b2007-06-09 01:21:16 +0000259 event_set(&taskEvent_,
260 taskHandle_ = sv[0],
261 EV_READ,
262 TConnection::taskHandler,
263 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000264
Mark Slee79b16942007-11-26 19:05:29 +0000265 // Attach to the base
266 event_base_set(server_->getEventBase(), &taskEvent_);
267
Mark Sleee02385b2007-06-09 01:21:16 +0000268 // Add the event and start up the server
269 if (-1 == event_add(&taskEvent_, 0)) {
270 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
271 return;
272 }
David Reisse11f3072008-10-07 21:39:19 +0000273 try {
274 server_->addTask(task);
275 } catch (IllegalStateException & ise) {
276 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000277 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000278 close();
279 }
Mark Slee402ee282007-08-23 01:43:20 +0000280
281 // Set this connection idle so that libevent doesn't process more
282 // data on it while we're still waiting for the threadmanager to
283 // finish this task
284 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000285 return;
286 }
287 } else {
288 try {
289 // Invoke the processor
290 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
291 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000292 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000293 close();
294 return;
295 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000296 GlobalOutput.printf("TException: Server::process() %s", x.what());
Mark Slee79b16942007-11-26 19:05:29 +0000297 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000298 return;
299 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000300 GlobalOutput.printf("Server::process() unknown exception");
Mark Sleee02385b2007-06-09 01:21:16 +0000301 close();
302 return;
303 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000304 }
305
Mark Slee402ee282007-08-23 01:43:20 +0000306 // Intentionally fall through here, the call to process has written into
307 // the writeBuffer_
308
Mark Sleee02385b2007-06-09 01:21:16 +0000309 case APP_WAIT_TASK:
310 // We have now finished processing a task and the result has been written
311 // into the outputTransport_, so we grab its contents and place them into
312 // the writeBuffer_ for actual writing by the libevent thread
313
Mark Slee2f6404d2006-10-10 01:37:40 +0000314 // Get the result of the operation
315 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
316
317 // If the function call generated return data, then move into the send
318 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000319 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000320 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000321
322 // Move into write state
323 writeBufferPos_ = 0;
324 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000325
David Reissaf787782008-07-03 20:29:34 +0000326 // Put the frame size into the write buffer
327 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
328 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000329
330 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000331 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000332 setWrite();
333
334 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000335 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000336
337 return;
338 }
339
David Reissc51986f2009-03-24 20:01:25 +0000340 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000341 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000342 goto LABEL_APP_INIT;
343
Mark Slee2f6404d2006-10-10 01:37:40 +0000344 case APP_SEND_RESULT:
345
Kevin Clark5ace1782009-03-04 21:10:58 +0000346 ++numWritesSinceReset_;
347
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 // N.B.: We also intentionally fall through here into the INIT state!
349
Mark Slee92f00fb2006-10-25 01:28:17 +0000350 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 case APP_INIT:
352
Kevin Clark5ace1782009-03-04 21:10:58 +0000353 // reset the input buffer if we used it enough times that it might be bloated
354 if (numReadsSinceReset_ > 512)
355 {
356 void * new_buffer = std::realloc(readBuffer_, 1024);
357 if (new_buffer == NULL) {
358 GlobalOutput("TConnection::transition() realloc");
359 close();
360 return;
361 }
362 readBuffer_ = (uint8_t*) new_buffer;
363 readBufferSize_ = 1024;
364 numReadsSinceReset_ = 0;
365 }
366
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 // Clear write buffer variables
368 writeBuffer_ = NULL;
369 writeBufferPos_ = 0;
370 writeBufferSize_ = 0;
371
372 // Set up read buffer for getting 4 bytes
373 readBufferPos_ = 0;
374 readWant_ = 4;
375
376 // Into read4 state we go
377 socketState_ = SOCKET_RECV;
378 appState_ = APP_READ_FRAME_SIZE;
379
380 // Register read event
381 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000382
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000384 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000385
386 return;
387
388 case APP_READ_FRAME_SIZE:
389 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000390 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000391 sz = (int32_t)ntohl(sz);
392
393 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000394 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000395 close();
396 return;
397 }
398
399 // Reset the read buffer
400 readWant_ = (uint32_t)sz;
401 readBufferPos_= 0;
402
403 // Move into read request state
404 appState_ = APP_READ_REQUEST;
405
406 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000407 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000408
409 return;
410
411 default:
David Reiss01e55c12008-07-13 22:18:51 +0000412 GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000413 assert(0);
414 }
415}
416
417void TConnection::setFlags(short eventFlags) {
418 // Catch the do nothing case
419 if (eventFlags_ == eventFlags) {
420 return;
421 }
422
423 // Delete a previously existing event
424 if (eventFlags_ != 0) {
425 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000426 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000427 return;
428 }
429 }
430
431 // Update in memory structure
432 eventFlags_ = eventFlags;
433
Mark Slee402ee282007-08-23 01:43:20 +0000434 // Do not call event_set if there are no flags
435 if (!eventFlags_) {
436 return;
437 }
438
Mark Slee2f6404d2006-10-10 01:37:40 +0000439 /**
440 * event_set:
441 *
442 * Prepares the event structure &event to be used in future calls to
443 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000444 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000445 *
446 * The events can be either EV_READ, EV_WRITE, or both, indicating
447 * that an application can read or write from the file respectively without
448 * blocking.
449 *
Mark Sleee02385b2007-06-09 01:21:16 +0000450 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 * the event and the type of event which will be one of: EV_TIMEOUT,
452 * EV_SIGNAL, EV_READ, EV_WRITE.
453 *
454 * The additional flag EV_PERSIST makes an event_add() persistent until
455 * event_del() has been called.
456 *
457 * Once initialized, the &event struct can be used repeatedly with
458 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000459 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 * when an ev structure has been added to libevent using event_add() the
461 * structure must persist until the event occurs (assuming EV_PERSIST
462 * is not set) or is removed using event_del(). You may not reuse the same
463 * ev structure for multiple monitored descriptors; each descriptor needs
464 * its own ev.
465 */
466 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000467 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000468
469 // Add the event
470 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000471 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000472 }
473}
474
475/**
476 * Closes a connection
477 */
478void TConnection::close() {
479 // Delete the registered libevent
480 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000481 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000482 }
483
484 // Close the socket
485 if (socket_ > 0) {
486 ::close(socket_);
487 }
488 socket_ = 0;
489
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000490 // close any factory produced transports
491 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000492 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000493
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 // Give this object back to the server that owns it
495 server_->returnConnection(this);
496}
497
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000498void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
499 if (readBufferSize_ > limit) {
500 readBufferSize_ = limit;
501 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
502 if (readBuffer_ == NULL) {
503 GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
504 close();
505 }
506 }
507}
508
Mark Slee2f6404d2006-10-10 01:37:40 +0000509/**
510 * Creates a new connection either by reusing an object off the stack or
511 * by allocating a new one entirely
512 */
513TConnection* TNonblockingServer::createConnection(int socket, short flags) {
514 // Check the stack
515 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000516 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000517 } else {
518 TConnection* result = connectionStack_.top();
519 connectionStack_.pop();
520 result->init(socket, flags, this);
521 return result;
522 }
523}
524
525/**
526 * Returns a connection to the stack
527 */
528void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000529 if (connectionStackLimit_ &&
530 (connectionStack_.size() >= connectionStackLimit_)) {
531 delete connection;
532 } else {
533 connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
534 connectionStack_.push(connection);
535 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000536}
537
538/**
David Reissa79e4882008-03-05 07:51:47 +0000539 * Server socket had something happen. We accept all waiting client
540 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000541 */
542void TNonblockingServer::handleEvent(int fd, short which) {
543 // Make sure that libevent didn't fuck up the socket handles
544 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000545
Mark Slee2f6404d2006-10-10 01:37:40 +0000546 // Server socket accepted a new connection
547 socklen_t addrLen;
548 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000549 addrLen = sizeof(addr);
550
Mark Slee2f6404d2006-10-10 01:37:40 +0000551 // Going to accept a new client socket
552 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000553
Mark Slee2f6404d2006-10-10 01:37:40 +0000554 // Accept as many new clients as possible, even though libevent signaled only
555 // one, this helps us to avoid having to go back into the libevent engine so
556 // many times
557 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
558
559 // Explicitly set this socket to NONBLOCK mode
560 int flags;
561 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
562 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000563 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000564 close(clientSocket);
565 return;
566 }
567
568 // Create a new TConnection for this client socket.
569 TConnection* clientConnection =
570 createConnection(clientSocket, EV_READ | EV_PERSIST);
571
572 // Fail fast if we could not create a TConnection object
573 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000574 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000575 close(clientSocket);
576 return;
577 }
578
579 // Put this client connection into the proper state
580 clientConnection->transition();
581 }
Mark Slee79b16942007-11-26 19:05:29 +0000582
Mark Slee2f6404d2006-10-10 01:37:40 +0000583 // Done looping accept, now we have to make sure the error is due to
584 // blocking. Any other error is a problem
585 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000586 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000587 }
588}
589
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, preferring an IPv6 result since
 * IPv4 addresses can be mapped into IPv6 space. It then creates and binds
 * the socket and hands it to listenSocket(int) for the remaining setup.
 *
 * @throws TException if address resolution, socket(), bind() or any step of
 *         listenSocket(int) fails.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Sized for any 32-bit int, so an out-of-range port_ cannot overrun it
  char port[sizeof("-2147483648")];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Fail like every other setup error below instead of returning without
    // a listening socket.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  // Let an IPv6 socket also accept IPv4-mapped clients. The option applies
  // only to AF_INET6 sockets; on an IPv4 socket it would just fail.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
652
653/**
654 * Takes a socket created by listenSocket() and sets various options on it
655 * to prepare for use in the server.
656 */
657void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000658 // Set socket to nonblocking mode
659 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000660 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
661 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
662 close(s);
663 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000664 }
665
666 int one = 1;
667 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000668
669 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000670 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000671
672 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000673 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000674
675 // Set TCP nodelay if available, MAC OS X Hack
676 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
677 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000678 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000679 #endif
680
Mark Slee79b16942007-11-26 19:05:29 +0000681 if (listen(s, LISTEN_BACKLOG) == -1) {
682 close(s);
683 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000684 }
685
Mark Slee79b16942007-11-26 19:05:29 +0000686 // Cool, this socket is good to go, set it as the serverSocket_
687 serverSocket_ = s;
688}
689
690/**
691 * Register the core libevent events onto the proper base.
692 */
693void TNonblockingServer::registerEvents(event_base* base) {
694 assert(serverSocket_ != -1);
695 assert(!eventBase_);
696 eventBase_ = base;
697
698 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000699 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000700 event_get_version(),
701 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000702
703 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000704 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000705 serverSocket_,
706 EV_READ | EV_PERSIST,
707 TNonblockingServer::eventHandler,
708 this);
Mark Slee79b16942007-11-26 19:05:29 +0000709 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000710
711 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000712 if (-1 == event_add(&serverEvent_, 0)) {
713 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000714 }
Mark Slee79b16942007-11-26 19:05:29 +0000715}
716
717/**
718 * Main workhorse function, starts up the server listening on a port and
719 * loops over the libevent handler.
720 */
721void TNonblockingServer::serve() {
722 // Init socket
723 listenSocket();
724
725 // Initialize libevent core
726 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000727
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000728 // Run the preServe event
729 if (eventHandler_ != NULL) {
730 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000731 }
732
Mark Sleee02385b2007-06-09 01:21:16 +0000733 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000734 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000735}
736
T Jake Lucianib5e62212009-01-31 22:36:20 +0000737}}} // apache::thrift::server