blob: e8d0e713312c40faa557b18a9ebfc194f43ff9ff [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier2fa9c312011-09-05 19:15:53 +000020#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
Mark Slee2f6404d2006-10-10 01:37:40 +000024#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000025#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000026#include <transport/TSocket.h>
Jake Farrellb0d95602011-12-06 01:17:26 +000027#include <concurrency/PosixThreadFactory.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000028
Mark Sleee02385b2007-06-09 01:21:16 +000029#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000030
31#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000032#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000033#endif
34
35#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000036#include <netinet/in.h>
37#include <netinet/tcp.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000038#endif
39
40#ifdef HAVE_ARPA_INET_H
Bryan Duxbury76c43682011-08-24 21:26:48 +000041#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000042#endif
43
44#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000045#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000046#endif
47
Roger Meier2fa9c312011-09-05 19:15:53 +000048#ifdef HAVE_FCNTL_H
Mark Slee2f6404d2006-10-10 01:37:40 +000049#include <fcntl.h>
Roger Meier2fa9c312011-09-05 19:15:53 +000050#endif
51
Mark Slee2f6404d2006-10-10 01:37:40 +000052#include <errno.h>
53#include <assert.h>
Jake Farrellb0d95602011-12-06 01:17:26 +000054#include <sched.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000055
David Reiss9b903442009-10-21 05:51:28 +000056#ifndef AF_LOCAL
57#define AF_LOCAL AF_UNIX
58#endif
59
T Jake Lucianib5e62212009-01-31 22:36:20 +000060namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000061
T Jake Lucianib5e62212009-01-31 22:36:20 +000062using namespace apache::thrift::protocol;
63using namespace apache::thrift::transport;
64using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000065using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000066using apache::thrift::transport::TSocket;
67using apache::thrift::transport::TTransportException;
Jake Farrellb0d95602011-12-06 01:17:26 +000068using boost::shared_ptr;
Mark Sleee02385b2007-06-09 01:21:16 +000069
/**
 * Per-socket I/O states: a connection is either receiving the 4-byte frame
 * header, receiving the frame payload, or sending a response back.
 */
enum TSocketState {
  SOCKET_RECV_FRAMING,  ///< reading the frame-size header
  SOCKET_RECV,          ///< reading the frame payload
  SOCKET_SEND           ///< writing response data
};
76
/**
 * Application-level states for the nonblocking server's connection
 * state machine:
 *   1) initialize (APP_INIT)
 *   2) read the 4 byte frame size (APP_READ_FRAME_SIZE)
 *   3) read the frame of data (APP_READ_REQUEST)
 *   4) wait for a worker task to finish processing (APP_WAIT_TASK)
 *   5) send back result data, if any (APP_SEND_RESULT)
 *   6) force immediate connection close (APP_CLOSE_CONNECTION)
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
93
/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
 private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor obtained from the server for this connection
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags currently registered for this connection (0 == idle)
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read.  While in SOCKET_RECV_FRAMING this also
  /// stashes the partially received frame-size bytes (see workSocket()).
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer (grown with realloc in transition(); freed in destructor)
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer (points into outputTransport_'s buffer; not owned here)
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Task handle
  int taskHandle_;

  /// Task event
  struct event taskEvent_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void *connectionContext_;

  /// Go into read mode
  void setRead() {
    setFlags(EV_READ | EV_PERSIST);
  }

  /// Go into write mode
  void setWrite() {
    setFlags(EV_WRITE | EV_PERSIST);
  }

  /// Set socket idle (deregisters all libevent flags)
  void setIdle() {
    setFlags(0);
  }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen.  Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

  /// Close this connection and free or reset its resources.
  void close();

 public:

  class Task;

  /// Constructor.  Allocates the input/output transports exactly once;
  /// all per-use setup is delegated to init().
  TConnection(int socket, TNonblockingIOThread* ioThread,
              const sockaddr* addr, socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(new TMemoryBuffer(
                               server_->getWriteBufferDefaultSize()));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() {
    // free() matches the realloc() used to grow readBuffer_ in transition()
    std::free(readBuffer_);
  }

  /**
   * Check buffers against any size limits and shrink it if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize this connection for (re)use with the given socket.
  void init(int socket, TNonblockingIOThread* ioThread,
            const sockaddr* addr, socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events.  Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check errno).
   */
  bool notifyIOThread() {
    return ioThread_->notify(this);
  }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const {
    return ioThread_->getThreadNumber();
  }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const {
    return server_;
  }

  /// get state of connection.
  TAppState getState() const {
    return appState_;
  }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const {
    return tSocket_;
  }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
    return serverEventHandler_;
  }

  /// return the Thrift connection context if any
  void* getConnectionContext() {
    return connectionContext_;
  }

};
325
/**
 * Runnable executed on a worker thread (via the server's ThreadManager)
 * that drives the processor for one or more pipelined requests on a
 * single connection, then notifies the connection's IO thread.
 */
