blob: fcfe797a118699274adcf76328712cc130de7e33 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include "TNonblockingServer.h"
8
Mark Sleee02385b2007-06-09 01:21:16 +00009#include <iostream>
Mark Slee2f6404d2006-10-10 01:37:40 +000010#include <sys/socket.h>
11#include <netinet/in.h>
12#include <netinet/tcp.h>
13#include <fcntl.h>
14#include <errno.h>
15#include <assert.h>
16
17namespace facebook { namespace thrift { namespace server {
18
Mark Slee5ea15f92007-03-05 22:55:59 +000019using namespace facebook::thrift::protocol;
20using namespace facebook::thrift::transport;
Mark Sleee02385b2007-06-09 01:21:16 +000021using namespace std;
22
23class TConnection::Task: public Runnable {
24 public:
25 Task(boost::shared_ptr<TProcessor> processor,
26 boost::shared_ptr<TProtocol> input,
27 boost::shared_ptr<TProtocol> output,
28 int taskHandle) :
29 processor_(processor),
30 input_(input),
31 output_(output),
32 taskHandle_(taskHandle) {}
33
34 void run() {
35 try {
36 while (processor_->process(input_, output_)) {
37 if (!input_->getTransport()->peek()) {
38 break;
39 }
40 }
41 } catch (TTransportException& ttx) {
42 cerr << "TThreadedServer client died: " << ttx.what() << endl;
43 } catch (TException& x) {
44 cerr << "TThreadedServer exception: " << x.what() << endl;
45 } catch (...) {
46 cerr << "TThreadedServer uncaught exception." << endl;
47 }
48
49 // Signal completion back to the libevent thread via a socketpair
50 int8_t b = 0;
51 if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
52 GlobalOutput("TNonblockingServer::Task: send");
53 }
54 if (-1 == ::close(taskHandle_)) {
55 GlobalOutput("TNonblockingServer::Task: close, possible resource leak");
56 }
57 }
58
59 private:
60 boost::shared_ptr<TProcessor> processor_;
61 boost::shared_ptr<TProtocol> input_;
62 boost::shared_ptr<TProtocol> output_;
63 int taskHandle_;
64};
Mark Slee5ea15f92007-03-05 22:55:59 +000065
66void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
Mark Slee2f6404d2006-10-10 01:37:40 +000067 socket_ = socket;
68 server_ = s;
69 appState_ = APP_INIT;
70 eventFlags_ = 0;
71
72 readBufferPos_ = 0;
73 readWant_ = 0;
74
75 writeBuffer_ = NULL;
76 writeBufferSize_ = 0;
77 writeBufferPos_ = 0;
78
79 socketState_ = SOCKET_RECV;
80 appState_ = APP_INIT;
81
Mark Sleee02385b2007-06-09 01:21:16 +000082 taskHandle_ = -1;
83
Mark Slee2f6404d2006-10-10 01:37:40 +000084 // Set flags, which also registers the event
85 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000086
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000087 // get input/transports
88 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
89 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +000090
91 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000092 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
93 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
Mark Slee2f6404d2006-10-10 01:37:40 +000094}
95
96void TConnection::workSocket() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +000097 int flags=0, got=0, left=0, sent=0;
98 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +000099
100 switch (socketState_) {
101 case SOCKET_RECV:
102 // It is an error to be in this state if we already have all the data
103 assert(readBufferPos_ < readWant_);
104
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 // Double the buffer size until it is big enough
robert79511192006-12-20 19:25:38 +0000106 if (readWant_ > readBufferSize_) {
107 while (readWant_ > readBufferSize_) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 readBufferSize_ *= 2;
109 }
110 readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
111 if (readBuffer_ == NULL) {
boz6ded7752007-06-05 22:41:18 +0000112 GlobalOutput("TConnection::workSocket() realloc");
Mark Slee2f6404d2006-10-10 01:37:40 +0000113 close();
114 return;
115 }
116 }
117
118 // Read from the socket
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000119 fetch = readWant_ - readBufferPos_;
120 got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
Mark Slee402ee282007-08-23 01:43:20 +0000121
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 if (got > 0) {
123 // Move along in the buffer
124 readBufferPos_ += got;
125
126 // Check that we did not overdo it
127 assert(readBufferPos_ <= readWant_);
128
129 // We are done reading, move onto the next state
130 if (readBufferPos_ == readWant_) {
131 transition();
132 }
133 return;
134 } else if (got == -1) {
135 // Blocking errors are okay, just move on
136 if (errno == EAGAIN || errno == EWOULDBLOCK) {
137 return;
138 }
139
140 if (errno != ECONNRESET) {
boz6ded7752007-06-05 22:41:18 +0000141 GlobalOutput("TConnection::workSocket() recv -1");
Mark Slee2f6404d2006-10-10 01:37:40 +0000142 }
143 }
144
145 // Whenever we get down here it means a remote disconnect
146 close();
147
148 return;
149
150 case SOCKET_SEND:
151 // Should never have position past size
152 assert(writeBufferPos_ <= writeBufferSize_);
153
154 // If there is no data to send, then let us move on
155 if (writeBufferPos_ == writeBufferSize_) {
156 fprintf(stderr, "WARNING: Send state with no data to send\n");
157 transition();
158 return;
159 }
160
161 flags = 0;
162 #ifdef MSG_NOSIGNAL
163 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
164 // check for the EPIPE return condition and close the socket in that case
165 flags |= MSG_NOSIGNAL;
166 #endif // ifdef MSG_NOSIGNAL
167
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000168 left = writeBufferSize_ - writeBufferPos_;
169 sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
Mark Slee2f6404d2006-10-10 01:37:40 +0000170
171 if (sent <= 0) {
172 // Blocking errors are okay, just move on
173 if (errno == EAGAIN || errno == EWOULDBLOCK) {
174 return;
175 }
176 if (errno != EPIPE) {
boz6ded7752007-06-05 22:41:18 +0000177 GlobalOutput("TConnection::workSocket() send -1");
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 }
179 close();
180 return;
181 }
182
183 writeBufferPos_ += sent;
184
185 // Did we overdo it?
186 assert(writeBufferPos_ <= writeBufferSize_);
187
188 // We are done!
189 if (writeBufferPos_ == writeBufferSize_) {
190 transition();
191 }
192
193 return;
194
195 default:
196 fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
197 assert(0);
198 }
199}
200
201/**
202 * This is called when the application transitions from one state into
203 * another. This means that it has finished writing the data that it needed
204 * to, or finished receiving the data that it needed to.
205 */
206void TConnection::transition() {
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000207
208 int sz = 0;
209
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 // Switch upon the state that we are currently in and move to a new state
211 switch (appState_) {
212
213 case APP_READ_REQUEST:
214 // We are done reading the request, package the read buffer into transport
215 // and get back some data from the dispatch function
216 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
217 outputTransport_->resetBuffer();
Mark Slee402ee282007-08-23 01:43:20 +0000218
Mark Sleee02385b2007-06-09 01:21:16 +0000219 if (server_->isThreadPoolProcessing()) {
220 // We are setting up a Task to do this work and we will wait on it
221 int sv[2];
222 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
223 GlobalOutput("TConnection::socketpair() failed");
224 // Now we will fall through to the APP_WAIT_TASK block with no response
225 } else {
226 // Create task and dispatch to the thread manager
227 boost::shared_ptr<Runnable> task =
228 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
229 inputProtocol_,
230 outputProtocol_,
231 sv[1]));
232 appState_ = APP_WAIT_TASK;
233 event_set(&taskEvent_,
234 taskHandle_ = sv[0],
235 EV_READ,
236 TConnection::taskHandler,
237 this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000238
Mark Sleee02385b2007-06-09 01:21:16 +0000239 // Add the event and start up the server
240 if (-1 == event_add(&taskEvent_, 0)) {
241 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
242 return;
243 }
244 server_->addTask(task);
Mark Slee402ee282007-08-23 01:43:20 +0000245
246 // Set this connection idle so that libevent doesn't process more
247 // data on it while we're still waiting for the threadmanager to
248 // finish this task
249 setIdle();
Mark Sleee02385b2007-06-09 01:21:16 +0000250 return;
251 }
252 } else {
253 try {
254 // Invoke the processor
255 server_->getProcessor()->process(inputProtocol_, outputProtocol_);
256 } catch (TTransportException &ttx) {
257 fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
258 close();
259 return;
260 } catch (TException &x) {
261 fprintf(stderr, "TException: Server::process() %s\n", x.what());
262 close();
263 return;
264 } catch (...) {
265 fprintf(stderr, "Server::process() unknown exception\n");
266 close();
267 return;
268 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000269 }
270
Mark Slee402ee282007-08-23 01:43:20 +0000271 // Intentionally fall through here, the call to process has written into
272 // the writeBuffer_
273
Mark Sleee02385b2007-06-09 01:21:16 +0000274 case APP_WAIT_TASK:
275 // We have now finished processing a task and the result has been written
276 // into the outputTransport_, so we grab its contents and place them into
277 // the writeBuffer_ for actual writing by the libevent thread
278
Mark Slee2f6404d2006-10-10 01:37:40 +0000279 // Get the result of the operation
280 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
281
282 // If the function call generated return data, then move into the send
283 // state and get going
284 if (writeBufferSize_ > 0) {
285
286 // Move into write state
287 writeBufferPos_ = 0;
288 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000289
290 if (server_->getFrameResponses()) {
291 // Put the frame size into the write buffer
292 appState_ = APP_SEND_FRAME_SIZE;
293 frameSize_ = (int32_t)htonl(writeBufferSize_);
294 writeBuffer_ = (uint8_t*)&frameSize_;
295 writeBufferSize_ = 4;
296 } else {
297 // Go straight into sending the result, do not frame it
298 appState_ = APP_SEND_RESULT;
299 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000300
301 // Socket into write mode
302 setWrite();
303
304 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000305 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000306
307 return;
308 }
309
310 // In this case, the request was asynchronous and we should fall through
311 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000312 goto LABEL_APP_INIT;
313
314 case APP_SEND_FRAME_SIZE:
315
316 // Refetch the result of the operation since we put the frame size into
317 // writeBuffer_
318 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
319 writeBufferPos_ = 0;
320
321 // Now in send result state
322 appState_ = APP_SEND_RESULT;
323
324 // Go to work on the socket right away, probably still writeable
Mark Sleee02385b2007-06-09 01:21:16 +0000325 // workSocket();
Mark Slee92f00fb2006-10-25 01:28:17 +0000326
327 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000328
329 case APP_SEND_RESULT:
330
331 // N.B.: We also intentionally fall through here into the INIT state!
332
Mark Slee92f00fb2006-10-25 01:28:17 +0000333 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000334 case APP_INIT:
335
336 // Clear write buffer variables
337 writeBuffer_ = NULL;
338 writeBufferPos_ = 0;
339 writeBufferSize_ = 0;
340
341 // Set up read buffer for getting 4 bytes
342 readBufferPos_ = 0;
343 readWant_ = 4;
344
345 // Into read4 state we go
346 socketState_ = SOCKET_RECV;
347 appState_ = APP_READ_FRAME_SIZE;
348
349 // Register read event
350 setRead();
351
352 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000353 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000354
355 return;
356
357 case APP_READ_FRAME_SIZE:
358 // We just read the request length, deserialize it
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000359 sz = *(int32_t*)readBuffer_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000360 sz = (int32_t)ntohl(sz);
361
362 if (sz <= 0) {
363 fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
364 close();
365 return;
366 }
367
368 // Reset the read buffer
369 readWant_ = (uint32_t)sz;
370 readBufferPos_= 0;
371
372 // Move into read request state
373 appState_ = APP_READ_REQUEST;
374
375 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000376 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000377
378 return;
379
380 default:
381 fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
382 assert(0);
383 }
384}
385
386void TConnection::setFlags(short eventFlags) {
387 // Catch the do nothing case
388 if (eventFlags_ == eventFlags) {
389 return;
390 }
391
392 // Delete a previously existing event
393 if (eventFlags_ != 0) {
394 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000395 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000396 return;
397 }
398 }
399
400 // Update in memory structure
401 eventFlags_ = eventFlags;
402
Mark Slee402ee282007-08-23 01:43:20 +0000403 // Do not call event_set if there are no flags
404 if (!eventFlags_) {
405 return;
406 }
407
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 /**
409 * event_set:
410 *
411 * Prepares the event structure &event to be used in future calls to
412 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000413 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000414 *
415 * The events can be either EV_READ, EV_WRITE, or both, indicating
416 * that an application can read or write from the file respectively without
417 * blocking.
418 *
Mark Sleee02385b2007-06-09 01:21:16 +0000419 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000420 * the event and the type of event which will be one of: EV_TIMEOUT,
421 * EV_SIGNAL, EV_READ, EV_WRITE.
422 *
423 * The additional flag EV_PERSIST makes an event_add() persistent until
424 * event_del() has been called.
425 *
426 * Once initialized, the &event struct can be used repeatedly with
427 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000428 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000429 * when an ev structure has been added to libevent using event_add() the
430 * structure must persist until the event occurs (assuming EV_PERSIST
431 * is not set) or is removed using event_del(). You may not reuse the same
432 * ev structure for multiple monitored descriptors; each descriptor needs
433 * its own ev.
434 */
435 event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
436
437 // Add the event
438 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000439 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000440 }
441}
442
443/**
444 * Closes a connection
445 */
446void TConnection::close() {
447 // Delete the registered libevent
448 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000449 GlobalOutput("TConnection::close() event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000450 }
451
452 // Close the socket
453 if (socket_ > 0) {
454 ::close(socket_);
455 }
456 socket_ = 0;
457
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000458 // close any factory produced transports
459 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000460 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000461
Mark Slee2f6404d2006-10-10 01:37:40 +0000462 // Give this object back to the server that owns it
463 server_->returnConnection(this);
464}
465
466/**
467 * Creates a new connection either by reusing an object off the stack or
468 * by allocating a new one entirely
469 */
470TConnection* TNonblockingServer::createConnection(int socket, short flags) {
471 // Check the stack
472 if (connectionStack_.empty()) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000473 return new TConnection(socket, flags, this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000474 } else {
475 TConnection* result = connectionStack_.top();
476 connectionStack_.pop();
477 result->init(socket, flags, this);
478 return result;
479 }
480}
481
482/**
483 * Returns a connection to the stack
484 */
485void TNonblockingServer::returnConnection(TConnection* connection) {
486 connectionStack_.push(connection);
487}
488
489/**
490 * Server socket had something happen
491 */
492void TNonblockingServer::handleEvent(int fd, short which) {
493 // Make sure that libevent didn't fuck up the socket handles
494 assert(fd == serverSocket_);
495
496 // Server socket accepted a new connection
497 socklen_t addrLen;
498 struct sockaddr addr;
499 addrLen = sizeof(addr);
500
501 // Going to accept a new client socket
502 int clientSocket;
503
504 // Accept as many new clients as possible, even though libevent signaled only
505 // one, this helps us to avoid having to go back into the libevent engine so
506 // many times
507 while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
508
509 // Explicitly set this socket to NONBLOCK mode
510 int flags;
511 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
512 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
boz6ded7752007-06-05 22:41:18 +0000513 GlobalOutput("thriftServerEventHandler: set O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000514 close(clientSocket);
515 return;
516 }
517
518 // Create a new TConnection for this client socket.
519 TConnection* clientConnection =
520 createConnection(clientSocket, EV_READ | EV_PERSIST);
521
522 // Fail fast if we could not create a TConnection object
523 if (clientConnection == NULL) {
524 fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
525 close(clientSocket);
526 return;
527 }
528
529 // Put this client connection into the proper state
530 clientConnection->transition();
531 }
532
533 // Done looping accept, now we have to make sure the error is due to
534 // blocking. Any other error is a problem
535 if (errno != EAGAIN && errno != EWOULDBLOCK) {
boz6ded7752007-06-05 22:41:18 +0000536 GlobalOutput("thriftServerEventHandler: accept()");
Mark Slee2f6404d2006-10-10 01:37:40 +0000537 }
538}
539
540/**
541 * Main workhorse function, starts up the server listening on a port and
542 * loops over the libevent handler.
543 */
544void TNonblockingServer::serve() {
545 // Initialize libevent
546 event_init();
547
548 // Print some libevent stats
549 fprintf(stderr,
550 "libevent %s method %s\n",
551 event_get_version(),
552 event_get_method());
553
554 // Create the server socket
555 serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
556 if (serverSocket_ == -1) {
boz6ded7752007-06-05 22:41:18 +0000557 GlobalOutput("TNonblockingServer::serve() socket() -1");
Mark Slee2f6404d2006-10-10 01:37:40 +0000558 return;
559 }
560
561 // Set socket to nonblocking mode
562 int flags;
563 if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
564 fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
boz6ded7752007-06-05 22:41:18 +0000565 GlobalOutput("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000566 ::close(serverSocket_);
567 return;
568 }
569
570 int one = 1;
571 struct linger ling = {0, 0};
572
573 // Set reuseaddr to avoid 2MSL delay on server restart
574 setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
575
576 // Keepalive to ensure full result flushing
577 setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
578
579 // Turn linger off to avoid hung sockets
580 setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
581
582 // Set TCP nodelay if available, MAC OS X Hack
583 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
584 #ifndef TCP_NOPUSH
585 setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
586 #endif
587
588 struct sockaddr_in addr;
589 addr.sin_family = AF_INET;
590 addr.sin_port = htons(port_);
591 addr.sin_addr.s_addr = INADDR_ANY;
592
593 if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
boz6ded7752007-06-05 22:41:18 +0000594 GlobalOutput("TNonblockingServer::serve() bind");
Mark Slee2f6404d2006-10-10 01:37:40 +0000595 close(serverSocket_);
596 return;
597 }
598
599 if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
boz6ded7752007-06-05 22:41:18 +0000600 GlobalOutput("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000601 close(serverSocket_);
602 return;
603 }
604
605 // Register the server event
606 struct event serverEvent;
607 event_set(&serverEvent,
608 serverSocket_,
609 EV_READ | EV_PERSIST,
610 TNonblockingServer::eventHandler,
611 this);
612
613 // Add the event and start up the server
Mark Sleee02385b2007-06-09 01:21:16 +0000614 if (-1 == event_add(&serverEvent, 0)) {
boz6ded7752007-06-05 22:41:18 +0000615 GlobalOutput("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000616 return;
617 }
618
dweatherford58985992007-06-19 23:10:19 +0000619 // Run pre-serve callback function if we have one
620 if (preServeCallback_) {
621 preServeCallback_(preServeCallbackArg_);
622 }
623
Mark Sleee02385b2007-06-09 01:21:16 +0000624 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee2f6404d2006-10-10 01:37:40 +0000625 event_loop(0);
626}
627
628}}} // facebook::thrift::server