blob: bf9a5b17d643c5cd3380ea34294832c6c6d58494 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include "TNonblockingServer.h"
8
Mark Sleee02385b2007-06-09 01:21:16 +00009#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000010#include <sys/socket.h>
11#include <netinet/in.h>
12#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000013#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <fcntl.h>
15#include <errno.h>
16#include <assert.h>
17
Mark Slee79b16942007-11-26 19:05:29 +000018namespace facebook { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000019
Mark Slee5ea15f92007-03-05 22:55:59 +000020using namespace facebook::thrift::protocol;
21using namespace facebook::thrift::transport;
Mark Sleee02385b2007-06-09 01:21:16 +000022using namespace std;
23
24class TConnection::Task: public Runnable {
25 public:
26 Task(boost::shared_ptr<TProcessor> processor,
27 boost::shared_ptr<TProtocol> input,
28 boost::shared_ptr<TProtocol> output,
29 int taskHandle) :
30 processor_(processor),
31 input_(input),
32 output_(output),
33 taskHandle_(taskHandle) {}
34
35 void run() {
36 try {
37 while (processor_->process(input_, output_)) {
38 if (!input_->getTransport()->peek()) {
39 break;
40 }
41 }
42 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000043 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000044 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000045 cerr << "TNonblockingServer exception: " << x.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000046 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000047 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000048 }
Mark Slee79b16942007-11-26 19:05:29 +000049
Mark Sleee02385b2007-06-09 01:21:16 +000050 // Signal completion back to the libevent thread via a socketpair
51 int8_t b = 0;
52 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
David Reiss01e55c12008-07-13 22:18:51 +000053 GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000054 }
55 if (-1 == ::close(taskHandle_)) {
David Reiss01e55c12008-07-13 22:18:51 +000056 GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +000057 }
58 }
59
60 private:
61 boost::shared_ptr<TProcessor> processor_;
62 boost::shared_ptr<TProtocol> input_;
63 boost::shared_ptr<TProtocol> output_;
64 int taskHandle_;
65};
Mark Slee5ea15f92007-03-05 22:55:59 +000066
67void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000068 socket_ = socket;
69 server_ = s;
70 appState_ = APP_INIT;
71 eventFlags_ = 0;
72
73 readBufferPos_ = 0;
74 readWant_ = 0;
75
76 writeBuffer_ = NULL;
77 writeBufferSize_ = 0;
78 writeBufferPos_ = 0;
79
80 socketState_ = SOCKET_RECV;
81 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +000082
Mark Sleee02385b2007-06-09 01:21:16 +000083 taskHandle_ = -1;
84
Mark Slee2f6404d2006-10-10 01:37:40 +000085 // Set flags, which also registers the event
86 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000087
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 // get input/transports
89 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
90 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000091
92 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000093 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
94 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095}
96
97void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +000098 int flags=0, got=0, left=0, sent=0;
99 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000100
101 switch (socketState_) {
102 case SOCKET_RECV:
103 // It is an error to be in this state if we already have all the data
104 assert(readBufferPos_ < readWant_);
105
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000107 if (readWant_ > readBufferSize_) {
108 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 readBufferSize_ *= 2;
110 }
David Reissd7a16f42008-02-19 22:47:29 +0000111 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000112 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000113 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 close();
115 return;
116 }
117 }
118
119 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000120 fetch = readWant_ - readBufferPos_;
121 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000122
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 if (got > 0) {
124 // Move along in the buffer
125 readBufferPos_ += got;
126
127 // Check that we did not overdo it
128 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000129
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 // We are done reading, move onto the next state
131 if (readBufferPos_ == readWant_) {
132 transition();
133 }
134 return;
135 } else if (got == -1) {
136 // Blocking errors are okay, just move on
137 if (errno == EAGAIN || errno == EWOULDBLOCK) {
138 return;
139 }
140
141 if (errno != ECONNRESET) {
David Reiss01e55c12008-07-13 22:18:51 +0000142 GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 }
144 }
145
146 // Whenever we get down here it means a remote disconnect
147 close();
Mark Slee79b16942007-11-26 19:05:29 +0000148
Mark Slee2f6404d2006-10-10 01:37:40 +0000149 return;
150
151 case SOCKET_SEND:
152 // Should never have position past size
153 assert(writeBufferPos_ <= writeBufferSize_);
154
155 // If there is no data to send, then let us move on
156 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000157 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 transition();
159 return;
160 }
161
162 flags = 0;
163 #ifdef MSG_NOSIGNAL
164 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
165 // check for the EPIPE return condition and close the socket in that case
166 flags |= MSG_NOSIGNAL;
167 #endif // ifdef MSG_NOSIGNAL
168
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000169 left = writeBufferSize_ - writeBufferPos_;
170 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000171
172 if (sent <= 0) {
173 // Blocking errors are okay, just move on
174 if (errno == EAGAIN || errno == EWOULDBLOCK) {
175 return;
176 }
177 if (errno != EPIPE) {
David Reiss01e55c12008-07-13 22:18:51 +0000178 GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 }
180 close();
181 return;
182 }
183
184 writeBufferPos_ += sent;
185
186 // Did we overdo it?
187 assert(writeBufferPos_ <= writeBufferSize_);
188
Mark Slee79b16942007-11-26 19:05:29 +0000189 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 if (writeBufferPos_ == writeBufferSize_) {
191 transition();
192 }
193
194 return;
195
196 default:
David Reiss01e55c12008-07-13 22:18:51 +0000197 GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000198 assert(0);
199 }
200}
201
202/**
203 * This is called when the application transitions from one state into
204 * another. This means that it has finished writing the data that it needed
205 * to, or finished receiving the data that it needed to.
206 */
207void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000208
209 int sz = 0;
210
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 // Switch upon the state that we are currently in and move to a new state
212 switch (appState_) {
213
214 case APP_READ_REQUEST:
215 // We are done reading the request, package the read buffer into transport
216 // and get back some data from the dispatch function
217 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
218 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000219 // Prepend four bytes of blank space to the buffer so we can
220 // write the frame size there later.
221 outputTransport_->getWritePtr(4);
222 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000223
Mark Sleee02385b2007-06-09 01:21:16 +0000224 if (server_->isThreadPoolProcessing()) {
225 // We are setting up a Task to do this work and we will wait on it
226 int sv[2];
227 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
David Reiss01e55c12008-07-13 22:18:51 +0000228 GlobalOutput.perror("TConnection::socketpair() failed ", errno);
Mark Sleee02385b2007-06-09 01:21:16 +0000229 // Now we will fall through to the APP_WAIT_TASK block with no response
230 } else {
231 // Create task and dispatch to the thread manager
232 boost::shared_ptr<Runnable> task =
233 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
234 inputProtocol_,
235 outputProtocol_,
236 sv[1]));
Mark Slee79b16942007-11-26 19:05:29 +0000237 // The application is now waiting on the task to finish
Mark Sleee02385b2007-06-09 01:21:16 +0000238 appState_ = APP_WAIT_TASK;
Mark Slee79b16942007-11-26 19:05:29 +0000239
240 // Create an event to be notified when the task finishes
Mark Sleee02385b2007-06-09 01:21:16 +0000241 event_set(&taskEvent_,
242 taskHandle_ = sv[0],
243 EV_READ,
244 TConnection::taskHandler,
245 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000246
Mark Slee79b16942007-11-26 19:05:29 +0000247 // Attach to the base
248 event_base_set(server_->getEventBase(), &taskEvent_);
249
Mark Sleee02385b2007-06-09 01:21:16 +0000250 // Add the event and start up the server
251 if (-1 == event_add(&taskEvent_, 0)) {
252 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
253 return;
254 }
255 server_->addTask(task);
Mark Slee402ee282007-08-23 01:43:20 +0000256
257 // Set this connection idle so that libevent doesn't process more
258 // data on it while we're still waiting for the threadmanager to
259 // finish this task
260 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000261 return;
262 }
263 } else {
264 try {
265 // Invoke the processor
266 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
267 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000268 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000269 close();
270 return;
271 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000272 GlobalOutput.printf("TException: Server::process() %s", x.what());
Mark Slee79b16942007-11-26 19:05:29 +0000273 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000274 return;
275 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000276 GlobalOutput.printf("Server::process() unknown exception");
Mark Sleee02385b2007-06-09 01:21:16 +0000277 close();
278 return;
279 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000280 }
281
Mark Slee402ee282007-08-23 01:43:20 +0000282 // Intentionally fall through here, the call to process has written into
283 // the writeBuffer_
284
Mark Sleee02385b2007-06-09 01:21:16 +0000285 case APP_WAIT_TASK:
286 // We have now finished processing a task and the result has been written
287 // into the outputTransport_, so we grab its contents and place them into
288 // the writeBuffer_ for actual writing by the libevent thread
289
Mark Slee2f6404d2006-10-10 01:37:40 +0000290 // Get the result of the operation
291 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
292
293 // If the function call generated return data, then move into the send
294 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000295 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000296 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000297
298 // Move into write state
299 writeBufferPos_ = 0;
300 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000301
David Reissaf787782008-07-03 20:29:34 +0000302 // Put the frame size into the write buffer
303 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
304 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000305
306 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000307 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000308 setWrite();
309
310 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000311 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000312
313 return;
314 }
315
316 // In this case, the request was asynchronous and we should fall through
317 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000318 goto LABEL_APP_INIT;
319
Mark Slee2f6404d2006-10-10 01:37:40 +0000320 case APP_SEND_RESULT:
321
322 // N.B.: We also intentionally fall through here into the INIT state!
323
Mark Slee92f00fb2006-10-25 01:28:17 +0000324 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000325 case APP_INIT:
326
327 // Clear write buffer variables
328 writeBuffer_ = NULL;
329 writeBufferPos_ = 0;
330 writeBufferSize_ = 0;
331
332 // Set up read buffer for getting 4 bytes
333 readBufferPos_ = 0;
334 readWant_ = 4;
335
336 // Into read4 state we go
337 socketState_ = SOCKET_RECV;
338 appState_ = APP_READ_FRAME_SIZE;
339
340 // Register read event
341 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000342
Mark Slee2f6404d2006-10-10 01:37:40 +0000343 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000344 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000345
346 return;
347
348 case APP_READ_FRAME_SIZE:
349 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000350 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 sz = (int32_t)ntohl(sz);
352
353 if (sz <= 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000354 GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
Mark Slee2f6404d2006-10-10 01:37:40 +0000355 close();
356 return;
357 }
358
359 // Reset the read buffer
360 readWant_ = (uint32_t)sz;
361 readBufferPos_= 0;
362
363 // Move into read request state
364 appState_ = APP_READ_REQUEST;
365
366 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000367 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000368
369 return;
370
371 default:
David Reiss01e55c12008-07-13 22:18:51 +0000372 GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000373 assert(0);
374 }
375}
376
377void TConnection::setFlags(short eventFlags) {
378 // Catch the do nothing case
379 if (eventFlags_ == eventFlags) {
380 return;
381 }
382
383 // Delete a previously existing event
384 if (eventFlags_ != 0) {
385 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000386 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 return;
388 }
389 }
390
391 // Update in memory structure
392 eventFlags_ = eventFlags;
393
Mark Slee402ee282007-08-23 01:43:20 +0000394 // Do not call event_set if there are no flags
395 if (!eventFlags_) {
396 return;
397 }
398
Mark Slee2f6404d2006-10-10 01:37:40 +0000399 /**
400 * event_set:
401 *
402 * Prepares the event structure &event to be used in future calls to
403 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000404 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 *
406 * The events can be either EV_READ, EV_WRITE, or both, indicating
407 * that an application can read or write from the file respectively without
408 * blocking.
409 *
Mark Sleee02385b2007-06-09 01:21:16 +0000410 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000411 * the event and the type of event which will be one of: EV_TIMEOUT,
412 * EV_SIGNAL, EV_READ, EV_WRITE.
413 *
414 * The additional flag EV_PERSIST makes an event_add() persistent until
415 * event_del() has been called.
416 *
417 * Once initialized, the &event struct can be used repeatedly with
418 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000419 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000420 * when an ev structure has been added to libevent using event_add() the
421 * structure must persist until the event occurs (assuming EV_PERSIST
422 * is not set) or is removed using event_del(). You may not reuse the same
423 * ev structure for multiple monitored descriptors; each descriptor needs
424 * its own ev.
425 */
426 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000427 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000428
429 // Add the event
430 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000431 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000432 }
433}
434
435/**
436 * Closes a connection
437 */
438void TConnection::close() {
439 // Delete the registered libevent
440 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000441 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000442 }
443
444 // Close the socket
445 if (socket_ > 0) {
446 ::close(socket_);
447 }
448 socket_ = 0;
449
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000450 // close any factory produced transports
451 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000452 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000453
Mark Slee2f6404d2006-10-10 01:37:40 +0000454 // Give this object back to the server that owns it
455 server_->returnConnection(this);
456}
457
458/**
459 * Creates a new connection either by reusing an object off the stack or
460 * by allocating a new one entirely
461 */
462TConnection* TNonblockingServer::createConnection(int socket, short flags) {
463 // Check the stack
464 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000465 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000466 } else {
467 TConnection* result = connectionStack_.top();
468 connectionStack_.pop();
469 result->init(socket, flags, this);
470 return result;
471 }
472}
473
474/**
475 * Returns a connection to the stack
476 */
477void TNonblockingServer::returnConnection(TConnection* connection) {
478 connectionStack_.push(connection);
479}
480
481/**
David Reissa79e4882008-03-05 07:51:47 +0000482 * Server socket had something happen. We accept all waiting client
483 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000484 */
485void TNonblockingServer::handleEvent(int fd, short which) {
486 // Make sure that libevent didn't fuck up the socket handles
487 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000488
Mark Slee2f6404d2006-10-10 01:37:40 +0000489 // Server socket accepted a new connection
490 socklen_t addrLen;
491 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000492 addrLen = sizeof(addr);
493
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 // Going to accept a new client socket
495 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000496
Mark Slee2f6404d2006-10-10 01:37:40 +0000497 // Accept as many new clients as possible, even though libevent signaled only
498 // one, this helps us to avoid having to go back into the libevent engine so
499 // many times
500 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
501
502 // Explicitly set this socket to NONBLOCK mode
503 int flags;
504 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
505 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000506 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000507 close(clientSocket);
508 return;
509 }
510
511 // Create a new TConnection for this client socket.
512 TConnection* clientConnection =
513 createConnection(clientSocket, EV_READ | EV_PERSIST);
514
515 // Fail fast if we could not create a TConnection object
516 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000517 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000518 close(clientSocket);
519 return;
520 }
521
522 // Put this client connection into the proper state
523 clientConnection->transition();
524 }
Mark Slee79b16942007-11-26 19:05:29 +0000525
Mark Slee2f6404d2006-10-10 01:37:40 +0000526 // Done looping accept, now we have to make sure the error is due to
527 // blocking. Any other error is a problem
528 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000529 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 }
531}
532
/**
 * Creates the listening socket for port_ (wildcard address, IPv6 preferred),
 * binds it, and passes it to listenSocket(int) for the remaining setup.
 *
 * @throws TException if address resolution, socket creation, binding, or the
 *         follow-up setup in listenSocket(int) fails.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for any int, including the sign and the terminating NUL.
  char port[sizeof("-2147483648")];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Previously this only logged and returned, leaving serverSocket_ unset
    // for the caller. Fail the same way as the other setup errors.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  // Accept IPv4-mapped clients on an IPv6 socket. The option only applies to
  // AF_INET6 sockets; setting it on an IPv4 socket fails and logs noise.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
595
596/**
597 * Takes a socket created by listenSocket() and sets various options on it
598 * to prepare for use in the server.
599 */
600void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000601 // Set socket to nonblocking mode
602 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000603 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
604 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
605 close(s);
606 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000607 }
608
609 int one = 1;
610 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000611
612 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000613 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000614
615 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000616 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000617
618 // Set TCP nodelay if available, MAC OS X Hack
619 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
620 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000621 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000622 #endif
623
Mark Slee79b16942007-11-26 19:05:29 +0000624 if (listen(s, LISTEN_BACKLOG) == -1) {
625 close(s);
626 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000627 }
628
Mark Slee79b16942007-11-26 19:05:29 +0000629 // Cool, this socket is good to go, set it as the serverSocket_
630 serverSocket_ = s;
631}
632
633/**
634 * Register the core libevent events onto the proper base.
635 */
636void TNonblockingServer::registerEvents(event_base* base) {
637 assert(serverSocket_ != -1);
638 assert(!eventBase_);
639 eventBase_ = base;
640
641 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000642 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000643 event_get_version(),
644 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000645
646 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000647 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000648 serverSocket_,
649 EV_READ | EV_PERSIST,
650 TNonblockingServer::eventHandler,
651 this);
Mark Slee79b16942007-11-26 19:05:29 +0000652 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000653
654 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000655 if (-1 == event_add(&serverEvent_, 0)) {
656 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000657 }
Mark Slee79b16942007-11-26 19:05:29 +0000658}
659
660/**
661 * Main workhorse function, starts up the server listening on a port and
662 * loops over the libevent handler.
663 */
664void TNonblockingServer::serve() {
665 // Init socket
666 listenSocket();
667
668 // Initialize libevent core
669 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000670
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000671 // Run the preServe event
672 if (eventHandler_ != NULL) {
673 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000674 }
675
Mark Sleee02385b2007-06-09 01:21:16 +0000676 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000677 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000678}
679
680}}} // facebook::thrift::server