blob: f43a1c9c30fa594a148c1085658feca32e34bb86 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000025
26#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000028#endif
29
30#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <netinet/in.h>
32#include <netinet/tcp.h>
Bryan Duxbury76c43682011-08-24 21:26:48 +000033#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000034#endif
35
36#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000037#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000038#endif
39
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <fcntl.h>
41#include <errno.h>
42#include <assert.h>
43
David Reiss9b903442009-10-21 05:51:28 +000044#ifndef AF_LOCAL
45#define AF_LOCAL AF_UNIX
46#endif
47
T Jake Lucianib5e62212009-01-31 22:36:20 +000048namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000049
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift::protocol;
51using namespace apache::thrift::transport;
52using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000053using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000054using apache::thrift::transport::TSocket;
55using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000056
57class TConnection::Task: public Runnable {
58 public:
59 Task(boost::shared_ptr<TProcessor> processor,
60 boost::shared_ptr<TProtocol> input,
61 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +000062 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +000063 processor_(processor),
64 input_(input),
65 output_(output),
David Reiss105961d2010-10-06 17:10:17 +000066 connection_(connection),
67 serverEventHandler_(connection_->getServerEventHandler()),
68 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +000069
70 void run() {
71 try {
David Reiss105961d2010-10-06 17:10:17 +000072 for (;;) {
73 if (serverEventHandler_ != NULL) {
74 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
75 }
76 if (!processor_->process(input_, output_, connectionContext_) ||
77 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +000078 break;
79 }
80 }
Bryan Duxbury1e987582011-08-25 17:33:03 +000081 } catch (const TTransportException& ttx) {
82 GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
83 } catch (const bad_alloc&) {
84 GlobalOutput("TNonblockingServer caught bad_alloc exception.");
David Reiss28e88ec2010-03-09 05:19:27 +000085 exit(-1);
Bryan Duxbury1e987582011-08-25 17:33:03 +000086 } catch (const std::exception& x) {
87 GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
88 typeid(x).name(), x.what());
Mark Sleee02385b2007-06-09 01:21:16 +000089 } catch (...) {
Bryan Duxbury1e987582011-08-25 17:33:03 +000090 GlobalOutput("TNonblockingServer uncaught exception.");
Mark Sleee02385b2007-06-09 01:21:16 +000091 }
Mark Slee79b16942007-11-26 19:05:29 +000092
David Reiss01fe1532010-03-09 05:19:25 +000093 // Signal completion back to the libevent thread via a pipe
94 if (!connection_->notifyServer()) {
95 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +000096 }
David Reiss01fe1532010-03-09 05:19:25 +000097 }
98
99 TConnection* getTConnection() {
100 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +0000101 }
102
103 private:
104 boost::shared_ptr<TProcessor> processor_;
105 boost::shared_ptr<TProtocol> input_;
106 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000107 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000108 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
109 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000110};
Mark Slee5ea15f92007-03-05 22:55:59 +0000111
David Reiss105961d2010-10-06 17:10:17 +0000112void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
113 const sockaddr* addr, socklen_t addrLen) {
114 tSocket_->setSocketFD(socket);
115 tSocket_->setCachedAddress(addr, addrLen);
116
Mark Slee2f6404d2006-10-10 01:37:40 +0000117 server_ = s;
118 appState_ = APP_INIT;
119 eventFlags_ = 0;
120
121 readBufferPos_ = 0;
122 readWant_ = 0;
123
124 writeBuffer_ = NULL;
125 writeBufferSize_ = 0;
126 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000127 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000128
David Reiss89a12942010-10-06 17:10:52 +0000129 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000131 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000132
Mark Slee2f6404d2006-10-10 01:37:40 +0000133 // Set flags, which also registers the event
134 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000135
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000136 // get input/transports
137 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
138 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000139
140 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000141 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
142 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000143
144 // Set up for any server event handler
145 serverEventHandler_ = server_->getEventHandler();
146 if (serverEventHandler_ != NULL) {
147 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
148 } else {
149 connectionContext_ = NULL;
150 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000151}
152
153void TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000154 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000155 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156
157 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000158 case SOCKET_RECV_FRAMING:
159 union {
160 uint8_t buf[sizeof(uint32_t)];
161 int32_t size;
162 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000163
David Reiss89a12942010-10-06 17:10:52 +0000164 // if we've already received some bytes we kept them here
165 framing.size = readWant_;
166 // determine size of this frame
167 try {
168 // Read from the socket
169 fetch = tSocket_->read(&framing.buf[readBufferPos_],
170 uint32_t(sizeof(framing.size) - readBufferPos_));
171 if (fetch == 0) {
172 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 close();
174 return;
175 }
David Reiss89a12942010-10-06 17:10:52 +0000176 readBufferPos_ += fetch;
177 } catch (TTransportException& te) {
178 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
179 close();
180
181 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 }
183
David Reiss89a12942010-10-06 17:10:52 +0000184 if (readBufferPos_ < sizeof(framing.size)) {
185 // more needed before frame size is known -- save what we have so far
186 readWant_ = framing.size;
187 return;
188 }
189
190 readWant_ = ntohl(framing.size);
191 if (static_cast<int>(readWant_) <= 0) {
192 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
193 close();
194 return;
195 }
196 // size known; now get the rest of the frame
197 transition();
198 return;
199
200 case SOCKET_RECV:
201 // It is an error to be in this state if we already have all the data
202 assert(readBufferPos_ < readWant_);
203
David Reiss105961d2010-10-06 17:10:17 +0000204 try {
205 // Read from the socket
206 fetch = readWant_ - readBufferPos_;
207 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
208 }
209 catch (TTransportException& te) {
210 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
211 close();
Mark Slee79b16942007-11-26 19:05:29 +0000212
David Reiss105961d2010-10-06 17:10:17 +0000213 return;
214 }
215
Mark Slee2f6404d2006-10-10 01:37:40 +0000216 if (got > 0) {
217 // Move along in the buffer
218 readBufferPos_ += got;
219
220 // Check that we did not overdo it
221 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000222
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 // We are done reading, move onto the next state
224 if (readBufferPos_ == readWant_) {
225 transition();
226 }
227 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000228 }
229
230 // Whenever we get down here it means a remote disconnect
231 close();
Mark Slee79b16942007-11-26 19:05:29 +0000232
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 return;
234
235 case SOCKET_SEND:
236 // Should never have position past size
237 assert(writeBufferPos_ <= writeBufferSize_);
238
239 // If there is no data to send, then let us move on
240 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000241 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 transition();
243 return;
244 }
245
David Reiss105961d2010-10-06 17:10:17 +0000246 try {
247 left = writeBufferSize_ - writeBufferPos_;
248 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
249 }
250 catch (TTransportException& te) {
251 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000252 close();
253 return;
254 }
255
256 writeBufferPos_ += sent;
257
258 // Did we overdo it?
259 assert(writeBufferPos_ <= writeBufferSize_);
260
Mark Slee79b16942007-11-26 19:05:29 +0000261 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000262 if (writeBufferPos_ == writeBufferSize_) {
263 transition();
264 }
265
266 return;
267
268 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000269 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000270 assert(0);
271 }
272}
273
274/**
275 * This is called when the application transitions from one state into
276 * another. This means that it has finished writing the data that it needed
277 * to, or finished receiving the data that it needed to.
278 */
279void TConnection::transition() {
280 // Switch upon the state that we are currently in and move to a new state
281 switch (appState_) {
282
283 case APP_READ_REQUEST:
284 // We are done reading the request, package the read buffer into transport
285 // and get back some data from the dispatch function
286 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000287 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000288 // Prepend four bytes of blank space to the buffer so we can
289 // write the frame size there later.
290 outputTransport_->getWritePtr(4);
291 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000292
David Reiss01fe1532010-03-09 05:19:25 +0000293 server_->incrementActiveProcessors();
294
Mark Sleee02385b2007-06-09 01:21:16 +0000295 if (server_->isThreadPoolProcessing()) {
296 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000297
David Reiss01fe1532010-03-09 05:19:25 +0000298 // Create task and dispatch to the thread manager
299 boost::shared_ptr<Runnable> task =
300 boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
301 inputProtocol_,
302 outputProtocol_,
303 this));
304 // The application is now waiting on the task to finish
305 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000306
David Reisse11f3072008-10-07 21:39:19 +0000307 try {
308 server_->addTask(task);
309 } catch (IllegalStateException & ise) {
310 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000311 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000312 close();
313 }
Mark Slee402ee282007-08-23 01:43:20 +0000314
David Reiss01fe1532010-03-09 05:19:25 +0000315 // Set this connection idle so that libevent doesn't process more
316 // data on it while we're still waiting for the threadmanager to
317 // finish this task
318 setIdle();
319 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000320 } else {
321 try {
322 // Invoke the processor
Bryan Duxbury489f8f12011-08-29 18:50:12 +0000323 server_->getProcessor()->process(inputProtocol_, outputProtocol_,
324 connectionContext_);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000325 } catch (const TTransportException &ttx) {
326 GlobalOutput.printf("TNonblockingServer transport error in "
327 "process(): %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000328 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000329 close();
330 return;
Bryan Duxbury1e987582011-08-25 17:33:03 +0000331 } catch (const std::exception &x) {
332 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
333 typeid(x).name(), x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000334 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000335 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000336 return;
337 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000338 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000339 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000340 close();
341 return;
342 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000343 }
344
Mark Slee402ee282007-08-23 01:43:20 +0000345 // Intentionally fall through here, the call to process has written into
346 // the writeBuffer_
347
Mark Sleee02385b2007-06-09 01:21:16 +0000348 case APP_WAIT_TASK:
349 // We have now finished processing a task and the result has been written
350 // into the outputTransport_, so we grab its contents and place them into
351 // the writeBuffer_ for actual writing by the libevent thread
352
David Reiss01fe1532010-03-09 05:19:25 +0000353 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000354 // Get the result of the operation
355 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
356
357 // If the function call generated return data, then move into the send
358 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000359 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000360 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000361
362 // Move into write state
363 writeBufferPos_ = 0;
364 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000365
David Reissaf787782008-07-03 20:29:34 +0000366 // Put the frame size into the write buffer
367 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
368 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000369
370 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000371 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000372 setWrite();
373
374 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000375 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000376
377 return;
378 }
379
David Reissc51986f2009-03-24 20:01:25 +0000380 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000381 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000382 goto LABEL_APP_INIT;
383
Mark Slee2f6404d2006-10-10 01:37:40 +0000384 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000385 // it's now safe to perform buffer size housekeeping.
386 if (writeBufferSize_ > largestWriteBufferSize_) {
387 largestWriteBufferSize_ = writeBufferSize_;
388 }
389 if (server_->getResizeBufferEveryN() > 0
390 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
391 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
392 server_->getIdleWriteBufferLimit());
393 callsForResize_ = 0;
394 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000395
396 // N.B.: We also intentionally fall through here into the INIT state!
397
Mark Slee92f00fb2006-10-25 01:28:17 +0000398 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000399 case APP_INIT:
400
401 // Clear write buffer variables
402 writeBuffer_ = NULL;
403 writeBufferPos_ = 0;
404 writeBufferSize_ = 0;
405
Mark Slee2f6404d2006-10-10 01:37:40 +0000406 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000407 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 appState_ = APP_READ_FRAME_SIZE;
409
David Reiss89a12942010-10-06 17:10:52 +0000410 readBufferPos_ = 0;
411
Mark Slee2f6404d2006-10-10 01:37:40 +0000412 // Register read event
413 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000414
Mark Slee2f6404d2006-10-10 01:37:40 +0000415 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000416 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000417
418 return;
419
420 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000421 // We just read the request length
422 // Double the buffer size until it is big enough
423 if (readWant_ > readBufferSize_) {
424 if (readBufferSize_ == 0) {
425 readBufferSize_ = 1;
426 }
427 uint32_t newSize = readBufferSize_;
428 while (readWant_ > newSize) {
429 newSize *= 2;
430 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000431
David Reiss89a12942010-10-06 17:10:52 +0000432 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
433 if (newBuffer == NULL) {
434 // nothing else to be done...
435 throw std::bad_alloc();
436 }
437 readBuffer_ = newBuffer;
438 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000439 }
440
Mark Slee2f6404d2006-10-10 01:37:40 +0000441 readBufferPos_= 0;
442
443 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000444 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000445 appState_ = APP_READ_REQUEST;
446
447 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000448 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000449
450 return;
451
David Reiss01fe1532010-03-09 05:19:25 +0000452 case APP_CLOSE_CONNECTION:
453 server_->decrementActiveProcessors();
454 close();
455 return;
456
Mark Slee2f6404d2006-10-10 01:37:40 +0000457 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000458 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000459 assert(0);
460 }
461}
462
463void TConnection::setFlags(short eventFlags) {
464 // Catch the do nothing case
465 if (eventFlags_ == eventFlags) {
466 return;
467 }
468
469 // Delete a previously existing event
470 if (eventFlags_ != 0) {
471 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000472 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000473 return;
474 }
475 }
476
477 // Update in memory structure
478 eventFlags_ = eventFlags;
479
Mark Slee402ee282007-08-23 01:43:20 +0000480 // Do not call event_set if there are no flags
481 if (!eventFlags_) {
482 return;
483 }
484
David Reiss01fe1532010-03-09 05:19:25 +0000485 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000486 * event_set:
487 *
488 * Prepares the event structure &event to be used in future calls to
489 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000490 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000491 *
492 * The events can be either EV_READ, EV_WRITE, or both, indicating
493 * that an application can read or write from the file respectively without
494 * blocking.
495 *
Mark Sleee02385b2007-06-09 01:21:16 +0000496 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000497 * the event and the type of event which will be one of: EV_TIMEOUT,
498 * EV_SIGNAL, EV_READ, EV_WRITE.
499 *
500 * The additional flag EV_PERSIST makes an event_add() persistent until
501 * event_del() has been called.
502 *
503 * Once initialized, the &event struct can be used repeatedly with
504 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000505 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000506 * when an ev structure has been added to libevent using event_add() the
507 * structure must persist until the event occurs (assuming EV_PERSIST
508 * is not set) or is removed using event_del(). You may not reuse the same
509 * ev structure for multiple monitored descriptors; each descriptor needs
510 * its own ev.
511 */
David Reiss105961d2010-10-06 17:10:17 +0000512 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
513 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000514 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000515
516 // Add the event
517 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000518 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000519 }
520}
521
522/**
523 * Closes a connection
524 */
525void TConnection::close() {
526 // Delete the registered libevent
527 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000528 GlobalOutput.perror("TConnection::close() event_del", errno);
529 }
530
531 if (serverEventHandler_ != NULL) {
532 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000533 }
534
535 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000536 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000537
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000538 // close any factory produced transports
539 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000540 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000541
Mark Slee2f6404d2006-10-10 01:37:40 +0000542 // Give this object back to the server that owns it
543 server_->returnConnection(this);
544}
545
David Reiss54bec5d2010-10-06 17:10:45 +0000546void TConnection::checkIdleBufferMemLimit(size_t readLimit,
547 size_t writeLimit) {
548 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000549 free(readBuffer_);
550 readBuffer_ = NULL;
551 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000552 }
David Reiss54bec5d2010-10-06 17:10:45 +0000553
554 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
555 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000556 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000557 largestWriteBufferSize_ = 0;
558 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000559}
560
David Reiss8ede8182010-09-02 15:26:28 +0000561TNonblockingServer::~TNonblockingServer() {
562 // TODO: We currently leak any active TConnection objects.
563 // Since we're shutting down and destroying the event_base, the TConnection
564 // objects will never receive any additional callbacks. (And even if they
565 // did, it would be bad, since they keep a pointer around to the server,
566 // which is being destroyed.)
567
568 // Clean up unused TConnection objects in connectionStack_
569 while (!connectionStack_.empty()) {
570 TConnection* connection = connectionStack_.top();
571 connectionStack_.pop();
572 delete connection;
573 }
574
Roger Meierc1905582011-08-02 23:37:36 +0000575 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000576 event_base_free(eventBase_);
577 }
578
579 if (serverSocket_ >= 0) {
580 close(serverSocket_);
581 }
582}
583
Mark Slee2f6404d2006-10-10 01:37:40 +0000584/**
585 * Creates a new connection either by reusing an object off the stack or
586 * by allocating a new one entirely
587 */
David Reiss105961d2010-10-06 17:10:17 +0000588TConnection* TNonblockingServer::createConnection(int socket, short flags,
589 const sockaddr* addr,
590 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000591 // Check the stack
592 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000593 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000594 } else {
595 TConnection* result = connectionStack_.top();
596 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000597 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000598 return result;
599 }
600}
601
602/**
603 * Returns a connection to the stack
604 */
605void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000606 if (connectionStackLimit_ &&
607 (connectionStack_.size() >= connectionStackLimit_)) {
608 delete connection;
609 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000610 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000611 connectionStack_.push(connection);
612 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000613}
614
615/**
David Reissa79e4882008-03-05 07:51:47 +0000616 * Server socket had something happen. We accept all waiting client
617 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000618 */
619void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000620 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000621 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000622 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000623
Mark Slee2f6404d2006-10-10 01:37:40 +0000624 // Server socket accepted a new connection
625 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000626 sockaddr_storage addrStorage;
627 sockaddr* addrp = (sockaddr*)&addrStorage;
628 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000629
Mark Slee2f6404d2006-10-10 01:37:40 +0000630 // Going to accept a new client socket
631 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000632
Mark Slee2f6404d2006-10-10 01:37:40 +0000633 // Accept as many new clients as possible, even though libevent signaled only
634 // one, this helps us to avoid having to go back into the libevent engine so
635 // many times
David Reiss105961d2010-10-06 17:10:17 +0000636 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000637 // If we're overloaded, take action here
638 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
639 nConnectionsDropped_++;
640 nTotalConnectionsDropped_++;
641 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
642 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000643 return;
David Reiss01fe1532010-03-09 05:19:25 +0000644 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
645 if (!drainPendingTask()) {
646 // Nothing left to discard, so we drop connection instead.
647 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000648 return;
David Reiss01fe1532010-03-09 05:19:25 +0000649 }
650 }
651 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000652 // Explicitly set this socket to NONBLOCK mode
653 int flags;
654 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
655 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000656 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000657 close(clientSocket);
658 return;
659 }
660
661 // Create a new TConnection for this client socket.
662 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000663 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000664
665 // Fail fast if we could not create a TConnection object
666 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000667 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000668 close(clientSocket);
669 return;
670 }
671
672 // Put this client connection into the proper state
673 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000674
675 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000676 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000677 }
Mark Slee79b16942007-11-26 19:05:29 +0000678
Mark Slee2f6404d2006-10-10 01:37:40 +0000679 // Done looping accept, now we have to make sure the error is due to
680 // blocking. Any other error is a problem
681 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000682 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000683 }
684}
685
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, prefers an IPv6 result when one
 * is offered, creates and binds the socket, and then passes it to
 * listenSocket(int) for the remaining setup.
 *
 * @throws TException if address resolution, socket creation or bind fails
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Large enough for any int; snprintf bounds the write regardless of the
  // value held in port_.
  char port[32];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  snprintf(port, sizeof(port), "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Without an address there is nothing to listen on; report the failure
    // to the caller the same way the socket() and bind() failures below do.
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    throw TException(errStr);
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Falls back to the last entry when no ipv6 one exists.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    // Clear IPV6_V6ONLY; a failure is reported but not fatal
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY


  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
750
751/**
752 * Takes a socket created by listenSocket() and sets various options on it
753 * to prepare for use in the server.
754 */
755void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000756 // Set socket to nonblocking mode
757 int flags;
Mark Slee79b16942007-11-26 19:05:29 +0000758 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
759 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
760 close(s);
761 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +0000762 }
763
764 int one = 1;
765 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +0000766
767 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +0000768 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000769
770 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +0000771 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +0000772
773 // Set TCP nodelay if available, MAC OS X Hack
774 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
775 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +0000776 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +0000777 #endif
778
David Reiss1c20c872010-03-09 05:20:14 +0000779 #ifdef TCP_LOW_MIN_RTO
780 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +0000781 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +0000782 }
783 #endif
784
Mark Slee79b16942007-11-26 19:05:29 +0000785 if (listen(s, LISTEN_BACKLOG) == -1) {
786 close(s);
787 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +0000788 }
789
Mark Slee79b16942007-11-26 19:05:29 +0000790 // Cool, this socket is good to go, set it as the serverSocket_
791 serverSocket_ = s;
792}
793
David Reiss01fe1532010-03-09 05:19:25 +0000794void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +0000795 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
796 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
797 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +0000798 }
Roger Meier30aae0c2011-07-08 12:23:31 +0000799 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
800 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +0000801 close(notificationPipeFDs_[0]);
802 close(notificationPipeFDs_[1]);
803 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
804 }
David Reiss01fe1532010-03-09 05:19:25 +0000805}
806
Mark Slee79b16942007-11-26 19:05:29 +0000807/**
808 * Register the core libevent events onto the proper base.
809 */
Roger Meierc1905582011-08-02 23:37:36 +0000810void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +0000811 assert(serverSocket_ != -1);
812 assert(!eventBase_);
813 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +0000814 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +0000815
816 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +0000817 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +0000818 event_get_version(),
Bryan Duxbury37874ca2011-08-25 17:28:23 +0000819 event_base_get_method(eventBase_));
Mark Slee2f6404d2006-10-10 01:37:40 +0000820
821 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +0000822 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +0000823 serverSocket_,
824 EV_READ | EV_PERSIST,
825 TNonblockingServer::eventHandler,
826 this);
Mark Slee79b16942007-11-26 19:05:29 +0000827 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000828
829 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +0000830 if (-1 == event_add(&serverEvent_, 0)) {
831 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000832 }
David Reiss01fe1532010-03-09 05:19:25 +0000833 if (threadPoolProcessing_) {
834 // Create an event to be notified when a task finishes
835 event_set(&notificationEvent_,
836 getNotificationRecvFD(),
837 EV_READ | EV_PERSIST,
838 TConnection::taskHandler,
839 this);
David Reiss1c20c872010-03-09 05:20:14 +0000840
David Reiss01fe1532010-03-09 05:19:25 +0000841 // Attach to the base
842 event_base_set(eventBase_, &notificationEvent_);
843
844 // Add the event and start up the server
845 if (-1 == event_add(&notificationEvent_, 0)) {
846 throw TException("TNonblockingServer::serve(): notification event_add fail");
847 }
848 }
849}
850
David Reiss068f4162010-03-09 05:19:45 +0000851void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
852 threadManager_ = threadManager;
853 if (threadManager != NULL) {
854 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
855 threadPoolProcessing_ = true;
856 } else {
857 threadPoolProcessing_ = false;
858 }
859}
860
David Reiss01fe1532010-03-09 05:19:25 +0000861bool TNonblockingServer::serverOverloaded() {
862 size_t activeConnections = numTConnections_ - connectionStack_.size();
863 if (numActiveProcessors_ > maxActiveProcessors_ ||
864 activeConnections > maxConnections_) {
865 if (!overloaded_) {
866 GlobalOutput.printf("thrift non-blocking server overload condition");
867 overloaded_ = true;
868 }
869 } else {
870 if (overloaded_ &&
871 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
872 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
873 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
874 nConnectionsDropped_, nTotalConnectionsDropped_);
875 nConnectionsDropped_ = 0;
876 overloaded_ = false;
877 }
878 }
879
880 return overloaded_;
881}
882
883bool TNonblockingServer::drainPendingTask() {
884 if (threadManager_) {
885 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
886 if (task) {
887 TConnection* connection =
888 static_cast<TConnection::Task*>(task.get())->getTConnection();
889 assert(connection && connection->getServer()
890 && connection->getState() == APP_WAIT_TASK);
891 connection->forceClose();
892 return true;
893 }
894 }
895 return false;
Mark Slee79b16942007-11-26 19:05:29 +0000896}
897
David Reiss068f4162010-03-09 05:19:45 +0000898void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
899 TConnection* connection =
900 static_cast<TConnection::Task*>(task.get())->getTConnection();
901 assert(connection && connection->getServer()
902 && connection->getState() == APP_WAIT_TASK);
903 connection->forceClose();
904}
905
Mark Slee79b16942007-11-26 19:05:29 +0000906/**
907 * Main workhorse function, starts up the server listening on a port and
908 * loops over the libevent handler.
909 */
910void TNonblockingServer::serve() {
911 // Init socket
912 listenSocket();
913
David Reiss01fe1532010-03-09 05:19:25 +0000914 if (threadPoolProcessing_) {
915 // Init task completion notification pipe
916 createNotificationPipe();
917 }
918
Mark Slee79b16942007-11-26 19:05:29 +0000919 // Initialize libevent core
Bryan Duxbury37874ca2011-08-25 17:28:23 +0000920 registerEvents(static_cast<event_base*>(event_base_new()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +0000921
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000922 // Run the preServe event
923 if (eventHandler_ != NULL) {
924 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +0000925 }
926
Bryan Duxbury76c43682011-08-24 21:26:48 +0000927 // Run libevent engine, invokes calls to eventHandler
928 // Only returns if stop() is called.
Mark Slee79b16942007-11-26 19:05:29 +0000929 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +0000930}
931
Bryan Duxbury76c43682011-08-24 21:26:48 +0000932void TNonblockingServer::stop() {
933 if (!eventBase_) {
934 return;
935 }
936
937 // Call event_base_loopbreak() to tell libevent to exit the loop
938 //
939 // (The libevent documentation doesn't explicitly state that this function is
940 // safe to call from another thread. However, all it does is set a variable,
941 // in the event_base, so it should be fine.)
942 event_base_loopbreak(eventBase_);
943
944 // event_base_loopbreak() only causes the loop to exit the next time it wakes
945 // up. We need to force it to wake up, in case there are no real events
946 // it needs to process.
947 //
948 // Attempt to connect to the server socket. If anything fails,
949 // we'll just have to wait until libevent wakes up on its own.
950 //
951 // First create a socket
952 int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
953 if (fd < 0) {
954 return;
955 }
956
957 // Set up the address
958 struct sockaddr_in addr;
959 addr.sin_family = AF_INET;
960 addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
961 addr.sin_port = htons(port_);
962
963 // Finally do the connect().
964 // We don't care about the return value;
965 // we're just going to close the socket either way.
966 connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
967 close(fd);
968}
969
T Jake Lucianib5e62212009-01-31 22:36:20 +0000970}}} // apache::thrift::server