blob: 89e9afd78b68068abdcb6747aa1f076baee97f04 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000025
26#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000028#endif
29
30#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <netinet/in.h>
32#include <netinet/tcp.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000033#endif
34
35#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000036#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000037#endif
38
Mark Slee2f6404d2006-10-10 01:37:40 +000039#include <fcntl.h>
40#include <errno.h>
41#include <assert.h>
42
David Reiss9b903442009-10-21 05:51:28 +000043#ifndef AF_LOCAL
44#define AF_LOCAL AF_UNIX
45#endif
46
T Jake Lucianib5e62212009-01-31 22:36:20 +000047namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000048
T Jake Lucianib5e62212009-01-31 22:36:20 +000049using namespace apache::thrift::protocol;
50using namespace apache::thrift::transport;
51using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000052using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000053using apache::thrift::transport::TSocket;
54using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000055
56class TConnection::Task: public Runnable {
57 public:
58 Task(boost::shared_ptr<TProcessor> processor,
59 boost::shared_ptr<TProtocol> input,
60 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000061 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000062 processor_(processor),
63 input_(input),
64 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000065 connection_(connection),
66 serverEventHandler_(connection_->getServerEventHandler()),
67 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000068
69 void run() {
70 try {
David Reiss105961d2010-10-06 17:10:17 +000071 for (;;) {
72 if (serverEventHandler_ != NULL) {
73 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
74 }
75 if (!processor_->process(input_, output_, connectionContext_) ||
76 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000077 break;
78 }
79 }
80 } catch (TTransportException& ttx) {
David Reissa79e4882008-03-05 07:51:47 +000081 cerr << "TNonblockingServer client died: " << ttx.what() << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000082 } catch (TException& x) {
David Reissa79e4882008-03-05 07:51:47 +000083 cerr << "TNonblockingServer exception: " << x.what() << endl;
David Reiss28e88ec2010-03-09 05:19:27 +000084 } catch (bad_alloc&) {
85 cerr << "TNonblockingServer caught bad_alloc exception.";
86 exit(-1);
Mark Sleee02385b2007-06-09 01:21:16 +000087 } catch (...) {
David Reissa79e4882008-03-05 07:51:47 +000088 cerr << "TNonblockingServer uncaught exception." << endl;
Mark Sleee02385b2007-06-09 01:21:16 +000089 }
Mark Slee79b16942007-11-26 19:05:29 +000090
David Reiss01fe1532010-03-09 05:19:25 +000091 // Signal completion back to the libevent thread via a pipe
92 if (!connection_->notifyServer()) {
93 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000094 }
David Reiss01fe1532010-03-09 05:19:25 +000095 }
96
97 TConnection* getTConnection() {
98 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +000099 }
100
101 private:
102 boost::shared_ptr<TProcessor> processor_;
103 boost::shared_ptr<TProtocol> input_;
104 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000105 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000106 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
107 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000108};
Mark Slee5ea15f92007-03-05 22:55:59 +0000109
David Reiss105961d2010-10-06 17:10:17 +0000110void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
111 const sockaddr* addr, socklen_t addrLen) {
112 tSocket_->setSocketFD(socket);
113 tSocket_->setCachedAddress(addr, addrLen);
114
Mark Slee2f6404d2006-10-10 01:37:40 +0000115 server_ = s;
116 appState_ = APP_INIT;
117 eventFlags_ = 0;
118
119 readBufferPos_ = 0;
120 readWant_ = 0;
121
122 writeBuffer_ = NULL;
123 writeBufferSize_ = 0;
124 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000125 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000126
David Reiss89a12942010-10-06 17:10:52 +0000127 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000128 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000129 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000130
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 // Set flags, which also registers the event
132 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000133
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000134 // get input/transports
135 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
136 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000137
138 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000139 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
140 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000141
142 // Set up for any server event handler
143 serverEventHandler_ = server_->getEventHandler();
144 if (serverEventHandler_ != NULL) {
145 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
146 } else {
147 connectionContext_ = NULL;
148 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000149}
150
151void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000152 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000153 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000154
155 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000156 case SOCKET_RECV_FRAMING:
157 union {
158 uint8_t buf[sizeof(uint32_t)];
159 int32_t size;
160 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000161
David Reiss89a12942010-10-06 17:10:52 +0000162 // if we've already received some bytes we kept them here
163 framing.size = readWant_;
164 // determine size of this frame
165 try {
166 // Read from the socket
167 fetch = tSocket_->read(&framing.buf[readBufferPos_],
168 uint32_t(sizeof(framing.size) - readBufferPos_));
169 if (fetch == 0) {
170 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000171 close();
172 return;
173 }
David Reiss89a12942010-10-06 17:10:52 +0000174 readBufferPos_ += fetch;
175 } catch (TTransportException& te) {
176 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
177 close();
178
179 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000180 }
181
David Reiss89a12942010-10-06 17:10:52 +0000182 if (readBufferPos_ < sizeof(framing.size)) {
183 // more needed before frame size is known -- save what we have so far
184 readWant_ = framing.size;
185 return;
186 }
187
188 readWant_ = ntohl(framing.size);
189 if (static_cast<int>(readWant_) <= 0) {
190 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
191 close();
192 return;
193 }
194 // size known; now get the rest of the frame
195 transition();
196 return;
197
198 case SOCKET_RECV:
199 // It is an error to be in this state if we already have all the data
200 assert(readBufferPos_ < readWant_);
201
David Reiss105961d2010-10-06 17:10:17 +0000202 try {
203 // Read from the socket
204 fetch = readWant_ - readBufferPos_;
205 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
206 }
207 catch (TTransportException& te) {
208 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
209 close();
Mark Slee79b16942007-11-26 19:05:29 +0000210
David Reiss105961d2010-10-06 17:10:17 +0000211 return;
212 }
213
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 if (got > 0) {
215 // Move along in the buffer
216 readBufferPos_ += got;
217
218 // Check that we did not overdo it
219 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000220
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 // We are done reading, move onto the next state
222 if (readBufferPos_ == readWant_) {
223 transition();
224 }
225 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000226 }
227
228 // Whenever we get down here it means a remote disconnect
229 close();
Mark Slee79b16942007-11-26 19:05:29 +0000230
Mark Slee2f6404d2006-10-10 01:37:40 +0000231 return;
232
233 case SOCKET_SEND:
234 // Should never have position past size
235 assert(writeBufferPos_ <= writeBufferSize_);
236
237 // If there is no data to send, then let us move on
238 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000239 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000240 transition();
241 return;
242 }
243
David Reiss105961d2010-10-06 17:10:17 +0000244 try {
245 left = writeBufferSize_ - writeBufferPos_;
246 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
247 }
248 catch (TTransportException& te) {
249 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 close();
251 return;
252 }
253
254 writeBufferPos_ += sent;
255
256 // Did we overdo it?
257 assert(writeBufferPos_ <= writeBufferSize_);
258
Mark Slee79b16942007-11-26 19:05:29 +0000259 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000260 if (writeBufferPos_ == writeBufferSize_) {
261 transition();
262 }
263
264 return;
265
266 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000267 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000268 assert(0);
269 }
270}
271
272/**
273 * This is called when the application transitions from one state into
274 * another. This means that it has finished writing the data that it needed
275 * to, or finished receiving the data that it needed to.
276 */
277void TConnection::transition() {
278 // Switch upon the state that we are currently in and move to a new state
279 switch (appState_) {
280
281 case APP_READ_REQUEST:
282 // We are done reading the request, package the read buffer into transport
283 // and get back some data from the dispatch function
284 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000285 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000286 // Prepend four bytes of blank space to the buffer so we can
287 // write the frame size there later.
288 outputTransport_->getWritePtr(4);
289 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000290
David Reiss01fe1532010-03-09 05:19:25 +0000291 server_->incrementActiveProcessors();
292
Mark Sleee02385b2007-06-09 01:21:16 +0000293 if (server_->isThreadPoolProcessing()) {
294 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000295
David Reiss01fe1532010-03-09 05:19:25 +0000296 // Create task and dispatch to the thread manager
297 boost::shared_ptr<Runnable> task =
298 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
299 inputProtocol_,
300 outputProtocol_,
301 this));
302 // The application is now waiting on the task to finish
303 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000304
David Reisse11f3072008-10-07 21:39:19 +0000305 try {
306 server_->addTask(task);
307 } catch (IllegalStateException & ise) {
308 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000309 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000310 close();
311 }
Mark Slee402ee282007-08-23 01:43:20 +0000312
David Reiss01fe1532010-03-09 05:19:25 +0000313 // Set this connection idle so that libevent doesn't process more
314 // data on it while we're still waiting for the threadmanager to
315 // finish this task
316 setIdle();
317 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000318 } else {
319 try {
320 // Invoke the processor
David Reiss23248712010-10-06 17:10:08 +0000321 server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
Mark Sleee02385b2007-06-09 01:21:16 +0000322 } catch (TTransportException &ttx) {
David Reiss01e55c12008-07-13 22:18:51 +0000323 GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000324 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000325 close();
326 return;
327 } catch (TException &x) {
David Reiss01e55c12008-07-13 22:18:51 +0000328 GlobalOutput.printf("TException: Server::process() %s", x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000329 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000330 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000331 return;
332 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000333 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000334 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000335 close();
336 return;
337 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000338 }
339
Mark Slee402ee282007-08-23 01:43:20 +0000340 // Intentionally fall through here, the call to process has written into
341 // the writeBuffer_
342
Mark Sleee02385b2007-06-09 01:21:16 +0000343 case APP_WAIT_TASK:
344 // We have now finished processing a task and the result has been written
345 // into the outputTransport_, so we grab its contents and place them into
346 // the writeBuffer_ for actual writing by the libevent thread
347
David Reiss01fe1532010-03-09 05:19:25 +0000348 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000349 // Get the result of the operation
350 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
351
352 // If the function call generated return data, then move into the send
353 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000354 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000355 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000356
357 // Move into write state
358 writeBufferPos_ = 0;
359 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000360
David Reissaf787782008-07-03 20:29:34 +0000361 // Put the frame size into the write buffer
362 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
363 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000364
365 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000366 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 setWrite();
368
369 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000370 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000371
372 return;
373 }
374
David Reissc51986f2009-03-24 20:01:25 +0000375 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000376 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000377 goto LABEL_APP_INIT;
378
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000380 // it's now safe to perform buffer size housekeeping.
381 if (writeBufferSize_ > largestWriteBufferSize_) {
382 largestWriteBufferSize_ = writeBufferSize_;
383 }
384 if (server_->getResizeBufferEveryN() > 0
385 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
386 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
387 server_->getIdleWriteBufferLimit());
388 callsForResize_ = 0;
389 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000390
391 // N.B.: We also intentionally fall through here into the INIT state!
392
Mark Slee92f00fb2006-10-25 01:28:17 +0000393 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000394 case APP_INIT:
395
396 // Clear write buffer variables
397 writeBuffer_ = NULL;
398 writeBufferPos_ = 0;
399 writeBufferSize_ = 0;
400
Mark Slee2f6404d2006-10-10 01:37:40 +0000401 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000402 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000403 appState_ = APP_READ_FRAME_SIZE;
404
David Reiss89a12942010-10-06 17:10:52 +0000405 readBufferPos_ = 0;
406
Mark Slee2f6404d2006-10-10 01:37:40 +0000407 // Register read event
408 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000409
Mark Slee2f6404d2006-10-10 01:37:40 +0000410 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000411 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000412
413 return;
414
415 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000416 // We just read the request length
417 // Double the buffer size until it is big enough
418 if (readWant_ > readBufferSize_) {
419 if (readBufferSize_ == 0) {
420 readBufferSize_ = 1;
421 }
422 uint32_t newSize = readBufferSize_;
423 while (readWant_ > newSize) {
424 newSize *= 2;
425 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000426
David Reiss89a12942010-10-06 17:10:52 +0000427 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
428 if (newBuffer == NULL) {
429 // nothing else to be done...
430 throw std::bad_alloc();
431 }
432 readBuffer_ = newBuffer;
433 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000434 }
435
Mark Slee2f6404d2006-10-10 01:37:40 +0000436 readBufferPos_= 0;
437
438 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000439 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000440 appState_ = APP_READ_REQUEST;
441
442 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000443 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000444
445 return;
446
David Reiss01fe1532010-03-09 05:19:25 +0000447 case APP_CLOSE_CONNECTION:
448 server_->decrementActiveProcessors();
449 close();
450 return;
451
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000453 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000454 assert(0);
455 }
456}
457
458void TConnection::setFlags(short eventFlags) {
459 // Catch the do nothing case
460 if (eventFlags_ == eventFlags) {
461 return;
462 }
463
464 // Delete a previously existing event
465 if (eventFlags_ != 0) {
466 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000467 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000468 return;
469 }
470 }
471
472 // Update in memory structure
473 eventFlags_ = eventFlags;
474
Mark Slee402ee282007-08-23 01:43:20 +0000475 // Do not call event_set if there are no flags
476 if (!eventFlags_) {
477 return;
478 }
479
David Reiss01fe1532010-03-09 05:19:25 +0000480 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000481 * event_set:
482 *
483 * Prepares the event structure &event to be used in future calls to
484 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000485 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000486 *
487 * The events can be either EV_READ, EV_WRITE, or both, indicating
488 * that an application can read or write from the file respectively without
489 * blocking.
490 *
Mark Sleee02385b2007-06-09 01:21:16 +0000491 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000492 * the event and the type of event which will be one of: EV_TIMEOUT,
493 * EV_SIGNAL, EV_READ, EV_WRITE.
494 *
495 * The additional flag EV_PERSIST makes an event_add() persistent until
496 * event_del() has been called.
497 *
498 * Once initialized, the &event struct can be used repeatedly with
499 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000500 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000501 * when an ev structure has been added to libevent using event_add() the
502 * structure must persist until the event occurs (assuming EV_PERSIST
503 * is not set) or is removed using event_del(). You may not reuse the same
504 * ev structure for multiple monitored descriptors; each descriptor needs
505 * its own ev.
506 */
David Reiss105961d2010-10-06 17:10:17 +0000507 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
508 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000509 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000510
511 // Add the event
512 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000513 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000514 }
515}
516
517/**
518 * Closes a connection
519 */
520void TConnection::close() {
521 // Delete the registered libevent
522 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000523 GlobalOutput.perror("TConnection::close() event_del", errno);
524 }
525
526 if (serverEventHandler_ != NULL) {
527 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000528 }
529
530 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000531 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000532
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000533 // close any factory produced transports
534 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000535 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000536
Mark Slee2f6404d2006-10-10 01:37:40 +0000537 // Give this object back to the server that owns it
538 server_->returnConnection(this);
539}
540
David Reiss54bec5d2010-10-06 17:10:45 +0000541void TConnection::checkIdleBufferMemLimit(size_t readLimit,
542 size_t writeLimit) {
543 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000544 free(readBuffer_);
545 readBuffer_ = NULL;
546 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000547 }
David Reiss54bec5d2010-10-06 17:10:45 +0000548
549 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
550 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000551 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000552 largestWriteBufferSize_ = 0;
553 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000554}
555
David Reiss8ede8182010-09-02 15:26:28 +0000556TNonblockingServer::~TNonblockingServer() {
557 // TODO: We currently leak any active TConnection objects.
558 // Since we're shutting down and destroying the event_base, the TConnection
559 // objects will never receive any additional callbacks. (And even if they
560 // did, it would be bad, since they keep a pointer around to the server,
561 // which is being destroyed.)
562
563 // Clean up unused TConnection objects in connectionStack_
564 while (!connectionStack_.empty()) {
565 TConnection* connection = connectionStack_.top();
566 connectionStack_.pop();
567 delete connection;
568 }
569
Roger Meierc1905582011-08-02 23:37:36 +0000570 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000571 event_base_free(eventBase_);
572 }
573
574 if (serverSocket_ >= 0) {
575 close(serverSocket_);
576 }
577}
578
Mark Slee2f6404d2006-10-10 01:37:40 +0000579/**
580 * Creates a new connection either by reusing an object off the stack or
581 * by allocating a new one entirely
582 */
David Reiss105961d2010-10-06 17:10:17 +0000583TConnection* TNonblockingServer::createConnection(int socket, short flags,
584 const sockaddr* addr,
585 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000586 // Check the stack
587 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000588 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000589 } else {
590 TConnection* result = connectionStack_.top();
591 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000592 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000593 return result;
594 }
595}
596
597/**
598 * Returns a connection to the stack
599 */
600void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000601 if (connectionStackLimit_ &&
602 (connectionStack_.size() >= connectionStackLimit_)) {
603 delete connection;
604 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000605 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000606 connectionStack_.push(connection);
607 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000608}
609
610/**
David Reissa79e4882008-03-05 07:51:47 +0000611 * Server socket had something happen. We accept all waiting client
612 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000613 */
614void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000615 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000616 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000617 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000618
Mark Slee2f6404d2006-10-10 01:37:40 +0000619 // Server socket accepted a new connection
620 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000621 sockaddr_storage addrStorage;
622 sockaddr* addrp = (sockaddr*)&addrStorage;
623 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000624
Mark Slee2f6404d2006-10-10 01:37:40 +0000625 // Going to accept a new client socket
626 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000627
Mark Slee2f6404d2006-10-10 01:37:40 +0000628 // Accept as many new clients as possible, even though libevent signaled only
629 // one, this helps us to avoid having to go back into the libevent engine so
630 // many times
David Reiss105961d2010-10-06 17:10:17 +0000631 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000632 // If we're overloaded, take action here
633 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
634 nConnectionsDropped_++;
635 nTotalConnectionsDropped_++;
636 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
637 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000638 return;
David Reiss01fe1532010-03-09 05:19:25 +0000639 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
640 if (!drainPendingTask()) {
641 // Nothing left to discard, so we drop connection instead.
642 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000643 return;
David Reiss01fe1532010-03-09 05:19:25 +0000644 }
645 }
646 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000647 // Explicitly set this socket to NONBLOCK mode
648 int flags;
649 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
650 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000651 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000652 close(clientSocket);
653 return;
654 }
655
656 // Create a new TConnection for this client socket.
657 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000658 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000659
660 // Fail fast if we could not create a TConnection object
661 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000662 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000663 close(clientSocket);
664 return;
665 }
666
667 // Put this client connection into the proper state
668 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000669
670 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000671 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000672 }
Mark Slee79b16942007-11-26 19:05:29 +0000673
Mark Slee2f6404d2006-10-10 01:37:40 +0000674 // Done looping accept, now we have to make sure the error is due to
675 // blocking. Any other error is a problem
676 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000677 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000678 }
679}
680
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * The wildcard address for port_ is resolved with getaddrinfo(); an IPv6
 * result is preferred because IPv4 addresses can be mapped into IPv6 space.
 * The bound socket is then passed to listenSocket(int) for further setup.
 *
 * A getaddrinfo() failure is only logged and the function returns without
 * a socket. Failures of socket() and bind() throw TException.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Bounded formatting: an out-of-range port_ value must not be able to
  // write past the end of the small port buffer.
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. If there is none, the last entry of the list is used.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Turn IPV6_V6ONLY off; a failure is logged but not fatal
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
745
746/**
747 * Takes a socket created by listenSocket() and sets various options on it
748 * to prepare for use in the server.
749 */
750void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000751 // Set socket to nonblocking mode
752 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000753 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
754 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
755 close(s);
756 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000757 }
758
759 int one = 1;
760 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000761
762 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +0000763 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000764
765 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +0000766 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000767
768 // Set TCP nodelay if available, MAC OS X Hack
769 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
770 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +0000771 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000772 #endif
773
David Reiss1c20c872010-03-09 05:20:14 +0000774 #ifdef TCP_LOW_MIN_RTO
775 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +0000776 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +0000777 }
778 #endif
779
Mark Slee79b16942007-11-26 19:05:29 +0000780 if (listen(s, LISTEN_BACKLOG) == -1) {
781 close(s);
782 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000783 }
784
Mark Slee79b16942007-11-26 19:05:29 +0000785 // Cool, this socket is good to go, set it as the serverSocket_
786 serverSocket_ = s;
787}
788
David Reiss01fe1532010-03-09 05:19:25 +0000789void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +0000790 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
791 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
792 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +0000793 }
Roger Meier30aae0c2011-07-08 12:23:31 +0000794 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
795 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +0000796 close(notificationPipeFDs_[0]);
797 close(notificationPipeFDs_[1]);
798 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
799 }
David Reiss01fe1532010-03-09 05:19:25 +0000800}
801
Mark Slee79b16942007-11-26 19:05:29 +0000802/**
803 * Register the core libevent events onto the proper base.
804 */
Roger Meierc1905582011-08-02 23:37:36 +0000805void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +0000806 assert(serverSocket_ != -1);
807 assert(!eventBase_);
808 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +0000809 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +0000810
811 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000812 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000813 event_get_version(),
814 event_get_method());
Mark Slee2f6404d2006-10-10 01:37:40 +0000815
816 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000817 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000818 serverSocket_,
819 EV_READ | EV_PERSIST,
820 TNonblockingServer::eventHandler,
821 this);
Mark Slee79b16942007-11-26 19:05:29 +0000822 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000823
824 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000825 if (-1 == event_add(&serverEvent_, 0)) {
826 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000827 }
David Reiss01fe1532010-03-09 05:19:25 +0000828 if (threadPoolProcessing_) {
829 // Create an event to be notified when a task finishes
830 event_set(&notificationEvent_,
831 getNotificationRecvFD(),
832 EV_READ | EV_PERSIST,
833 TConnection::taskHandler,
834 this);
David Reiss1c20c872010-03-09 05:20:14 +0000835
David Reiss01fe1532010-03-09 05:19:25 +0000836 // Attach to the base
837 event_base_set(eventBase_, &notificationEvent_);
838
839 // Add the event and start up the server
840 if (-1 == event_add(&notificationEvent_, 0)) {
841 throw TException("TNonblockingServer::serve(): notification event_add fail");
842 }
843 }
844}
845
David Reiss068f4162010-03-09 05:19:45 +0000846void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
847 threadManager_ = threadManager;
848 if (threadManager != NULL) {
849 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
850 threadPoolProcessing_ = true;
851 } else {
852 threadPoolProcessing_ = false;
853 }
854}
855
David Reiss01fe1532010-03-09 05:19:25 +0000856bool TNonblockingServer::serverOverloaded() {
857 size_t activeConnections = numTConnections_ - connectionStack_.size();
858 if (numActiveProcessors_ > maxActiveProcessors_ ||
859 activeConnections > maxConnections_) {
860 if (!overloaded_) {
861 GlobalOutput.printf("thrift non-blocking server overload condition");
862 overloaded_ = true;
863 }
864 } else {
865 if (overloaded_ &&
866 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
867 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
868 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
869 nConnectionsDropped_, nTotalConnectionsDropped_);
870 nConnectionsDropped_ = 0;
871 overloaded_ = false;
872 }
873 }
874
875 return overloaded_;
876}
877
878bool TNonblockingServer::drainPendingTask() {
879 if (threadManager_) {
880 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
881 if (task) {
882 TConnection* connection =
883 static_cast<TConnection::Task*>(task.get())->getTConnection();
884 assert(connection && connection->getServer()
885 && connection->getState() == APP_WAIT_TASK);
886 connection->forceClose();
887 return true;
888 }
889 }
890 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000891}
892
David Reiss068f4162010-03-09 05:19:45 +0000893void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
894 TConnection* connection =
895 static_cast<TConnection::Task*>(task.get())->getTConnection();
896 assert(connection && connection->getServer()
897 && connection->getState() == APP_WAIT_TASK);
898 connection->forceClose();
899}
900
Mark Slee79b16942007-11-26 19:05:29 +0000901/**
902 * Main workhorse function, starts up the server listening on a port and
903 * loops over the libevent handler.
904 */
905void TNonblockingServer::serve() {
906 // Init socket
907 listenSocket();
908
David Reiss01fe1532010-03-09 05:19:25 +0000909 if (threadPoolProcessing_) {
910 // Init task completion notification pipe
911 createNotificationPipe();
912 }
913
Mark Slee79b16942007-11-26 19:05:29 +0000914 // Initialize libevent core
Roger Meierc1905582011-08-02 23:37:36 +0000915 registerEvents(static_cast<event_base*>(event_init()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +0000916
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000917 // Run the preServe event
918 if (eventHandler_ != NULL) {
919 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000920 }
921
Mark Sleee02385b2007-06-09 01:21:16 +0000922 // Run libevent engine, never returns, invokes calls to eventHandler
Mark Slee79b16942007-11-26 19:05:29 +0000923 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000924}
925
T Jake Lucianib5e62212009-01-31 22:36:20 +0000926}}} // apache::thrift::server