class TNonblockingServer::TConnection::Task: public Runnable {
 public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection) :
    processor_(processor),
    input_(input),
    output_(output),
    connection_(connection),
    serverEventHandler_(connection_->getServerEventHandler()),
    connectionContext_(connection_->getConnectionContext()) {}

  /**
   * Runs the processor until it reports completion or the input transport
   * has no more buffered data, then signals the IO thread.  Transport and
   * generic exceptions are logged; bad_alloc terminates the process.
   */
  void run() {
    try {
      for (;;) {
        if (serverEventHandler_ != NULL) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        // Stop when the processor is done or no further framed request
        // bytes are buffered on the input transport.
        if (!processor_->process(input_, output_, connectionContext_) ||
            !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(-1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(), x.what());
    } catch (...) {
      GlobalOutput.printf(
        "TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  /// Accessor for the connection this task is processing.
  TConnection* getTConnection() {
    return connection_;
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  /// Non-owning back-pointer; the connection is owned by the server.
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
Mark Slee5ea15f92007-03-05 22:55:59 +0000381
/**
 * (Re)initializes this connection for a freshly accepted socket: resets all
 * state-machine fields, builds the factory transports/protocols, sets up
 * the optional server event handler context, and fetches the processor.
 * Called from the constructor and whenever a pooled TConnection is reused.
 */
void TNonblockingServer::TConnection::init(int socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  // Fresh connection starts in the INIT app state with no libevent flags.
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
    inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
    outputTransport_);

  // Create protocol
  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
    factoryInputTransport_);
  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
    factoryOutputTransport_);

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_ != NULL) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
                                                            outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
429
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000430void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000431 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000432 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000433
434 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000435 case SOCKET_RECV_FRAMING:
436 union {
437 uint8_t buf[sizeof(uint32_t)];
438 int32_t size;
439 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000440
David Reiss89a12942010-10-06 17:10:52 +0000441 // if we've already received some bytes we kept them here
442 framing.size = readWant_;
443 // determine size of this frame
444 try {
445 // Read from the socket
446 fetch = tSocket_->read(&framing.buf[readBufferPos_],
447 uint32_t(sizeof(framing.size) - readBufferPos_));
448 if (fetch == 0) {
449 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000450 close();
451 return;
452 }
David Reiss89a12942010-10-06 17:10:52 +0000453 readBufferPos_ += fetch;
454 } catch (TTransportException& te) {
455 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
456 close();
457
458 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000459 }
460
David Reiss89a12942010-10-06 17:10:52 +0000461 if (readBufferPos_ < sizeof(framing.size)) {
462 // more needed before frame size is known -- save what we have so far
463 readWant_ = framing.size;
464 return;
465 }
466
467 readWant_ = ntohl(framing.size);
468 if (static_cast<int>(readWant_) <= 0) {
469 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
470 close();
471 return;
472 }
473 // size known; now get the rest of the frame
474 transition();
475 return;
476
477 case SOCKET_RECV:
478 // It is an error to be in this state if we already have all the data
479 assert(readBufferPos_ < readWant_);
480
David Reiss105961d2010-10-06 17:10:17 +0000481 try {
482 // Read from the socket
483 fetch = readWant_ - readBufferPos_;
484 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
485 }
486 catch (TTransportException& te) {
487 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
488 close();
Mark Slee79b16942007-11-26 19:05:29 +0000489
David Reiss105961d2010-10-06 17:10:17 +0000490 return;
491 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000492
Mark Slee2f6404d2006-10-10 01:37:40 +0000493 if (got > 0) {
494 // Move along in the buffer
495 readBufferPos_ += got;
496
497 // Check that we did not overdo it
498 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000499
Mark Slee2f6404d2006-10-10 01:37:40 +0000500 // We are done reading, move onto the next state
501 if (readBufferPos_ == readWant_) {
502 transition();
503 }
504 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000505 }
506
507 // Whenever we get down here it means a remote disconnect
508 close();
Mark Slee79b16942007-11-26 19:05:29 +0000509
Mark Slee2f6404d2006-10-10 01:37:40 +0000510 return;
511
512 case SOCKET_SEND:
513 // Should never have position past size
514 assert(writeBufferPos_ <= writeBufferSize_);
515
516 // If there is no data to send, then let us move on
517 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000518 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000519 transition();
520 return;
521 }
522
David Reiss105961d2010-10-06 17:10:17 +0000523 try {
524 left = writeBufferSize_ - writeBufferPos_;
525 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
526 }
527 catch (TTransportException& te) {
528 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000529 close();
530 return;
531 }
532
533 writeBufferPos_ += sent;
534
535 // Did we overdo it?
536 assert(writeBufferPos_ <= writeBufferSize_);
537
Mark Slee79b16942007-11-26 19:05:29 +0000538 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000539 if (writeBufferPos_ == writeBufferSize_) {
540 transition();
541 }
542
543 return;
544
545 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000546 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000547 assert(0);
548 }
549}
550
/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 *
 * NOTE: several cases below intentionally fall through (APP_READ_REQUEST ->
 * APP_WAIT_TASK when processing inline, APP_SEND_RESULT -> APP_INIT), and
 * the oneway path jumps via goto LABEL_APP_INIT.  Preserve that structure
 * when editing.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();
    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(processor_,
                                             inputProtocol_,
                                             outputProtocol_,
                                             this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException & ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        // Invoke the processor inline on the IO thread
        processor_->process(inputProtocol_, outputProtocol_,
                            connectionContext_);
      } catch (const TTransportException &ttx) {
        GlobalOutput.printf("TNonblockingServer transport error in "
                            "process(): %s", ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception &x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(), x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_= 0;

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}
743
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000744void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000745 // Catch the do nothing case
746 if (eventFlags_ == eventFlags) {
747 return;
748 }
749
750 // Delete a previously existing event
751 if (eventFlags_ != 0) {
752 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000753 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000754 return;
755 }
756 }
757
758 // Update in memory structure
759 eventFlags_ = eventFlags;
760
Mark Slee402ee282007-08-23 01:43:20 +0000761 // Do not call event_set if there are no flags
762 if (!eventFlags_) {
763 return;
764 }
765
David Reiss01fe1532010-03-09 05:19:25 +0000766 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000767 * event_set:
768 *
769 * Prepares the event structure &event to be used in future calls to
770 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000771 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000772 *
773 * The events can be either EV_READ, EV_WRITE, or both, indicating
774 * that an application can read or write from the file respectively without
775 * blocking.
776 *
Mark Sleee02385b2007-06-09 01:21:16 +0000777 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000778 * the event and the type of event which will be one of: EV_TIMEOUT,
779 * EV_SIGNAL, EV_READ, EV_WRITE.
780 *
781 * The additional flag EV_PERSIST makes an event_add() persistent until
782 * event_del() has been called.
783 *
784 * Once initialized, the &event struct can be used repeatedly with
785 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000786 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000787 * when an ev structure has been added to libevent using event_add() the
788 * structure must persist until the event occurs (assuming EV_PERSIST
789 * is not set) or is removed using event_del(). You may not reuse the same
790 * ev structure for multiple monitored descriptors; each descriptor needs
791 * its own ev.
792 */
David Reiss105961d2010-10-06 17:10:17 +0000793 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
794 TConnection::eventHandler, this);
Jake Farrellb0d95602011-12-06 01:17:26 +0000795 event_base_set(ioThread_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000796
797 // Add the event
798 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000799 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000800 }
801}
802
803/**
804 * Closes a connection
805 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000806void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000807 // Delete the registered libevent
808 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000809 GlobalOutput.perror("TConnection::close() event_del", errno);
810 }
811
812 if (serverEventHandler_ != NULL) {
813 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000814 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000815 ioThread_ = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000816
817 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000818 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000819
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000820 // close any factory produced transports
821 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000822 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000823
Mark Slee2f6404d2006-10-10 01:37:40 +0000824 // Give this object back to the server that owns it
825 server_->returnConnection(this);
826}
827
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000828void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
829 size_t readLimit,
830 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000831 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000832 free(readBuffer_);
833 readBuffer_ = NULL;
834 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000835 }
David Reiss54bec5d2010-10-06 17:10:45 +0000836
837 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
838 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000839 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000840 largestWriteBufferSize_ = 0;
841 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000842}
843
David Reiss8ede8182010-09-02 15:26:28 +0000844TNonblockingServer::~TNonblockingServer() {
845 // TODO: We currently leak any active TConnection objects.
846 // Since we're shutting down and destroying the event_base, the TConnection
847 // objects will never receive any additional callbacks. (And even if they
848 // did, it would be bad, since they keep a pointer around to the server,
849 // which is being destroyed.)
850
851 // Clean up unused TConnection objects in connectionStack_
852 while (!connectionStack_.empty()) {
853 TConnection* connection = connectionStack_.top();
854 connectionStack_.pop();
855 delete connection;
856 }
David Reiss8ede8182010-09-02 15:26:28 +0000857}
858
Mark Slee2f6404d2006-10-10 01:37:40 +0000859/**
860 * Creates a new connection either by reusing an object off the stack or
861 * by allocating a new one entirely
862 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000863TNonblockingServer::TConnection* TNonblockingServer::createConnection(
Jake Farrellb0d95602011-12-06 01:17:26 +0000864 int socket, const sockaddr* addr, socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000865 // Check the stack
Jake Farrellb0d95602011-12-06 01:17:26 +0000866 Guard g(connMutex_);
867
868 // pick an IO thread to handle this connection -- currently round robin
Jake Farrellb0d95602011-12-06 01:17:26 +0000869 assert(nextIOThread_ < ioThreads_.size());
870 int selectedThreadIdx = nextIOThread_;
871 nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
872
873 TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
874
875 // Check the connection stack to see if we can re-use
876 TConnection* result = NULL;
Mark Slee2f6404d2006-10-10 01:37:40 +0000877 if (connectionStack_.empty()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000878 result = new TConnection(socket, ioThread, addr, addrLen);
879 ++numTConnections_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000880 } else {
Jake Farrellb0d95602011-12-06 01:17:26 +0000881 result = connectionStack_.top();
Mark Slee2f6404d2006-10-10 01:37:40 +0000882 connectionStack_.pop();
Jake Farrellb0d95602011-12-06 01:17:26 +0000883 result->init(socket, ioThread, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000884 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000885 return result;
Mark Slee2f6404d2006-10-10 01:37:40 +0000886}
887
888/**
889 * Returns a connection to the stack
890 */
891void TNonblockingServer::returnConnection(TConnection* connection) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000892 Guard g(connMutex_);
893
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000894 if (connectionStackLimit_ &&
895 (connectionStack_.size() >= connectionStackLimit_)) {
896 delete connection;
Jake Farrellb0d95602011-12-06 01:17:26 +0000897 --numTConnections_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000898 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000899 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000900 connectionStack_.push(connection);
901 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000902}
903
904/**
David Reissa79e4882008-03-05 07:51:47 +0000905 * Server socket had something happen. We accept all waiting client
906 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000907 */
908void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000909 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000910 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000911 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000912
Mark Slee2f6404d2006-10-10 01:37:40 +0000913 // Server socket accepted a new connection
914 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000915 sockaddr_storage addrStorage;
916 sockaddr* addrp = (sockaddr*)&addrStorage;
917 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000918
Mark Slee2f6404d2006-10-10 01:37:40 +0000919 // Going to accept a new client socket
920 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000921
Mark Slee2f6404d2006-10-10 01:37:40 +0000922 // Accept as many new clients as possible, even though libevent signaled only
923 // one, this helps us to avoid having to go back into the libevent engine so
924 // many times
David Reiss105961d2010-10-06 17:10:17 +0000925 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000926 // If we're overloaded, take action here
927 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Jake Farrellb0d95602011-12-06 01:17:26 +0000928 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000929 nConnectionsDropped_++;
930 nTotalConnectionsDropped_++;
931 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
932 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000933 return;
David Reiss01fe1532010-03-09 05:19:25 +0000934 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
935 if (!drainPendingTask()) {
936 // Nothing left to discard, so we drop connection instead.
937 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000938 return;
David Reiss01fe1532010-03-09 05:19:25 +0000939 }
940 }
941 }
Jake Farrellb0d95602011-12-06 01:17:26 +0000942
Mark Slee2f6404d2006-10-10 01:37:40 +0000943 // Explicitly set this socket to NONBLOCK mode
944 int flags;
945 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
946 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000947 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000948 close(clientSocket);
949 return;
950 }
951
952 // Create a new TConnection for this client socket.
953 TConnection* clientConnection =
Jake Farrellb0d95602011-12-06 01:17:26 +0000954 createConnection(clientSocket, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000955
956 // Fail fast if we could not create a TConnection object
957 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000958 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000959 close(clientSocket);
960 return;
961 }
962
Jake Farrellb0d95602011-12-06 01:17:26 +0000963 /*
964 * Either notify the ioThread that is assigned this connection to
965 * start processing, or if it is us, we'll just ask this
966 * connection to do its initial state change here.
967 *
968 * (We need to avoid writing to our own notification pipe, to
969 * avoid possible deadlocks if the pipe is full.)
970 *
971 * The IO thread #0 is the only one that handles these listen
972 * events, so unless the connection has been assigned to thread #0
973 * we know it's not on our thread.
974 */
975 if (clientConnection->getIOThreadNumber() == 0) {
976 clientConnection->transition();
977 } else {
978 clientConnection->notifyIOThread();
979 }
David Reiss3e7fca42009-09-19 01:59:13 +0000980
981 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000982 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000983 }
Mark Slee79b16942007-11-26 19:05:29 +0000984
Jake Farrellb0d95602011-12-06 01:17:26 +0000985
Mark Slee2f6404d2006-10-10 01:37:40 +0000986 // Done looping accept, now we have to make sure the error is due to
987 // blocking. Any other error is a problem
988 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000989 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000990 }
991}
992
993/**
Mark Slee79b16942007-11-26 19:05:29 +0000994 * Creates a socket to listen on and binds it to the local port.
Mark Slee2f6404d2006-10-10 01:37:40 +0000995 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000996void TNonblockingServer::createAndListenOnSocket() {
Mark Slee79b16942007-11-26 19:05:29 +0000997 int s;
Jake Farrellb0d95602011-12-06 01:17:26 +0000998
Mark Sleefb4b5142007-11-20 01:27:08 +0000999 struct addrinfo hints, *res, *res0;
1000 int error;
Mark Slee79b16942007-11-26 19:05:29 +00001001
Mark Sleefb4b5142007-11-20 01:27:08 +00001002 char port[sizeof("65536") + 1];
1003 memset(&hints, 0, sizeof(hints));
1004 hints.ai_family = PF_UNSPEC;
1005 hints.ai_socktype = SOCK_STREAM;
Mark Slee256bdc42007-11-27 08:42:19 +00001006 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
Mark Sleefb4b5142007-11-20 01:27:08 +00001007 sprintf(port, "%d", port_);
1008
1009 // Wildcard address
1010 error = getaddrinfo(NULL, port, &hints, &res0);
1011 if (error) {
David Reiss9b209552008-04-08 06:26:05 +00001012 string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
1013 GlobalOutput(errStr.c_str());
Mark Sleefb4b5142007-11-20 01:27:08 +00001014 return;
1015 }
1016
1017 // Pick the ipv6 address first since ipv4 addresses can be mapped
1018 // into ipv6 space.
1019 for (res = res0; res; res = res->ai_next) {
1020 if (res->ai_family == AF_INET6 || res->ai_next == NULL)
1021 break;
1022 }
1023
Mark Slee2f6404d2006-10-10 01:37:40 +00001024 // Create the server socket
Mark Slee79b16942007-11-26 19:05:29 +00001025 s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
1026 if (s == -1) {
1027 freeaddrinfo(res0);
1028 throw TException("TNonblockingServer::serve() socket() -1");
Mark Slee2f6404d2006-10-10 01:37:40 +00001029 }
1030
David Reiss13aea462008-06-10 22:56:04 +00001031 #ifdef IPV6_V6ONLY
David Reisseee98be2010-03-09 05:20:10 +00001032 if (res->ai_family == AF_INET6) {
1033 int zero = 0;
Roger Meier30aae0c2011-07-08 12:23:31 +00001034 if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
David Reisseee98be2010-03-09 05:20:10 +00001035 GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
1036 }
David Reiss13aea462008-06-10 22:56:04 +00001037 }
1038 #endif // #ifdef IPV6_V6ONLY
1039
1040
Mark Slee79b16942007-11-26 19:05:29 +00001041 int one = 1;
1042
1043 // Set reuseaddr to avoid 2MSL delay on server restart
Roger Meier30aae0c2011-07-08 12:23:31 +00001044 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
Mark Slee79b16942007-11-26 19:05:29 +00001045
Roger Meier30aae0c2011-07-08 12:23:31 +00001046 if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
Mark Slee79b16942007-11-26 19:05:29 +00001047 close(s);
1048 freeaddrinfo(res0);
1049 throw TException("TNonblockingServer::serve() bind");
1050 }
1051
1052 // Done with the addr info
1053 freeaddrinfo(res0);
1054
1055 // Set up this file descriptor for listening
1056 listenSocket(s);
1057}
1058
1059/**
1060 * Takes a socket created by listenSocket() and sets various options on it
1061 * to prepare for use in the server.
1062 */
1063void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001064 // Set socket to nonblocking mode
1065 int flags;
Mark Slee79b16942007-11-26 19:05:29 +00001066 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
1067 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
1068 close(s);
1069 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001070 }
1071
1072 int one = 1;
1073 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001074
1075 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001076 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001077
1078 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001079 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001080
1081 // Set TCP nodelay if available, MAC OS X Hack
1082 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1083 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001084 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001085 #endif
1086
David Reiss1c20c872010-03-09 05:20:14 +00001087 #ifdef TCP_LOW_MIN_RTO
1088 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001089 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001090 }
1091 #endif
1092
Mark Slee79b16942007-11-26 19:05:29 +00001093 if (listen(s, LISTEN_BACKLOG) == -1) {
1094 close(s);
1095 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001096 }
1097
Mark Slee79b16942007-11-26 19:05:29 +00001098 // Cool, this socket is good to go, set it as the serverSocket_
1099 serverSocket_ = s;
1100}
1101
David Reiss068f4162010-03-09 05:19:45 +00001102void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1103 threadManager_ = threadManager;
1104 if (threadManager != NULL) {
1105 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1106 threadPoolProcessing_ = true;
1107 } else {
1108 threadPoolProcessing_ = false;
1109 }
1110}
1111
David Reiss01fe1532010-03-09 05:19:25 +00001112bool TNonblockingServer::serverOverloaded() {
1113 size_t activeConnections = numTConnections_ - connectionStack_.size();
1114 if (numActiveProcessors_ > maxActiveProcessors_ ||
1115 activeConnections > maxConnections_) {
1116 if (!overloaded_) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001117 GlobalOutput.printf("TNonblockingServer: overload condition begun.");
David Reiss01fe1532010-03-09 05:19:25 +00001118 overloaded_ = true;
1119 }
1120 } else {
1121 if (overloaded_ &&
1122 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
1123 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001124 GlobalOutput.printf("TNonblockingServer: overload ended; "
1125 "%u dropped (%llu total)",
David Reiss01fe1532010-03-09 05:19:25 +00001126 nConnectionsDropped_, nTotalConnectionsDropped_);
1127 nConnectionsDropped_ = 0;
1128 overloaded_ = false;
1129 }
1130 }
1131
1132 return overloaded_;
1133}
1134
1135bool TNonblockingServer::drainPendingTask() {
1136 if (threadManager_) {
1137 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1138 if (task) {
1139 TConnection* connection =
1140 static_cast<TConnection::Task*>(task.get())->getTConnection();
1141 assert(connection && connection->getServer()
1142 && connection->getState() == APP_WAIT_TASK);
1143 connection->forceClose();
1144 return true;
1145 }
1146 }
1147 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001148}
1149
David Reiss068f4162010-03-09 05:19:45 +00001150void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1151 TConnection* connection =
1152 static_cast<TConnection::Task*>(task.get())->getTConnection();
Jake Farrellb0d95602011-12-06 01:17:26 +00001153 assert(connection && connection->getServer() &&
1154 connection->getState() == APP_WAIT_TASK);
David Reiss068f4162010-03-09 05:19:45 +00001155 connection->forceClose();
1156}
1157
Jake Farrellb0d95602011-12-06 01:17:26 +00001158void TNonblockingServer::stop() {
1159 // Breaks the event loop in all threads so that they end ASAP.
Roger Meierd0cdecf2011-12-08 19:34:01 +00001160 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001161 ioThreads_[i]->stop();
1162 }
1163}
1164
Mark Slee79b16942007-11-26 19:05:29 +00001165/**
1166 * Main workhorse function, starts up the server listening on a port and
1167 * loops over the libevent handler.
1168 */
1169void TNonblockingServer::serve() {
Jake Farrellb0d95602011-12-06 01:17:26 +00001170 // init listen socket
1171 createAndListenOnSocket();
Mark Slee79b16942007-11-26 19:05:29 +00001172
Jake Farrellb0d95602011-12-06 01:17:26 +00001173 // set up the IO threads
1174 assert(ioThreads_.empty());
1175 if (!numIOThreads_) {
1176 numIOThreads_ = DEFAULT_IO_THREADS;
David Reiss01fe1532010-03-09 05:19:25 +00001177 }
1178
Roger Meierd0cdecf2011-12-08 19:34:01 +00001179 for (uint32_t id = 0; id < numIOThreads_; ++id) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001180 // the first IO thread also does the listening on server socket
1181 int listenFd = (id == 0 ? serverSocket_ : -1);
Mark Slee2f6404d2006-10-10 01:37:40 +00001182
Jake Farrellb0d95602011-12-06 01:17:26 +00001183 shared_ptr<TNonblockingIOThread> thread(
1184 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
1185 ioThreads_.push_back(thread);
1186 }
1187
1188 // Notify handler of the preServe event
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001189 if (eventHandler_ != NULL) {
1190 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001191 }
1192
Jake Farrellb0d95602011-12-06 01:17:26 +00001193 // Start all of our helper IO threads. Note that the threads run forever,
1194 // only terminating if stop() is called.
1195 assert(ioThreads_.size() == numIOThreads_);
1196 assert(ioThreads_.size() > 0);
1197
1198 GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
1199 port_, ioThreads_.size());
1200
1201 // Launch all the secondary IO threads in separate threads
1202 if (ioThreads_.size() > 1) {
1203 ioThreadFactory_.reset(new PosixThreadFactory(
1204 PosixThreadFactory::OTHER, // scheduler
1205 PosixThreadFactory::NORMAL, // priority
1206 1, // stack size (MB)
1207 false // detached
1208 ));
1209
1210 assert(ioThreadFactory_.get());
1211
1212 // intentionally starting at thread 1, not 0
Roger Meierd0cdecf2011-12-08 19:34:01 +00001213 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001214 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1215 ioThreads_[i]->setThread(thread);
1216 thread->start();
1217 }
1218 }
1219
1220 // Run the primary (listener) IO thread loop in our main thread; this will
1221 // only return when the server is shutting down.
1222 ioThreads_[0]->run();
1223
1224 // Ensure all threads are finished before exiting serve()
Roger Meierd0cdecf2011-12-08 19:34:01 +00001225 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
Jake Farrellb0d95602011-12-06 01:17:26 +00001226 ioThreads_[i]->join();
1227 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1228 }
Mark Slee2f6404d2006-10-10 01:37:40 +00001229}
1230
Jake Farrellb0d95602011-12-06 01:17:26 +00001231TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1232 int number,
1233 int listenSocket,
1234 bool useHighPriority)
1235 : server_(server)
1236 , number_(number)
1237 , listenSocket_(listenSocket)
1238 , useHighPriority_(useHighPriority)
1239 , eventBase_(NULL) {
1240 notificationPipeFDs_[0] = -1;
1241 notificationPipeFDs_[1] = -1;
1242}
1243
1244TNonblockingIOThread::~TNonblockingIOThread() {
1245 // make sure our associated thread is fully finished
1246 join();
1247
1248 if (eventBase_) {
1249 event_base_free(eventBase_);
Bryan Duxbury76c43682011-08-24 21:26:48 +00001250 }
1251
Jake Farrellb0d95602011-12-06 01:17:26 +00001252 if (listenSocket_ >= 0) {
1253 if (0 != close(listenSocket_)) {
1254 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
1255 errno);
1256 }
1257 listenSocket_ = TNonblockingServer::INVALID_SOCKET;
1258 }
1259
1260 for (int i = 0; i < 2; ++i) {
1261 if (notificationPipeFDs_[i] >= 0) {
1262 if (0 != ::close(notificationPipeFDs_[i])) {
1263 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
1264 errno);
1265 }
1266 notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
1267 }
1268 }
1269}
1270
1271void TNonblockingIOThread::createNotificationPipe() {
1272 if (pipe(notificationPipeFDs_) != 0) {
1273 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
1274 throw TException("can't create notification pipe");
1275 }
1276 int flags;
1277 if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
1278 fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
1279 close(notificationPipeFDs_[0]);
1280 close(notificationPipeFDs_[1]);
1281 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
1282 }
1283 for (int i = 0; i < 2; ++i) {
1284 if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
1285 fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
1286 close(notificationPipeFDs_[0]);
1287 close(notificationPipeFDs_[1]);
1288 throw TException("TNonblockingServer::createNotificationPipe() "
1289 "FD_CLOEXEC");
1290 }
1291 }
1292}
1293
1294/**
1295 * Register the core libevent events onto the proper base.
1296 */
1297void TNonblockingIOThread::registerEvents() {
1298 if (listenSocket_ >= 0) {
1299 // Register the server event
1300 event_set(&serverEvent_,
1301 listenSocket_,
1302 EV_READ | EV_PERSIST,
1303 TNonblockingIOThread::listenHandler,
1304 server_);
1305 event_base_set(eventBase_, &serverEvent_);
1306
1307 // Add the event and start up the server
1308 if (-1 == event_add(&serverEvent_, 0)) {
1309 throw TException("TNonblockingServer::serve(): "
1310 "event_add() failed on server listen event");
1311 }
1312 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
1313 number_);
1314 }
1315
1316 createNotificationPipe();
1317
1318 // Create an event to be notified when a task finishes
1319 event_set(&notificationEvent_,
1320 getNotificationRecvFD(),
1321 EV_READ | EV_PERSIST,
1322 TNonblockingIOThread::notifyHandler,
1323 this);
1324
1325 // Attach to the base
1326 event_base_set(eventBase_, &notificationEvent_);
1327
1328 // Add the event and start up the server
1329 if (-1 == event_add(&notificationEvent_, 0)) {
1330 throw TException("TNonblockingServer::serve(): "
1331 "event_add() failed on task-done notification event");
1332 }
1333 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
1334 number_);
1335}
1336
1337bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
1338 int fd = getNotificationSendFD();
1339 if (fd < 0) {
1340 return false;
1341 }
1342
1343 const int kSize = sizeof(conn);
1344 if (write(fd, &conn, kSize) != kSize) {
1345 return false;
1346 }
1347
1348 return true;
1349}
1350
/**
 * libevent callback for the notification pipe: drains connection pointers
 * written by notify() and calls transition() on each.  A NULL pointer is
 * the sentinel telling this handler to stop (used during shutdown).
 */
