blob: b7ed761883aa04789f707a17a7c0ca8df36e6e9c [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include "TNonblockingServer.h"
8
Mark Sleee02385b2007-06-09 01:21:16 +00009#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000010#include <sys/socket.h>
11#include <netinet/in.h>
12#include <netinet/tcp.h>
Mark Sleefb4b5142007-11-20 01:27:08 +000013#include <netdb.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000014#include <fcntl.h>
15#include <errno.h>
16#include <assert.h>
17
18namespace facebook { namespace thrift { namespace server {
19
Mark Slee5ea15f92007-03-05 22:55:59 +000020using namespace facebook::thrift::protocol;
21using namespace facebook::thrift::transport;
Mark Sleee02385b2007-06-09 01:21:16 +000022using namespace std;
23
24class TConnection::Task: public Runnable {
25 public:
26 Task(boost::shared_ptr<TProcessor> processor,
27 boost::shared_ptr<TProtocol> input,
28 boost::shared_ptr<TProtocol> output,
29 int taskHandle) :
30 processor_(processor),
31 input_(input),
32 output_(output),
33 taskHandle_(taskHandle) {}
34
35 void run() {
36 try {
37 while (processor_->process(input_, output_)) {
38 if (!input_->getTransport()->peek()) {
39 break;
40 }
41 }
42 } catch (TTransportException& ttx) {
43 cerr << "TThreadedServer client died: " << ttx.what() << endl;
44 } catch (TException& x) {
45 cerr << "TThreadedServer exception: " << x.what() << endl;
46 } catch (...) {
47 cerr << "TThreadedServer uncaught exception." << endl;
48 }
49
50 // Signal completion back to the libevent thread via a socketpair
51 int8_t b = 0;
52 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
53 GlobalOutput("TNonblockingServer::Task: send");
54 }
55 if (-1 == ::close(taskHandle_)) {
56 GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
57 }
58 }
59
60 private:
61 boost::shared_ptr<TProcessor> processor_;
62 boost::shared_ptr<TProtocol> input_;
63 boost::shared_ptr<TProtocol> output_;
64 int taskHandle_;
65};
Mark Slee5ea15f92007-03-05 22:55:59 +000066
67void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000068 socket_ = socket;
69 server_ = s;
70 appState_ = APP_INIT;
71 eventFlags_ = 0;
72
73 readBufferPos_ = 0;
74 readWant_ = 0;
75
76 writeBuffer_ = NULL;
77 writeBufferSize_ = 0;
78 writeBufferPos_ = 0;
79
80 socketState_ = SOCKET_RECV;
81 appState_ = APP_INIT;
82
Mark Sleee02385b2007-06-09 01:21:16 +000083 taskHandle_ = -1;
84
Mark Slee2f6404d2006-10-10 01:37:40 +000085 // Set flags, which also registers the event
86 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000087
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 // get input/transports
89 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
90 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000091
92 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000093 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
94 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095}
96
97void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +000098 int flags=0, got=0, left=0, sent=0;
99 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000100
101 switch (socketState_) {
102 case SOCKET_RECV:
103 // It is an error to be in this state if we already have all the data
104 assert(readBufferPos_ < readWant_);
105
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000107 if (readWant_ > readBufferSize_) {
108 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 readBufferSize_ *= 2;
110 }
111 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
112 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000113 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 close();
115 return;
116 }
117 }
118
119 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000120 fetch = readWant_ - readBufferPos_;
121 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee402ee282007-08-23 01:43:20 +0000122
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 if (got > 0) {
124 // Move along in the buffer
125 readBufferPos_ += got;
126
127 // Check that we did not overdo it
128 assert(readBufferPos_ <= readWant_);
129
130 // We are done reading, move onto the next state
131 if (readBufferPos_ == readWant_) {
132 transition();
133 }
134 return;
135 } else if (got == -1) {
136 // Blocking errors are okay, just move on
137 if (errno == EAGAIN || errno == EWOULDBLOCK) {
138 return;
139 }
140
141 if (errno != ECONNRESET) {
boz6ded7752007-06-05 22:41:18 +0000142 GlobalOutput("TConnection::workSocket() recv -1");
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 }
144 }
145
146 // Whenever we get down here it means a remote disconnect
147 close();
148
149 return;
150
151 case SOCKET_SEND:
152 // Should never have position past size
153 assert(writeBufferPos_ <= writeBufferSize_);
154
155 // If there is no data to send, then let us move on
156 if (writeBufferPos_ == writeBufferSize_) {
157 fprintf(stderr, "WARNING: Send state with no data to send\n");
158 transition();
159 return;
160 }
161
162 flags = 0;
163 #ifdef MSG_NOSIGNAL
164 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
165 // check for the EPIPE return condition and close the socket in that case
166 flags |= MSG_NOSIGNAL;
167 #endif // ifdef MSG_NOSIGNAL
168
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000169 left = writeBufferSize_ - writeBufferPos_;
170 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000171
172 if (sent <= 0) {
173 // Blocking errors are okay, just move on
174 if (errno == EAGAIN || errno == EWOULDBLOCK) {
175 return;
176 }
177 if (errno != EPIPE) {
boz6ded7752007-06-05 22:41:18 +0000178 GlobalOutput("TConnection::workSocket() send -1");
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 }
180 close();
181 return;
182 }
183
184 writeBufferPos_ += sent;
185
186 // Did we overdo it?
187 assert(writeBufferPos_ <= writeBufferSize_);
188
189 // We are done!
190 if (writeBufferPos_ == writeBufferSize_) {
191 transition();
192 }
193
194 return;
195
196 default:
197 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
198 assert(0);
199 }
200}
201
202/**
203 * This is called when the application transitions from one state into
204 * another. This means that it has finished writing the data that it needed
205 * to, or finished receiving the data that it needed to.
206 */
207void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000208
209 int sz = 0;
210
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 // Switch upon the state that we are currently in and move to a new state
212 switch (appState_) {
213
214 case APP_READ_REQUEST:
215 // We are done reading the request, package the read buffer into transport
216 // and get back some data from the dispatch function
217 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
218 outputTransport_->resetBuffer();
Mark Slee402ee282007-08-23 01:43:20 +0000219
Mark Sleee02385b2007-06-09 01:21:16 +0000220 if (server_->isThreadPoolProcessing()) {
221 // We are setting up a Task to do this work and we will wait on it
222 int sv[2];
223 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
224 GlobalOutput("TConnection::socketpair() failed");
225 // Now we will fall through to the APP_WAIT_TASK block with no response
226 } else {
227 // Create task and dispatch to the thread manager
228 boost::shared_ptr<Runnable> task =
229 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
230 inputProtocol_,
231 outputProtocol_,
232 sv[1]));
233 appState_ = APP_WAIT_TASK;
234 event_set(&taskEvent_,
235 taskHandle_ = sv[0],
236 EV_READ,
237 TConnection::taskHandler,
238 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000239
Mark Sleee02385b2007-06-09 01:21:16 +0000240 // Add the event and start up the server
241 if (-1 == event_add(&taskEvent_, 0)) {
242 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
243 return;
244 }
245 server_->addTask(task);
Mark Slee402ee282007-08-23 01:43:20 +0000246
247 // Set this connection idle so that libevent doesn't process more
248 // data on it while we're still waiting for the threadmanager to
249 // finish this task
250 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000251 return;
252 }
253 } else {
254 try {
255 // Invoke the processor
256 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
257 } catch (TTransportException &ttx) {
258 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
259 close();
260 return;
261 } catch (TException &x) {
262 fprintf(stderr, "TException: Server::process() %s\n", x.what());
263 close();
264 return;
265 } catch (...) {
266 fprintf(stderr, "Server::process() unknown exception\n");
267 close();
268 return;
269 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000270 }
271
Mark Slee402ee282007-08-23 01:43:20 +0000272 // Intentionally fall through here, the call to process has written into
273 // the writeBuffer_
274
Mark Sleee02385b2007-06-09 01:21:16 +0000275 case APP_WAIT_TASK:
276 // We have now finished processing a task and the result has been written
277 // into the outputTransport_, so we grab its contents and place them into
278 // the writeBuffer_ for actual writing by the libevent thread
279
Mark Slee2f6404d2006-10-10 01:37:40 +0000280 // Get the result of the operation
281 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
282
283 // If the function call generated return data, then move into the send
284 // state and get going
285 if (writeBufferSize_ > 0) {
286
287 // Move into write state
288 writeBufferPos_ = 0;
289 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000290
291 if (server_->getFrameResponses()) {
292 // Put the frame size into the write buffer
293 appState_ = APP_SEND_FRAME_SIZE;
294 frameSize_ = (int32_t)htonl(writeBufferSize_);
295 writeBuffer_ = (uint8_t*)&frameSize_;
296 writeBufferSize_ = 4;
297 } else {
298 // Go straight into sending the result, do not frame it
299 appState_ = APP_SEND_RESULT;
300 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000301
302 // Socket into write mode
303 setWrite();
304
305 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000306 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000307
308 return;
309 }
310
311 // In this case, the request was asynchronous and we should fall through
312 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000313 goto LABEL_APP_INIT;
314
315 case APP_SEND_FRAME_SIZE:
316
317 // Refetch the result of the operation since we put the frame size into
318 // writeBuffer_
319 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
320 writeBufferPos_ = 0;
321
322 // Now in send result state
323 appState_ = APP_SEND_RESULT;
324
325 // Go to work on the socket right away, probably still writeable
Mark Sleee02385b2007-06-09 01:21:16 +0000326 // workSocket();
Mark Slee92f00fb2006-10-25 01:28:17 +0000327
328 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000329
330 case APP_SEND_RESULT:
331
332 // N.B.: We also intentionally fall through here into the INIT state!
333
Mark Slee92f00fb2006-10-25 01:28:17 +0000334 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000335 case APP_INIT:
336
337 // Clear write buffer variables
338 writeBuffer_ = NULL;
339 writeBufferPos_ = 0;
340 writeBufferSize_ = 0;
341
342 // Set up read buffer for getting 4 bytes
343 readBufferPos_ = 0;
344 readWant_ = 4;
345
346 // Into read4 state we go
347 socketState_ = SOCKET_RECV;
348 appState_ = APP_READ_FRAME_SIZE;
349
350 // Register read event
351 setRead();
352
353 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000354 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000355
356 return;
357
358 case APP_READ_FRAME_SIZE:
359 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000360 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 sz = (int32_t)ntohl(sz);
362
363 if (sz <= 0) {
364 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
365 close();
366 return;
367 }
368
369 // Reset the read buffer
370 readWant_ = (uint32_t)sz;
371 readBufferPos_= 0;
372
373 // Move into read request state
374 appState_ = APP_READ_REQUEST;
375
376 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000377 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000378
379 return;
380
381 default:
382 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
383 assert(0);
384 }
385}
386
387void TConnection::setFlags(short eventFlags) {
388 // Catch the do nothing case
389 if (eventFlags_ == eventFlags) {
390 return;
391 }
392
393 // Delete a previously existing event
394 if (eventFlags_ != 0) {
395 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000396 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000397 return;
398 }
399 }
400
401 // Update in memory structure
402 eventFlags_ = eventFlags;
403
Mark Slee402ee282007-08-23 01:43:20 +0000404 // Do not call event_set if there are no flags
405 if (!eventFlags_) {
406 return;
407 }
408
Mark Slee2f6404d2006-10-10 01:37:40 +0000409 /**
410 * event_set:
411 *
412 * Prepares the event structure &event to be used in future calls to
413 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000414 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000415 *
416 * The events can be either EV_READ, EV_WRITE, or both, indicating
417 * that an application can read or write from the file respectively without
418 * blocking.
419 *
Mark Sleee02385b2007-06-09 01:21:16 +0000420 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000421 * the event and the type of event which will be one of: EV_TIMEOUT,
422 * EV_SIGNAL, EV_READ, EV_WRITE.
423 *
424 * The additional flag EV_PERSIST makes an event_add() persistent until
425 * event_del() has been called.
426 *
427 * Once initialized, the &event struct can be used repeatedly with
428 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000429 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000430 * when an ev structure has been added to libevent using event_add() the
431 * structure must persist until the event occurs (assuming EV_PERSIST
432 * is not set) or is removed using event_del(). You may not reuse the same
433 * ev structure for multiple monitored descriptors; each descriptor needs
434 * its own ev.
435 */
436 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
437
438 // Add the event
439 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000440 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000441 }
442}
443
444/**
445 * Closes a connection
446 */
447void TConnection::close() {
448 // Delete the registered libevent
449 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000450 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 }
452
453 // Close the socket
454 if (socket_ > 0) {
455 ::close(socket_);
456 }
457 socket_ = 0;
458
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000459 // close any factory produced transports
460 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000461 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000462
Mark Slee2f6404d2006-10-10 01:37:40 +0000463 // Give this object back to the server that owns it
464 server_->returnConnection(this);
465}
466
467/**
468 * Creates a new connection either by reusing an object off the stack or
469 * by allocating a new one entirely
470 */
471TConnection* TNonblockingServer::createConnection(int socket, short flags) {
472 // Check the stack
473 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000474 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000475 } else {
476 TConnection* result = connectionStack_.top();
477 connectionStack_.pop();
478 result->init(socket, flags, this);
479 return result;
480 }
481}
482
483/**
484 * Returns a connection to the stack
485 */
486void TNonblockingServer::returnConnection(TConnection* connection) {
487 connectionStack_.push(connection);
488}
489
490/**
491 * Server socket had something happen
492 */
493void TNonblockingServer::handleEvent(int fd, short which) {
494 // Make sure that libevent didn't fuck up the socket handles
495 assert(fd == serverSocket_);
496
497 // Server socket accepted a new connection
498 socklen_t addrLen;
499 struct sockaddr addr;
500 addrLen = sizeof(addr);
501
502 // Going to accept a new client socket
503 int clientSocket;
504
505 // Accept as many new clients as possible, even though libevent signaled only
506 // one, this helps us to avoid having to go back into the libevent engine so
507 // many times
508 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
509
510 // Explicitly set this socket to NONBLOCK mode
511 int flags;
512 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
513 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
boz6ded7752007-06-05 22:41:18 +0000514 GlobalOutput("thriftServerEventHandler: set O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000515 close(clientSocket);
516 return;
517 }
518
519 // Create a new TConnection for this client socket.
520 TConnection* clientConnection =
521 createConnection(clientSocket, EV_READ | EV_PERSIST);
522
523 // Fail fast if we could not create a TConnection object
524 if (clientConnection == NULL) {
525 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
526 close(clientSocket);
527 return;
528 }
529
530 // Put this client connection into the proper state
531 clientConnection->transition();
532 }
533
534 // Done looping accept, now we have to make sure the error is due to
535 // blocking. Any other error is a problem
536 if (errno != EAGAIN && errno != EWOULDBLOCK) {
boz6ded7752007-06-05 22:41:18 +0000537 GlobalOutput("thriftServerEventHandler: accept()");
Mark Slee2f6404d2006-10-10 01:37:40 +0000538 }
539}
540
/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 *
 * Steps: initialize libevent, resolve the wildcard address for port_
 * (preferring an IPv6 result), create a non-blocking listening socket with
 * the socket options set below, bind and listen, register the server event,
 * run the optional pre-serve callback and enter the libevent loop.
 *
 * Every failure is reported through GlobalOutput() and makes the function
 * return early after releasing whatever it had acquired so far.
 */
