// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/

#include "TNonblockingServer.h"

#include <cstdlib>
#include <iostream>

#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>

namespace facebook { namespace thrift { namespace server {

using namespace facebook::thrift::protocol;
using namespace facebook::thrift::transport;
using namespace std;
23
24class TConnection::Task: public Runnable {
25 public:
26 Task(boost::shared_ptr<TProcessor> processor,
27 boost::shared_ptr<TProtocol> input,
28 boost::shared_ptr<TProtocol> output,
29 int taskHandle) :
30 processor_(processor),
31 input_(input),
32 output_(output),
33 taskHandle_(taskHandle) {}
34
35 void run() {
36 try {
37 while (processor_->process(input_, output_)) {
38 if (!input_->getTransport()->peek()) {
39 break;
40 }
41 }
42 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000043 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000044 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000045 cerr << "TNonblockingServer exception: " << x.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000046 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000047 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000048 }
Mark Slee79b16942007-11-26 19:05:29 +000049
Mark Sleee02385b2007-06-09 01:21:16 +000050 // Signal completion back to the libevent thread via a socketpair
51 int8_t b = 0;
52 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
David Reiss9b209552008-04-08 06:26:05 +000053 string errStr = "TNonblockingServer::Task: send " + TOutput::strerror_s(errno);
54 GlobalOutput(errStr.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +000055 }
56 if (-1 == ::close(taskHandle_)) {
David Reiss9b209552008-04-08 06:26:05 +000057 string errStr = "TNonblockingServer::Task: close, possible resource leak " + TOutput::strerror_s(errno);
58 GlobalOutput(errStr.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +000059 }
60 }
61
62 private:
63 boost::shared_ptr<TProcessor> processor_;
64 boost::shared_ptr<TProtocol> input_;
65 boost::shared_ptr<TProtocol> output_;
66 int taskHandle_;
67};
Mark Slee5ea15f92007-03-05 22:55:59 +000068
69void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000070 socket_ = socket;
71 server_ = s;
72 appState_ = APP_INIT;
73 eventFlags_ = 0;
74
75 readBufferPos_ = 0;
76 readWant_ = 0;
77
78 writeBuffer_ = NULL;
79 writeBufferSize_ = 0;
80 writeBufferPos_ = 0;
81
82 socketState_ = SOCKET_RECV;
83 appState_ = APP_INIT;
Mark Slee79b16942007-11-26 19:05:29 +000084
Mark Sleee02385b2007-06-09 01:21:16 +000085 taskHandle_ = -1;
86
Mark Slee2f6404d2006-10-10 01:37:40 +000087 // Set flags, which also registers the event
88 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000089
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000090 // get input/transports
91 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
92 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000093
94 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000095 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
96 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000097}
98
99void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000100 int flags=0, got=0, left=0, sent=0;
101 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000102
103 switch (socketState_) {
104 case SOCKET_RECV:
105 // It is an error to be in this state if we already have all the data
106 assert(readBufferPos_ < readWant_);
107
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000109 if (readWant_ > readBufferSize_) {
110 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000111 readBufferSize_ *= 2;
112 }
David Reissd7a16f42008-02-19 22:47:29 +0000113 readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000115 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000116 close();
117 return;
118 }
119 }
120
121 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000122 fetch = readWant_ - readBufferPos_;
123 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee79b16942007-11-26 19:05:29 +0000124
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 if (got > 0) {
126 // Move along in the buffer
127 readBufferPos_ += got;
128
129 // Check that we did not overdo it
130 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000131
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 // We are done reading, move onto the next state
133 if (readBufferPos_ == readWant_) {
134 transition();
135 }
136 return;
137 } else if (got == -1) {
138 // Blocking errors are okay, just move on
139 if (errno == EAGAIN || errno == EWOULDBLOCK) {
140 return;
141 }
142
143 if (errno != ECONNRESET) {
David Reiss9b209552008-04-08 06:26:05 +0000144 string errStr = "TConnection::workSocket() recv -1 " + TOutput::strerror_s(errno);
145 GlobalOutput(errStr.c_str());
Mark Slee2f6404d2006-10-10 01:37:40 +0000146 }
147 }
148
149 // Whenever we get down here it means a remote disconnect
150 close();
Mark Slee79b16942007-11-26 19:05:29 +0000151
Mark Slee2f6404d2006-10-10 01:37:40 +0000152 return;
153
154 case SOCKET_SEND:
155 // Should never have position past size
156 assert(writeBufferPos_ <= writeBufferSize_);
157
158 // If there is no data to send, then let us move on
159 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000160 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 transition();
162 return;
163 }
164
165 flags = 0;
166 #ifdef MSG_NOSIGNAL
167 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
168 // check for the EPIPE return condition and close the socket in that case
169 flags |= MSG_NOSIGNAL;
170 #endif // ifdef MSG_NOSIGNAL
171
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000172 left = writeBufferSize_ - writeBufferPos_;
173 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000174
175 if (sent <= 0) {
176 // Blocking errors are okay, just move on
177 if (errno == EAGAIN || errno == EWOULDBLOCK) {
178 return;
179 }
180 if (errno != EPIPE) {
David Reiss9b209552008-04-08 06:26:05 +0000181 string errStr = "TConnection::workSocket() send -1 " + TOutput::strerror_s(errno);
182 GlobalOutput(errStr.c_str());
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 }
184 close();
185 return;
186 }
187
188 writeBufferPos_ += sent;
189
190 // Did we overdo it?
191 assert(writeBufferPos_ <= writeBufferSize_);
192
Mark Slee79b16942007-11-26 19:05:29 +0000193 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 if (writeBufferPos_ == writeBufferSize_) {
195 transition();
196 }
197
198 return;
199
200 default:
201 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
202 assert(0);
203 }
204}
205
206/**
207 * This is called when the application transitions from one state into
208 * another. This means that it has finished writing the data that it needed
209 * to, or finished receiving the data that it needed to.
210 */
211void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000212
213 int sz = 0;
214
Mark Slee2f6404d2006-10-10 01:37:40 +0000215 // Switch upon the state that we are currently in and move to a new state
216 switch (appState_) {
217
218 case APP_READ_REQUEST:
219 // We are done reading the request, package the read buffer into transport
220 // and get back some data from the dispatch function
221 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
222 outputTransport_->resetBuffer();
Mark Slee79b16942007-11-26 19:05:29 +0000223
Mark Sleee02385b2007-06-09 01:21:16 +0000224 if (server_->isThreadPoolProcessing()) {
225 // We are setting up a Task to do this work and we will wait on it
226 int sv[2];
227 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
David Reiss9b209552008-04-08 06:26:05 +0000228 string errStr = "TConnection::socketpair() failed " + TOutput::strerror_s(errno);
229 GlobalOutput(errStr.c_str());
Mark Sleee02385b2007-06-09 01:21:16 +0000230 // Now we will fall through to the APP_WAIT_TASK block with no response
231 } else {
232 // Create task and dispatch to the thread manager
233 boost::shared_ptr<Runnable> task =
234 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
235 inputProtocol_,
236 outputProtocol_,
237 sv[1]));
Mark Slee79b16942007-11-26 19:05:29 +0000238 // The application is now waiting on the task to finish
Mark Sleee02385b2007-06-09 01:21:16 +0000239 appState_ = APP_WAIT_TASK;
Mark Slee79b16942007-11-26 19:05:29 +0000240
241 // Create an event to be notified when the task finishes
Mark Sleee02385b2007-06-09 01:21:16 +0000242 event_set(&taskEvent_,
243 taskHandle_ = sv[0],
244 EV_READ,
245 TConnection::taskHandler,
246 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000247
Mark Slee79b16942007-11-26 19:05:29 +0000248 // Attach to the base
249 event_base_set(server_->getEventBase(), &taskEvent_);
250
Mark Sleee02385b2007-06-09 01:21:16 +0000251 // Add the event and start up the server
252 if (-1 == event_add(&taskEvent_, 0)) {
253 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
254 return;
255 }
256 server_->addTask(task);
Mark Slee402ee282007-08-23 01:43:20 +0000257
258 // Set this connection idle so that libevent doesn't process more
259 // data on it while we're still waiting for the threadmanager to
260 // finish this task
261 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000262 return;
263 }
264 } else {
265 try {
266 // Invoke the processor
267 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
268 } catch (TTransportException &ttx) {
269 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
270 close();
271 return;
272 } catch (TException &x) {
273 fprintf(stderr, "TException: Server::process() %s\n", x.what());
Mark Slee79b16942007-11-26 19:05:29 +0000274 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000275 return;
276 } catch (...) {
277 fprintf(stderr, "Server::process() unknown exception\n");
278 close();
279 return;
280 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000281 }
282
Mark Slee402ee282007-08-23 01:43:20 +0000283 // Intentionally fall through here, the call to process has written into
284 // the writeBuffer_
285
Mark Sleee02385b2007-06-09 01:21:16 +0000286 case APP_WAIT_TASK:
287 // We have now finished processing a task and the result has been written
288 // into the outputTransport_, so we grab its contents and place them into
289 // the writeBuffer_ for actual writing by the libevent thread
290
Mark Slee2f6404d2006-10-10 01:37:40 +0000291 // Get the result of the operation
292 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
293
294 // If the function call generated return data, then move into the send
295 // state and get going
296 if (writeBufferSize_ > 0) {
297
298 // Move into write state
299 writeBufferPos_ = 0;
300 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000301
302 if (server_->getFrameResponses()) {
303 // Put the frame size into the write buffer
304 appState_ = APP_SEND_FRAME_SIZE;
305 frameSize_ = (int32_t)htonl(writeBufferSize_);
306 writeBuffer_ = (uint8_t*)&frameSize_;
307 writeBufferSize_ = 4;
308 } else {
309 // Go straight into sending the result, do not frame it
310 appState_ = APP_SEND_RESULT;
311 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000312
313 // Socket into write mode
314 setWrite();
315
316 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000317 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000318
319 return;
320 }
321
322 // In this case, the request was asynchronous and we should fall through
323 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000324 goto LABEL_APP_INIT;
325
326 case APP_SEND_FRAME_SIZE:
327
328 // Refetch the result of the operation since we put the frame size into
329 // writeBuffer_
330 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
331 writeBufferPos_ = 0;
332
333 // Now in send result state
334 appState_ = APP_SEND_RESULT;
335
336 // Go to work on the socket right away, probably still writeable
Mark Sleee02385b2007-06-09 01:21:16 +0000337 // workSocket();
Mark Slee92f00fb2006-10-25 01:28:17 +0000338
339 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000340
341 case APP_SEND_RESULT:
342
343 // N.B.: We also intentionally fall through here into the INIT state!
344
Mark Slee92f00fb2006-10-25 01:28:17 +0000345 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000346 case APP_INIT:
347
348 // Clear write buffer variables
349 writeBuffer_ = NULL;
350 writeBufferPos_ = 0;
351 writeBufferSize_ = 0;
352
353 // Set up read buffer for getting 4 bytes
354 readBufferPos_ = 0;
355 readWant_ = 4;
356
357 // Into read4 state we go
358 socketState_ = SOCKET_RECV;
359 appState_ = APP_READ_FRAME_SIZE;
360
361 // Register read event
362 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000363
Mark Slee2f6404d2006-10-10 01:37:40 +0000364 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000365 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000366
367 return;
368
369 case APP_READ_FRAME_SIZE:
370 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000371 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000372 sz = (int32_t)ntohl(sz);
373
374 if (sz <= 0) {
375 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
376 close();
377 return;
378 }
379
380 // Reset the read buffer
381 readWant_ = (uint32_t)sz;
382 readBufferPos_= 0;
383
384 // Move into read request state
385 appState_ = APP_READ_REQUEST;
386
387 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000388 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000389
390 return;
391
392 default:
393 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
394 assert(0);
395 }
396}
397
398void TConnection::setFlags(short eventFlags) {
399 // Catch the do nothing case
400 if (eventFlags_ == eventFlags) {
401 return;
402 }
403
404 // Delete a previously existing event
405 if (eventFlags_ != 0) {
406 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000407 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 return;
409 }
410 }
411
412 // Update in memory structure
413 eventFlags_ = eventFlags;
414
Mark Slee402ee282007-08-23 01:43:20 +0000415 // Do not call event_set if there are no flags
416 if (!eventFlags_) {
417 return;
418 }
419
Mark Slee2f6404d2006-10-10 01:37:40 +0000420 /**
421 * event_set:
422 *
423 * Prepares the event structure &event to be used in future calls to
424 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000425 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000426 *
427 * The events can be either EV_READ, EV_WRITE, or both, indicating
428 * that an application can read or write from the file respectively without
429 * blocking.
430 *
Mark Sleee02385b2007-06-09 01:21:16 +0000431 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000432 * the event and the type of event which will be one of: EV_TIMEOUT,
433 * EV_SIGNAL, EV_READ, EV_WRITE.
434 *
435 * The additional flag EV_PERSIST makes an event_add() persistent until
436 * event_del() has been called.
437 *
438 * Once initialized, the &event struct can be used repeatedly with
439 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000440 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000441 * when an ev structure has been added to libevent using event_add() the
442 * structure must persist until the event occurs (assuming EV_PERSIST
443 * is not set) or is removed using event_del(). You may not reuse the same
444 * ev structure for multiple monitored descriptors; each descriptor needs
445 * its own ev.
446 */
447 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000448 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000449
450 // Add the event
451 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000452 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000453 }
454}
455
456/**
457 * Closes a connection
458 */
459void TConnection::close() {
460 // Delete the registered libevent
461 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000462 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000463 }
464
465 // Close the socket
466 if (socket_ > 0) {
467 ::close(socket_);
468 }
469 socket_ = 0;
470
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000471 // close any factory produced transports
472 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000473 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000474
Mark Slee2f6404d2006-10-10 01:37:40 +0000475 // Give this object back to the server that owns it
476 server_->returnConnection(this);
477}
478
479/**
480 * Creates a new connection either by reusing an object off the stack or
481 * by allocating a new one entirely
482 */
483TConnection* TNonblockingServer::createConnection(int socket, short flags) {
484 // Check the stack
485 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000486 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000487 } else {
488 TConnection* result = connectionStack_.top();
489 connectionStack_.pop();
490 result->init(socket, flags, this);
491 return result;
492 }
493}
494
495/**
496 * Returns a connection to the stack
497 */
498void TNonblockingServer::returnConnection(TConnection* connection) {
499 connectionStack_.push(connection);
500}
501
502/**
David Reissa79e4882008-03-05 07:51:47 +0000503 * Server socket had something happen. We accept all waiting client
504 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000505 */
506void TNonblockingServer::handleEvent(int fd, short which) {
507 // Make sure that libevent didn't fuck up the socket handles
508 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000509
Mark Slee2f6404d2006-10-10 01:37:40 +0000510 // Server socket accepted a new connection
511 socklen_t addrLen;
512 struct sockaddr addr;
Mark Slee79b16942007-11-26 19:05:29 +0000513 addrLen = sizeof(addr);
514
Mark Slee2f6404d2006-10-10 01:37:40 +0000515 // Going to accept a new client socket
516 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000517
Mark Slee2f6404d2006-10-10 01:37:40 +0000518 // Accept as many new clients as possible, even though libevent signaled only
519 // one, this helps us to avoid having to go back into the libevent engine so
520 // many times
521 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
522
523 // Explicitly set this socket to NONBLOCK mode
524 int flags;
525 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
526 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss9b209552008-04-08 06:26:05 +0000527 string errStr = "thriftServerEventHandler: set O_NONBLOCK (fcntl) " + TOutput::strerror_s(errno);
528 GlobalOutput(errStr.c_str());
Mark Slee2f6404d2006-10-10 01:37:40 +0000529 close(clientSocket);
530 return;
531 }
532
533 // Create a new TConnection for this client socket.
534 TConnection* clientConnection =
535 createConnection(clientSocket, EV_READ | EV_PERSIST);
536
537 // Fail fast if we could not create a TConnection object
538 if (clientConnection == NULL) {
539 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
540 close(clientSocket);
541 return;
542 }
543
544 // Put this client connection into the proper state
545 clientConnection->transition();
546 }
Mark Slee79b16942007-11-26 19:05:29 +0000547
Mark Slee2f6404d2006-10-10 01:37:40 +0000548 // Done looping accept, now we have to make sure the error is due to
549 // blocking. Any other error is a problem
550 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss9b209552008-04-08 06:26:05 +0000551 string errStr = "thriftServerEventHandler: accept() " + TOutput::strerror_s(errno);
552 GlobalOutput(errStr.c_str());
Mark Slee2f6404d2006-10-10 01:37:40 +0000553 }
554}
555
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, preferring an IPv6 result, binds
 * a stream socket to it and passes it to listenSocket(int) for the remaining
 * setup.
 *
 * @throws TException if address resolution, socket creation or bind fails
 *         (listenSocket(int) throws for its own failures)
 */