/* static */
void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = 0;
    // Each notification is one pointer value; read exactly that many bytes.
    const int kSize = sizeof(connection);
    int nBytes = read(fd, &connection, kSize);
    if (nBytes == kSize) {
      if (connection == NULL) {
        // this is the command to stop our thread, exit the handler!
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      // exit the loop
      break;
    } else { // nBytes < 0
      if (errno != EWOULDBLOCK && errno != EAGAIN) {
        GlobalOutput.perror(
          "TNonblocking: notifyHandler read() failed: ", errno);
        ioThread->breakLoop(true);
        return;
      }
      // EAGAIN/EWOULDBLOCK: pipe is drained for now; exit the loop
      break;
    }
  }
}
1389
1390void TNonblockingIOThread::breakLoop(bool error) {
1391 if (error) {
1392 GlobalOutput.printf(
1393 "TNonblockingServer: IO thread #%d exiting with error.", number_);
1394 // TODO: figure out something better to do here, but for now kill the
1395 // whole process.
1396 GlobalOutput.printf("TNonblockingServer: aborting process.");
1397 ::abort();
1398 }
1399
1400 // sets a flag so that the loop exits on the next event
Bryan Duxbury76c43682011-08-24 21:26:48 +00001401 event_base_loopbreak(eventBase_);
1402
Jake Farrellb0d95602011-12-06 01:17:26 +00001403 // event_base_loopbreak() only causes the loop to exit the next time
1404 // it wakes up. We need to force it to wake up, in case there are
1405 // no real events it needs to process.
Bryan Duxbury76c43682011-08-24 21:26:48 +00001406 //
Jake Farrellb0d95602011-12-06 01:17:26 +00001407 // If we're running in the same thread, we can't use the notify(0)
1408 // mechanism to stop the thread, but happily if we're running in the
1409 // same thread, this means the thread can't be blocking in the event
1410 // loop either.
1411 if (!pthread_equal(pthread_self(), threadId_)) {
1412 notify(NULL);
1413 }
1414}
1415
1416void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
1417 // Start out with a standard, low-priority setup for the sched params.
1418 struct sched_param sp;
1419 bzero((void*) &sp, sizeof(sp));
1420 int policy = SCHED_OTHER;
1421
1422 // If desired, set up high-priority sched params structure.
1423 if (value) {
1424 // FIFO scheduler, ranked above default SCHED_OTHER queue
1425 policy = SCHED_FIFO;
1426 // The priority only compares us to other SCHED_FIFO threads, so we
1427 // just pick a random priority halfway between min & max.
1428 const int priority = (sched_get_priority_max(policy) +
1429 sched_get_priority_min(policy)) / 2;
1430
1431 sp.sched_priority = priority;
Bryan Duxbury76c43682011-08-24 21:26:48 +00001432 }
1433
Jake Farrellb0d95602011-12-06 01:17:26 +00001434 // Actually set the sched params for the current thread.
1435 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
1436 GlobalOutput.printf(
1437 "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
1438 } else {
1439 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
1440 }
1441}
Bryan Duxbury76c43682011-08-24 21:26:48 +00001442
Jake Farrellb0d95602011-12-06 01:17:26 +00001443void TNonblockingIOThread::run() {
1444 threadId_ = pthread_self();
1445
1446 assert(eventBase_ == 0);
1447 eventBase_ = event_base_new();
1448
1449 // Print some libevent stats
1450 if (number_ == 0) {
1451 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
1452 event_get_version(),
1453 event_base_get_method(eventBase_));
1454 }
1455
1456
1457 registerEvents();
1458
1459 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
1460 number_);
1461
1462 if (useHighPriority_) {
1463 setCurrentThreadHighPriority(true);
1464 }
1465
1466 // Run libevent engine, never returns, invokes calls to eventHandler
1467 event_base_loop(eventBase_, 0);
1468
1469 if (useHighPriority_) {
1470 setCurrentThreadHighPriority(false);
1471 }
1472
1473 // cleans up our registered events
1474 cleanupEvents();
1475
1476 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
1477 number_);
1478}
1479
1480void TNonblockingIOThread::cleanupEvents() {
1481 // stop the listen socket, if any
1482 if (listenSocket_ >= 0) {
1483 if (event_del(&serverEvent_) == -1) {
1484 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
1485 }
1486 }
1487
1488 event_del(&notificationEvent_);
1489}
1490
1491
/**
 * Requests shutdown of this IO thread by breaking its event loop
 * (non-error path; the loop exits on its next wakeup).
 */
void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}
1496
1497void TNonblockingIOThread::join() {
1498 // If this was a thread created by a factory (not the thread that called
1499 // serve()), we join() it to make sure we shut down fully.
1500 if (thread_) {
1501 try {
1502 // Note that it is safe to both join() ourselves twice, as well as join
1503 // the current thread as the pthread implementation checks for deadlock.
1504 thread_->join();
1505 } catch(...) {
1506 // swallow everything
1507 }
1508 }
Bryan Duxbury76c43682011-08-24 21:26:48 +00001509}
1510
T Jake Lucianib5e62212009-01-31 22:36:20 +00001511}}} // apache::thrift::server