void TNonblockingServer::serve() {
  // Initialize libevent
  event_init();

  // Print some libevent stats
  fprintf(stderr,
          "libevent %s method %s\n",
          event_get_version(),
          event_get_method());

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;

  // Large enough for any value "%d" can produce; snprintf() bounds the write
  char port[16];
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  struct addrinfo* res0 = NULL;
  int error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    GlobalOutput("TNonblockingServer::serve() getaddrinfo");
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  struct addrinfo* res;
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (serverSocket_ == -1) {
    GlobalOutput("TNonblockingServer::serve() socket() -1");
    freeaddrinfo(res0);
    return;
  }

  // Set socket to nonblocking mode
  int flags;
  if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
      fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
    GlobalOutput("TNonblockingServer::serve() O_NONBLOCK");
    ::close(serverSocket_);
    freeaddrinfo(res0);
    return;
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  // Keepalive to ensure full result flushing
  setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  #ifndef TCP_NOPUSH
  setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
  #endif

  if (bind(serverSocket_, res->ai_addr, res->ai_addrlen) == -1) {
    GlobalOutput("TNonblockingServer::serve() bind");
    ::close(serverSocket_);
    freeaddrinfo(res0);
    return;
  }

  // The resolved address list is not needed past bind()
  freeaddrinfo(res0);

  if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
    GlobalOutput("TNonblockingServer::serve() listen");
    ::close(serverSocket_);
    return;
  }

  // Register the server event
  struct event serverEvent;
  event_set(&serverEvent,
            serverSocket_,
            EV_READ | EV_PERSIST,
            TNonblockingServer::eventHandler,
            this);

  // Add the event and start up the server
  if (-1 == event_add(&serverEvent, 0)) {
    GlobalOutput("TNonblockingServer::serve(): coult not event_add");
    ::close(serverSocket_);
    return;
  }

  // Run pre-serve callback function if we have one
  if (preServeCallback_) {
    preServeCallback_(preServeCallbackArg_);
  }

  // Run libevent engine, never returns, invokes calls to eventHandler
  event_loop(0);
}
646
647}}} // facebook::thrift::server