void TNonblockingServer::listenSocket() {
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // Sized for any value "%d" can produce, and written with a bound, so an
  // out-of-range port_ cannot overflow the buffer.
  char port[sizeof("-2147483648")];
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  struct addrinfo* res0 = NULL;
  int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    // Report the failure like every other failure in this function instead
    // of returning with no server socket set up.
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Falls back to the last entry when there is none.
  struct addrinfo* res = res0;
  while (res != NULL &&
         res->ai_family != AF_INET6 &&
         res->ai_next != NULL) {
    res = res->ai_next;
  }
  if (res == NULL) {
    if (res0 != NULL) {
      freeaddrinfo(res0);
    }
    throw TException("TNonblockingServer::serve() getaddrinfo returned no addresses");
  }

  // Create the server socket
  int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    ::close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
610
611/**
612 * Takes a socket created by listenSocket() and sets various options on it
613 * to prepare for use in the server.
614 */
615void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000616 // Set socket to nonblocking mode
617 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000618 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
619 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
620 close(s);
621 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000622 }
623
624 int one = 1;
625 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000626
627 // Keepalive to ensure full result flushing
Mark Slee79b16942007-11-26 19:05:29 +0000628 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000629
630 // Turn linger off to avoid hung sockets
Mark Slee79b16942007-11-26 19:05:29 +0000631 setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000632
633 // Set TCP nodelay if available, MAC OS X Hack
634 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
635 #ifndef TCP_NOPUSH
Mark Slee79b16942007-11-26 19:05:29 +0000636 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000637 #endif
638
Mark Slee79b16942007-11-26 19:05:29 +0000639 if (listen(s, LISTEN_BACKLOG) == -1) {
640 close(s);
641 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000642 }
643
Mark Slee79b16942007-11-26 19:05:29 +0000644 // Cool, this socket is good to go, set it as the serverSocket_
645 serverSocket_ = s;
646}
647
648/**
649 * Register the core libevent events onto the proper base.
650 */
651void TNonblockingServer::registerEvents(event_base* base) {
652 assert(serverSocket_ != -1);
653 assert(!eventBase_);
654 eventBase_ = base;
655
656 // Print some libevent stats
657 fprintf(stderr,
658 "libevent %s method %s\n",
659 event_get_version(),
660 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000661
662 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000663 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000664 serverSocket_,
665 EV_READ | EV_PERSIST,
666 TNonblockingServer::eventHandler,
667 this);
Mark Slee79b16942007-11-26 19:05:29 +0000668 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000669
670 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000671 if (-1 == event_add(&serverEvent_, 0)) {
672 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000673 }
Mark Slee79b16942007-11-26 19:05:29 +0000674}
675
676/**
677 * Main workhorse function, starts up the server listening on a port and
678 * loops over the libevent handler.
679 */
680void TNonblockingServer::serve() {
681 // Init socket
682 listenSocket();
683
684 // Initialize libevent core
685 registerEvents(static_cast<event_base*>(event_init()));
Mark Slee2f6404d2006-10-10 01:37:40 +0000686
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000687 // Run the preServe event
688 if (eventHandler_ != NULL) {
689 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000690 }
691
Mark Sleee02385b2007-06-09 01:21:16 +0000692 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000693 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000694}
695
}}} // facebook::thrift::server