blob: a0810770a5ac24dff03195fddbb01adc616135a3 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000025
26#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000028#endif
29
30#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <netinet/in.h>
32#include <netinet/tcp.h>
Bryan Duxbury76c43682011-08-24 21:26:48 +000033#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000034#endif
35
36#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000037#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000038#endif
39
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <fcntl.h>
41#include <errno.h>
42#include <assert.h>
43
David Reiss9b903442009-10-21 05:51:28 +000044#ifndef AF_LOCAL
45#define AF_LOCAL AF_UNIX
46#endif
47
T Jake Lucianib5e62212009-01-31 22:36:20 +000048namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000049
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift::protocol;
51using namespace apache::thrift::transport;
52using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000053using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000054using apache::thrift::transport::TSocket;
55using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000056
57class TConnection::Task: public Runnable {
58 public:
59 Task(boost::shared_ptr<TProcessor> processor,
60 boost::shared_ptr<TProtocol> input,
61 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000062 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000063 processor_(processor),
64 input_(input),
65 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000066 connection_(connection),
67 serverEventHandler_(connection_->getServerEventHandler()),
68 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000069
70 void run() {
71 try {
David Reiss105961d2010-10-06 17:10:17 +000072 for (;;) {
73 if (serverEventHandler_ != NULL) {
74 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
75 }
76 if (!processor_->process(input_, output_, connectionContext_) ||
77 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000078 break;
79 }
80 }
Bryan Duxbury1e987582011-08-25 17:33:03 +000081 } catch (const TTransportException& ttx) {
82 GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
83 } catch (const bad_alloc&) {
84 GlobalOutput("TNonblockingServer caught bad_alloc exception.");
David Reiss28e88ec2010-03-09 05:19:27 +000085 exit(-1);
Bryan Duxbury1e987582011-08-25 17:33:03 +000086 } catch (const std::exception& x) {
87 GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
88 typeid(x).name(), x.what());
Mark Sleee02385b2007-06-09 01:21:16 +000089 } catch (...) {
Bryan Duxbury1e987582011-08-25 17:33:03 +000090 GlobalOutput("TNonblockingServer uncaught exception.");
Mark Sleee02385b2007-06-09 01:21:16 +000091 }
Mark Slee79b16942007-11-26 19:05:29 +000092
David Reiss01fe1532010-03-09 05:19:25 +000093 // Signal completion back to the libevent thread via a pipe
94 if (!connection_->notifyServer()) {
95 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000096 }
David Reiss01fe1532010-03-09 05:19:25 +000097 }
98
99 TConnection* getTConnection() {
100 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +0000101 }
102
103 private:
104 boost::shared_ptr<TProcessor> processor_;
105 boost::shared_ptr<TProtocol> input_;
106 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000107 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000108 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
109 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000110};
Mark Slee5ea15f92007-03-05 22:55:59 +0000111
David Reiss105961d2010-10-06 17:10:17 +0000112void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
113 const sockaddr* addr, socklen_t addrLen) {
114 tSocket_->setSocketFD(socket);
115 tSocket_->setCachedAddress(addr, addrLen);
116
Mark Slee2f6404d2006-10-10 01:37:40 +0000117 server_ = s;
118 appState_ = APP_INIT;
119 eventFlags_ = 0;
120
121 readBufferPos_ = 0;
122 readWant_ = 0;
123
124 writeBuffer_ = NULL;
125 writeBufferSize_ = 0;
126 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000127 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000128
David Reiss89a12942010-10-06 17:10:52 +0000129 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000131 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000132
Mark Slee2f6404d2006-10-10 01:37:40 +0000133 // Set flags, which also registers the event
134 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000135
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000136 // get input/transports
137 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
138 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000139
140 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000141 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
142 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000143
144 // Set up for any server event handler
145 serverEventHandler_ = server_->getEventHandler();
146 if (serverEventHandler_ != NULL) {
147 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
148 } else {
149 connectionContext_ = NULL;
150 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000151}
152
153void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000154 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000155 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156
157 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000158 case SOCKET_RECV_FRAMING:
159 union {
160 uint8_t buf[sizeof(uint32_t)];
161 int32_t size;
162 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000163
David Reiss89a12942010-10-06 17:10:52 +0000164 // if we've already received some bytes we kept them here
165 framing.size = readWant_;
166 // determine size of this frame
167 try {
168 // Read from the socket
169 fetch = tSocket_->read(&framing.buf[readBufferPos_],
170 uint32_t(sizeof(framing.size) - readBufferPos_));
171 if (fetch == 0) {
172 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 close();
174 return;
175 }
David Reiss89a12942010-10-06 17:10:52 +0000176 readBufferPos_ += fetch;
177 } catch (TTransportException& te) {
178 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
179 close();
180
181 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 }
183
David Reiss89a12942010-10-06 17:10:52 +0000184 if (readBufferPos_ < sizeof(framing.size)) {
185 // more needed before frame size is known -- save what we have so far
186 readWant_ = framing.size;
187 return;
188 }
189
190 readWant_ = ntohl(framing.size);
191 if (static_cast<int>(readWant_) <= 0) {
192 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
193 close();
194 return;
195 }
196 // size known; now get the rest of the frame
197 transition();
198 return;
199
200 case SOCKET_RECV:
201 // It is an error to be in this state if we already have all the data
202 assert(readBufferPos_ < readWant_);
203
David Reiss105961d2010-10-06 17:10:17 +0000204 try {
205 // Read from the socket
206 fetch = readWant_ - readBufferPos_;
207 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
208 }
209 catch (TTransportException& te) {
210 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
211 close();
Mark Slee79b16942007-11-26 19:05:29 +0000212
David Reiss105961d2010-10-06 17:10:17 +0000213 return;
214 }
215
Mark Slee2f6404d2006-10-10 01:37:40 +0000216 if (got > 0) {
217 // Move along in the buffer
218 readBufferPos_ += got;
219
220 // Check that we did not overdo it
221 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000222
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 // We are done reading, move onto the next state
224 if (readBufferPos_ == readWant_) {
225 transition();
226 }
227 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000228 }
229
230 // Whenever we get down here it means a remote disconnect
231 close();
Mark Slee79b16942007-11-26 19:05:29 +0000232
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 return;
234
235 case SOCKET_SEND:
236 // Should never have position past size
237 assert(writeBufferPos_ <= writeBufferSize_);
238
239 // If there is no data to send, then let us move on
240 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000241 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 transition();
243 return;
244 }
245
David Reiss105961d2010-10-06 17:10:17 +0000246 try {
247 left = writeBufferSize_ - writeBufferPos_;
248 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
249 }
250 catch (TTransportException& te) {
251 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000252 close();
253 return;
254 }
255
256 writeBufferPos_ += sent;
257
258 // Did we overdo it?
259 assert(writeBufferPos_ <= writeBufferSize_);
260
Mark Slee79b16942007-11-26 19:05:29 +0000261 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000262 if (writeBufferPos_ == writeBufferSize_) {
263 transition();
264 }
265
266 return;
267
268 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000269 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000270 assert(0);
271 }
272}
273
274/**
275 * This is called when the application transitions from one state into
276 * another. This means that it has finished writing the data that it needed
277 * to, or finished receiving the data that it needed to.
278 */
279void TConnection::transition() {
280 // Switch upon the state that we are currently in and move to a new state
281 switch (appState_) {
282
283 case APP_READ_REQUEST:
284 // We are done reading the request, package the read buffer into transport
285 // and get back some data from the dispatch function
286 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000287 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000288 // Prepend four bytes of blank space to the buffer so we can
289 // write the frame size there later.
290 outputTransport_->getWritePtr(4);
291 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000292
David Reiss01fe1532010-03-09 05:19:25 +0000293 server_->incrementActiveProcessors();
294
Mark Sleee02385b2007-06-09 01:21:16 +0000295 if (server_->isThreadPoolProcessing()) {
296 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000297
David Reiss01fe1532010-03-09 05:19:25 +0000298 // Create task and dispatch to the thread manager
299 boost::shared_ptr<Runnable> task =
300 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
301 inputProtocol_,
302 outputProtocol_,
303 this));
304 // The application is now waiting on the task to finish
305 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000306
David Reisse11f3072008-10-07 21:39:19 +0000307 try {
308 server_->addTask(task);
309 } catch (IllegalStateException & ise) {
310 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000311 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000312 close();
313 }
Mark Slee402ee282007-08-23 01:43:20 +0000314
David Reiss01fe1532010-03-09 05:19:25 +0000315 // Set this connection idle so that libevent doesn't process more
316 // data on it while we're still waiting for the threadmanager to
317 // finish this task
318 setIdle();
319 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000320 } else {
321 try {
322 // Invoke the processor
David Reiss23248712010-10-06 17:10:08 +0000323 server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000324 } catch (const TTransportException &ttx) {
325 GlobalOutput.printf("TNonblockingServer transport error in "
326 "process(): %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000327 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000328 close();
329 return;
Bryan Duxbury1e987582011-08-25 17:33:03 +0000330 } catch (const std::exception &x) {
331 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
332 typeid(x).name(), x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000333 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000334 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000335 return;
336 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000337 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000338 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000339 close();
340 return;
341 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000342 }
343
Mark Slee402ee282007-08-23 01:43:20 +0000344 // Intentionally fall through here, the call to process has written into
345 // the writeBuffer_
346
Mark Sleee02385b2007-06-09 01:21:16 +0000347 case APP_WAIT_TASK:
348 // We have now finished processing a task and the result has been written
349 // into the outputTransport_, so we grab its contents and place them into
350 // the writeBuffer_ for actual writing by the libevent thread
351
David Reiss01fe1532010-03-09 05:19:25 +0000352 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000353 // Get the result of the operation
354 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
355
356 // If the function call generated return data, then move into the send
357 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000358 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000359 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000360
361 // Move into write state
362 writeBufferPos_ = 0;
363 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000364
David Reissaf787782008-07-03 20:29:34 +0000365 // Put the frame size into the write buffer
366 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
367 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000368
369 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000370 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000371 setWrite();
372
373 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000374 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000375
376 return;
377 }
378
David Reissc51986f2009-03-24 20:01:25 +0000379 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000380 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000381 goto LABEL_APP_INIT;
382
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000384 // it's now safe to perform buffer size housekeeping.
385 if (writeBufferSize_ > largestWriteBufferSize_) {
386 largestWriteBufferSize_ = writeBufferSize_;
387 }
388 if (server_->getResizeBufferEveryN() > 0
389 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
390 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
391 server_->getIdleWriteBufferLimit());
392 callsForResize_ = 0;
393 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000394
395 // N.B.: We also intentionally fall through here into the INIT state!
396
Mark Slee92f00fb2006-10-25 01:28:17 +0000397 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000398 case APP_INIT:
399
400 // Clear write buffer variables
401 writeBuffer_ = NULL;
402 writeBufferPos_ = 0;
403 writeBufferSize_ = 0;
404
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000406 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000407 appState_ = APP_READ_FRAME_SIZE;
408
David Reiss89a12942010-10-06 17:10:52 +0000409 readBufferPos_ = 0;
410
Mark Slee2f6404d2006-10-10 01:37:40 +0000411 // Register read event
412 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000413
Mark Slee2f6404d2006-10-10 01:37:40 +0000414 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000415 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000416
417 return;
418
419 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000420 // We just read the request length
421 // Double the buffer size until it is big enough
422 if (readWant_ > readBufferSize_) {
423 if (readBufferSize_ == 0) {
424 readBufferSize_ = 1;
425 }
426 uint32_t newSize = readBufferSize_;
427 while (readWant_ > newSize) {
428 newSize *= 2;
429 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000430
David Reiss89a12942010-10-06 17:10:52 +0000431 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
432 if (newBuffer == NULL) {
433 // nothing else to be done...
434 throw std::bad_alloc();
435 }
436 readBuffer_ = newBuffer;
437 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000438 }
439
Mark Slee2f6404d2006-10-10 01:37:40 +0000440 readBufferPos_= 0;
441
442 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000443 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000444 appState_ = APP_READ_REQUEST;
445
446 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000447 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000448
449 return;
450
David Reiss01fe1532010-03-09 05:19:25 +0000451 case APP_CLOSE_CONNECTION:
452 server_->decrementActiveProcessors();
453 close();
454 return;
455
Mark Slee2f6404d2006-10-10 01:37:40 +0000456 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000457 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000458 assert(0);
459 }
460}
461
462void TConnection::setFlags(short eventFlags) {
463 // Catch the do nothing case
464 if (eventFlags_ == eventFlags) {
465 return;
466 }
467
468 // Delete a previously existing event
469 if (eventFlags_ != 0) {
470 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000471 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000472 return;
473 }
474 }
475
476 // Update in memory structure
477 eventFlags_ = eventFlags;
478
Mark Slee402ee282007-08-23 01:43:20 +0000479 // Do not call event_set if there are no flags
480 if (!eventFlags_) {
481 return;
482 }
483
David Reiss01fe1532010-03-09 05:19:25 +0000484 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000485 * event_set:
486 *
487 * Prepares the event structure &event to be used in future calls to
488 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000489 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000490 *
491 * The events can be either EV_READ, EV_WRITE, or both, indicating
492 * that an application can read or write from the file respectively without
493 * blocking.
494 *
Mark Sleee02385b2007-06-09 01:21:16 +0000495 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000496 * the event and the type of event which will be one of: EV_TIMEOUT,
497 * EV_SIGNAL, EV_READ, EV_WRITE.
498 *
499 * The additional flag EV_PERSIST makes an event_add() persistent until
500 * event_del() has been called.
501 *
502 * Once initialized, the &event struct can be used repeatedly with
503 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000504 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000505 * when an ev structure has been added to libevent using event_add() the
506 * structure must persist until the event occurs (assuming EV_PERSIST
507 * is not set) or is removed using event_del(). You may not reuse the same
508 * ev structure for multiple monitored descriptors; each descriptor needs
509 * its own ev.
510 */
David Reiss105961d2010-10-06 17:10:17 +0000511 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
512 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000513 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000514
515 // Add the event
516 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000517 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000518 }
519}
520
521/**
522 * Closes a connection
523 */
524void TConnection::close() {
525 // Delete the registered libevent
526 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000527 GlobalOutput.perror("TConnection::close() event_del", errno);
528 }
529
530 if (serverEventHandler_ != NULL) {
531 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000532 }
533
534 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000535 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000536
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000537 // close any factory produced transports
538 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000539 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000540
Mark Slee2f6404d2006-10-10 01:37:40 +0000541 // Give this object back to the server that owns it
542 server_->returnConnection(this);
543}
544
David Reiss54bec5d2010-10-06 17:10:45 +0000545void TConnection::checkIdleBufferMemLimit(size_t readLimit,
546 size_t writeLimit) {
547 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000548 free(readBuffer_);
549 readBuffer_ = NULL;
550 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000551 }
David Reiss54bec5d2010-10-06 17:10:45 +0000552
553 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
554 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000555 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000556 largestWriteBufferSize_ = 0;
557 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000558}
559
David Reiss8ede8182010-09-02 15:26:28 +0000560TNonblockingServer::~TNonblockingServer() {
561 // TODO: We currently leak any active TConnection objects.
562 // Since we're shutting down and destroying the event_base, the TConnection
563 // objects will never receive any additional callbacks. (And even if they
564 // did, it would be bad, since they keep a pointer around to the server,
565 // which is being destroyed.)
566
567 // Clean up unused TConnection objects in connectionStack_
568 while (!connectionStack_.empty()) {
569 TConnection* connection = connectionStack_.top();
570 connectionStack_.pop();
571 delete connection;
572 }
573
Roger Meierc1905582011-08-02 23:37:36 +0000574 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000575 event_base_free(eventBase_);
576 }
577
578 if (serverSocket_ >= 0) {
579 close(serverSocket_);
580 }
581}
582
Mark Slee2f6404d2006-10-10 01:37:40 +0000583/**
584 * Creates a new connection either by reusing an object off the stack or
585 * by allocating a new one entirely
586 */
David Reiss105961d2010-10-06 17:10:17 +0000587TConnection* TNonblockingServer::createConnection(int socket, short flags,
588 const sockaddr* addr,
589 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000590 // Check the stack
591 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000592 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000593 } else {
594 TConnection* result = connectionStack_.top();
595 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000596 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000597 return result;
598 }
599}
600
601/**
602 * Returns a connection to the stack
603 */
604void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000605 if (connectionStackLimit_ &&
606 (connectionStack_.size() >= connectionStackLimit_)) {
607 delete connection;
608 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000609 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000610 connectionStack_.push(connection);
611 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000612}
613
614/**
David Reissa79e4882008-03-05 07:51:47 +0000615 * Server socket had something happen. We accept all waiting client
616 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000617 */
618void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000619 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000620 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000621 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000622
Mark Slee2f6404d2006-10-10 01:37:40 +0000623 // Server socket accepted a new connection
624 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000625 sockaddr_storage addrStorage;
626 sockaddr* addrp = (sockaddr*)&addrStorage;
627 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000628
Mark Slee2f6404d2006-10-10 01:37:40 +0000629 // Going to accept a new client socket
630 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000631
Mark Slee2f6404d2006-10-10 01:37:40 +0000632 // Accept as many new clients as possible, even though libevent signaled only
633 // one, this helps us to avoid having to go back into the libevent engine so
634 // many times
David Reiss105961d2010-10-06 17:10:17 +0000635 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000636 // If we're overloaded, take action here
637 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
638 nConnectionsDropped_++;
639 nTotalConnectionsDropped_++;
640 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
641 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000642 return;
David Reiss01fe1532010-03-09 05:19:25 +0000643 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
644 if (!drainPendingTask()) {
645 // Nothing left to discard, so we drop connection instead.
646 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000647 return;
David Reiss01fe1532010-03-09 05:19:25 +0000648 }
649 }
650 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000651 // Explicitly set this socket to NONBLOCK mode
652 int flags;
653 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
654 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000655 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000656 close(clientSocket);
657 return;
658 }
659
660 // Create a new TConnection for this client socket.
661 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000662 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000663
664 // Fail fast if we could not create a TConnection object
665 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000666 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000667 close(clientSocket);
668 return;
669 }
670
671 // Put this client connection into the proper state
672 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000673
674 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000675 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000676 }
Mark Slee79b16942007-11-26 19:05:29 +0000677
Mark Slee2f6404d2006-10-10 01:37:40 +0000678 // Done looping accept, now we have to make sure the error is due to
679 // blocking. Any other error is a problem
680 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000681 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000682 }
683}
684
/**
 * Creates a socket to listen on and binds it to the local port (port_) on
 * the wildcard address, preferring an IPv6 address when one is offered.
 * The bound socket is then passed to listenSocket(int).
 *
 * If getaddrinfo() fails, the error is logged and the function returns
 * without creating a socket.
 *
 * @throws TException if socket() or bind() fails
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  // Bounded formatting: port_ is a plain integer, so an out-of-range value
  // must not be able to write past the end of the small stack buffer.
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. If there is none, the last entry is used.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Turn IPV6_V6ONLY off on the IPv6 socket; failure is only logged
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    // Release both the descriptor and the address list before reporting
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
749
750/**
751 * Takes a socket created by listenSocket() and sets various options on it
752 * to prepare for use in the server.
753 */
754void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000755 // Set socket to nonblocking mode
756 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000757 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
758 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
759 close(s);
760 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000761 }
762
763 int one = 1;
764 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000765
766 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +0000767 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000768
769 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +0000770 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000771
772 // Set TCP nodelay if available, MAC OS X Hack
773 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
774 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +0000775 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000776 #endif
777
David Reiss1c20c872010-03-09 05:20:14 +0000778 #ifdef TCP_LOW_MIN_RTO
779 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +0000780 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +0000781 }
782 #endif
783
Mark Slee79b16942007-11-26 19:05:29 +0000784 if (listen(s, LISTEN_BACKLOG) == -1) {
785 close(s);
786 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000787 }
788
Mark Slee79b16942007-11-26 19:05:29 +0000789 // Cool, this socket is good to go, set it as the serverSocket_
790 serverSocket_ = s;
791}
792
David Reiss01fe1532010-03-09 05:19:25 +0000793void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +0000794 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
795 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
796 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +0000797 }
Roger Meier30aae0c2011-07-08 12:23:31 +0000798 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
799 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +0000800 close(notificationPipeFDs_[0]);
801 close(notificationPipeFDs_[1]);
802 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
803 }
David Reiss01fe1532010-03-09 05:19:25 +0000804}
805
Mark Slee79b16942007-11-26 19:05:29 +0000806/**
807 * Register the core libevent events onto the proper base.
808 */
Roger Meierc1905582011-08-02 23:37:36 +0000809void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +0000810 assert(serverSocket_ != -1);
811 assert(!eventBase_);
812 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +0000813 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +0000814
815 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000816 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000817 event_get_version(),
Bryan Duxbury37874ca2011-08-25 17:28:23 +0000818 event_base_get_method(eventBase_));
Mark Slee2f6404d2006-10-10 01:37:40 +0000819
820 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000821 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000822 serverSocket_,
823 EV_READ | EV_PERSIST,
824 TNonblockingServer::eventHandler,
825 this);
Mark Slee79b16942007-11-26 19:05:29 +0000826 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000827
828 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000829 if (-1 == event_add(&serverEvent_, 0)) {
830 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000831 }
David Reiss01fe1532010-03-09 05:19:25 +0000832 if (threadPoolProcessing_) {
833 // Create an event to be notified when a task finishes
834 event_set(&notificationEvent_,
835 getNotificationRecvFD(),
836 EV_READ | EV_PERSIST,
837 TConnection::taskHandler,
838 this);
David Reiss1c20c872010-03-09 05:20:14 +0000839
David Reiss01fe1532010-03-09 05:19:25 +0000840 // Attach to the base
841 event_base_set(eventBase_, &notificationEvent_);
842
843 // Add the event and start up the server
844 if (-1 == event_add(&notificationEvent_, 0)) {
845 throw TException("TNonblockingServer::serve(): notification event_add fail");
846 }
847 }
848}
849
David Reiss068f4162010-03-09 05:19:45 +0000850void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
851 threadManager_ = threadManager;
852 if (threadManager != NULL) {
853 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
854 threadPoolProcessing_ = true;
855 } else {
856 threadPoolProcessing_ = false;
857 }
858}
859
David Reiss01fe1532010-03-09 05:19:25 +0000860bool TNonblockingServer::serverOverloaded() {
861 size_t activeConnections = numTConnections_ - connectionStack_.size();
862 if (numActiveProcessors_ > maxActiveProcessors_ ||
863 activeConnections > maxConnections_) {
864 if (!overloaded_) {
865 GlobalOutput.printf("thrift non-blocking server overload condition");
866 overloaded_ = true;
867 }
868 } else {
869 if (overloaded_ &&
870 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
871 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
872 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
873 nConnectionsDropped_, nTotalConnectionsDropped_);
874 nConnectionsDropped_ = 0;
875 overloaded_ = false;
876 }
877 }
878
879 return overloaded_;
880}
881
882bool TNonblockingServer::drainPendingTask() {
883 if (threadManager_) {
884 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
885 if (task) {
886 TConnection* connection =
887 static_cast<TConnection::Task*>(task.get())->getTConnection();
888 assert(connection && connection->getServer()
889 && connection->getState() == APP_WAIT_TASK);
890 connection->forceClose();
891 return true;
892 }
893 }
894 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000895}
896
David Reiss068f4162010-03-09 05:19:45 +0000897void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
898 TConnection* connection =
899 static_cast<TConnection::Task*>(task.get())->getTConnection();
900 assert(connection && connection->getServer()
901 && connection->getState() == APP_WAIT_TASK);
902 connection->forceClose();
903}
904
Mark Slee79b16942007-11-26 19:05:29 +0000905/**
906 * Main workhorse function, starts up the server listening on a port and
907 * loops over the libevent handler.
908 */
909void TNonblockingServer::serve() {
910 // Init socket
911 listenSocket();
912
David Reiss01fe1532010-03-09 05:19:25 +0000913 if (threadPoolProcessing_) {
914 // Init task completion notification pipe
915 createNotificationPipe();
916 }
917
Mark Slee79b16942007-11-26 19:05:29 +0000918 // Initialize libevent core
Bryan Duxbury37874ca2011-08-25 17:28:23 +0000919 registerEvents(static_cast<event_base*>(event_base_new()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +0000920
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000921 // Run the preServe event
922 if (eventHandler_ != NULL) {
923 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000924 }
925
Bryan Duxbury76c43682011-08-24 21:26:48 +0000926 // Run libevent engine, invokes calls to eventHandler
927 // Only returns if stop() is called.
Mark Slee79b16942007-11-26 19:05:29 +0000928 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000929}
930
Bryan Duxbury76c43682011-08-24 21:26:48 +0000931void TNonblockingServer::stop() {
932 if (!eventBase_) {
933 return;
934 }
935
936 // Call event_base_loopbreak() to tell libevent to exit the loop
937 //
938 // (The libevent documentation doesn't explicitly state that this function is
939 // safe to call from another thread. However, all it does is set a variable,
940 // in the event_base, so it should be fine.)
941 event_base_loopbreak(eventBase_);
942
943 // event_base_loopbreak() only causes the loop to exit the next time it wakes
944 // up. We need to force it to wake up, in case there are no real events
945 // it needs to process.
946 //
947 // Attempt to connect to the server socket. If anything fails,
948 // we'll just have to wait until libevent wakes up on its own.
949 //
950 // First create a socket
951 int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
952 if (fd < 0) {
953 return;
954 }
955
956 // Set up the address
957 struct sockaddr_in addr;
958 addr.sin_family = AF_INET;
959 addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
960 addr.sin_port = htons(port_);
961
962 // Finally do the connect().
963 // We don't care about the return value;
964 // we're just going to close the socket either way.
965 connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
966 close(fd);
967}
968
T Jake Lucianib5e62212009-01-31 22:36:20 +0000969}}} // apache::thrift::server