THRIFT-1552 Include paths for c/c++ should be prefixed with 'thrift/'
To ensure there are no include path collisions, the C and C++ header
include paths should use 'thrift' as their root directory. This avoids
having to place /usr/include/thrift into the compiler's include
header search path, which might otherwise result in the compiler
accidentally picking up headers that it shouldn't.
e.g. #include <foo/bar.h> should be #include <thrift/foo/bar.h>
Change-Id: I48f2b0f549bda0fc81e85506ac857adc800b98a1
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1325674 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
new file mode 100644
index 0000000..3e95508
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -0,0 +1,1535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define __STDC_FORMAT_MACROS
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "TNonblockingServer.h"
+#include <thrift/concurrency/Exception.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+
+#include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+
+#include <errno.h>
+#include <assert.h>
+
+#ifdef HAVE_SCHED_H
+#include <sched.h>
+#endif
+
+#ifndef AF_LOCAL
+#define AF_LOCAL AF_UNIX
+#endif
+
+#ifdef _MSC_VER
+#define PRIu32 "I32u"
+#endif
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
+
+/// Three states for sockets: recv frame size, recv data, and send mode
+enum TSocketState {
+ SOCKET_RECV_FRAMING, ///< reading the 4-byte frame size header
+ SOCKET_RECV, ///< reading the frame body into the read buffer
+ SOCKET_SEND ///< writing the contents of the write buffer
+};
+
+/**
+ * Six states for the nonblocking server:
+ * 1) initialize
+ * 2) read 4 byte frame size
+ * 3) read frame of data
+ * 4) wait for the processing task, then 5) send back data (if any)
+ * 6) force immediate connection close
+ */
+enum TAppState {
+ APP_INIT, ///< freshly initialized; the next transition arms the frame-size read
+ APP_READ_FRAME_SIZE, ///< waiting for the 4-byte frame size
+ APP_READ_REQUEST, ///< waiting for the complete request frame
+ APP_WAIT_TASK, ///< request handed to the thread manager; waiting for it to finish
+ APP_SEND_RESULT, ///< writing the response back to the client
+ APP_CLOSE_CONNECTION ///< set by forceClose(); the next transition closes the connection
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TNonblockingServer::TConnection {
+ private:
+ /// Server IO Thread handling this connection
+ TNonblockingIOThread* ioThread_;
+
+ /// Server handle
+ TNonblockingServer* server_;
+
+ /// TProcessor obtained from server_->getProcessor() in init()
+ boost::shared_ptr<TProcessor> processor_;
+
+ /// Object wrapping network socket
+ boost::shared_ptr<TSocket> tSocket_;
+
+ /// Libevent event registered for the connection socket (see setFlags())
+ struct event event_;
+
+ /// Libevent flags currently registered for event_; 0 means none
+ short eventFlags_;
+
+ /// Socket mode
+ TSocketState socketState_;
+
+ /// Application state
+ TAppState appState_;
+
+ /// Size of the frame to read; holds partial frame-size bytes while framing
+ uint32_t readWant_;
+
+ /// Bytes received so far for the frame size header or the frame body
+ uint32_t readBufferPos_;
+
+ /// Read buffer (grown with realloc() in transition(), released with free())
+ uint8_t* readBuffer_;
+
+ /// Read buffer size
+ uint32_t readBufferSize_;
+
+ /// Write buffer; set from outputTransport_->getBuffer() in transition()
+ uint8_t* writeBuffer_;
+
+ /// Write buffer size
+ uint32_t writeBufferSize_;
+
+ /// How far through writing are we?
+ uint32_t writeBufferPos_;
+
+ /// Largest size of write buffer seen since buffer was constructed
+ size_t largestWriteBufferSize_;
+
+ /// Count of the number of calls for use with getResizeBufferEveryN().
+ int32_t callsForResize_;
+
+ /// Task handle
+ int taskHandle_;
+
+ /// Task event
+ struct event taskEvent_;
+
+ /// Transport to read from
+ boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+ /// Transport that processor writes to
+ boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+ /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
+ boost::shared_ptr<TTransport> factoryInputTransport_;
+ boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+ /// Protocol decoder
+ boost::shared_ptr<TProtocol> inputProtocol_;
+
+ /// Protocol encoder
+ boost::shared_ptr<TProtocol> outputProtocol_;
+
+ /// Server event handler, if any
+ boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+
+ /// Thrift call context from serverEventHandler_->createContext(), or NULL
+ void *connectionContext_;
+
+ /// Go into read mode
+ void setRead() {
+ setFlags(EV_READ | EV_PERSIST);
+ }
+
+ /// Go into write mode
+ void setWrite() {
+ setFlags(EV_WRITE | EV_PERSIST);
+ }
+
+ /// Set socket idle
+ void setIdle() {
+ setFlags(0);
+ }
+
+ /**
+ * Set event flags for this connection.
+ *
+ * @param eventFlags flags we pass to libevent for the connection.
+ */
+ void setFlags(short eventFlags);
+
+ /**
+ * Libevent handler called (via our static wrapper) when the connection
+ * socket had something happen. Rather than use the flags libevent passed,
+ * we use the connection state to determine whether we need to read or
+ * write the socket.
+ */
+ void workSocket();
+
+ /// Close this connection and free or reset its resources.
+ void close();
+
+ public:
+
+ class Task;
+
+ /// Constructor: allocates the per-connection transports once, then calls init()
+ TConnection(int socket, TNonblockingIOThread* ioThread,
+ const sockaddr* addr, socklen_t addrLen) {
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
+
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
+
+ // Allocate input and output transports these only need to be allocated
+ // once per TConnection (they don't need to be reallocated on init() call)
+ inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+ outputTransport_.reset(new TMemoryBuffer(
+ server_->getWriteBufferDefaultSize()));
+ tSocket_.reset(new TSocket());
+ init(socket, ioThread, addr, addrLen);
+ }
+
+ ~TConnection() {
+ std::free(readBuffer_); // only the read buffer is released here
+ }
+
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
+ *
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
+ */
+ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
+
+ /// Initialize; called by the constructor and again for every new socket
+ void init(int socket, TNonblockingIOThread* ioThread,
+ const sockaddr* addr, socklen_t addrLen);
+
+ /**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+ void transition();
+
+ /**
+ * C-callable event handler for connection events. Provides a callback
+ * that libevent can understand which invokes connection_->workSocket().
+ *
+ * @param fd the descriptor the event occurred on.
+ * @param which the flags associated with the event (unused).
+ * @param v void* callback arg where we placed TConnection's "this".
+ */
+ static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
+ assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
+ ((TConnection*)v)->workSocket();
+ }
+
+ /**
+ * Notification to server that processing has ended on this request.
+ * Can be called either when processing is completed or when a waiting
+ * task has been preemptively terminated (on overload).
+ *
+ * Don't call this from the IO thread itself.
+ *
+ * @return true if successful, false if unable to notify (check errno).
+ */
+ bool notifyIOThread() {
+ return ioThread_->notify(this);
+ }
+
+ /**
+ * Returns the number of this connection's currently assigned IO
+ * thread.
+ */
+ int getIOThreadNumber() const {
+ return ioThread_->getThreadNumber();
+ }
+
+ /// Force connection shutdown: mark it for closing and notify its IO thread (throws TException if that fails).
+ void forceClose() {
+ appState_ = APP_CLOSE_CONNECTION;
+ if (!notifyIOThread()) {
+ throw TException("TConnection::forceClose: failed write on notify pipe");
+ }
+ }
+
+ /// return the server this connection was initialized for.
+ TNonblockingServer* getServer() const {
+ return server_;
+ }
+
+ /// get state of connection.
+ TAppState getState() const {
+ return appState_;
+ }
+
+ /// return the TSocket transport wrapping this network connection
+ boost::shared_ptr<TSocket> getTSocket() const {
+ return tSocket_;
+ }
+
+ /// return the server event handler if any
+ boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+ return serverEventHandler_;
+ }
+
+ /// return the Thrift connection context if any
+ void* getConnectionContext() {
+ return connectionContext_;
+ }
+
+};
+
+/// Task for the thread manager: runs the processor for one connection, then notifies its IO thread.
+class TNonblockingServer::TConnection::Task: public Runnable {
+ public:
+  Task(boost::shared_ptr<TProcessor> processor,
+       boost::shared_ptr<TProtocol> input,
+       boost::shared_ptr<TProtocol> output,
+       TConnection* connection)
+    : processor_(processor),
+      input_(input),
+      output_(output),
+      connection_(connection),
+      serverEventHandler_(connection->getServerEventHandler()),
+      connectionContext_(connection->getConnectionContext()) {}
+
+  void run() {
+    try {
+      bool keepGoing = true;
+      while (keepGoing) {
+        if (serverEventHandler_ != NULL) {
+          serverEventHandler_->processContext(connectionContext_,
+                                              connection_->getTSocket());
+        }
+        keepGoing = processor_->process(input_, output_, connectionContext_)
+                    && input_->getTransport()->peek();
+      }
+    } catch (const TTransportException& ttx) {
+      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
+    } catch (const bad_alloc&) {
+      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
+      exit(-1);
+    } catch (const std::exception& x) {
+      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
+                          typeid(x).name(), x.what());
+    } catch (...) {
+      GlobalOutput.printf(
+          "TNonblockingServer: unknown exception while processing.");
+    }
+
+    // Tell the IO thread that this task is done, whatever the outcome was.
+    const bool notified = connection_->notifyIOThread();
+    if (!notified) {
+      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
+    }
+  }
+
+  TConnection* getTConnection() { return connection_; }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocol> input_;
+  boost::shared_ptr<TProtocol> output_;
+  TConnection* connection_;
+  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+  void* connectionContext_;
+};
+
+/// (Re)initializes the connection for a newly accepted socket.
+void TNonblockingServer::TConnection::init(int socket,
+                                           TNonblockingIOThread* ioThread,
+                                           const sockaddr* addr,
+                                           socklen_t addrLen) {
+  // Bind the wrapped TSocket to the new descriptor and peer address.
+  tSocket_->setSocketFD(socket);
+  tSocket_->setCachedAddress(addr, addrLen);
+
+  ioThread_ = ioThread;
+  server_ = ioThread->getServer();
+
+  // Reset the state machine and all buffer bookkeeping.
+  appState_ = APP_INIT;
+  socketState_ = SOCKET_RECV_FRAMING;
+  eventFlags_ = 0;
+  callsForResize_ = 0;
+  readWant_ = 0;
+  readBufferPos_ = 0;
+  writeBuffer_ = NULL;
+  writeBufferPos_ = 0;
+  writeBufferSize_ = 0;
+  largestWriteBufferSize_ = 0;
+
+  // Wrap the memory buffers with whatever the transport factories produce.
+  factoryInputTransport_ =
+    server_->getInputTransportFactory()->getTransport(inputTransport_);
+  factoryOutputTransport_ =
+    server_->getOutputTransportFactory()->getTransport(outputTransport_);
+
+  // The protocols sit on top of the factory transports.
+  inputProtocol_ =
+    server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+  outputProtocol_ =
+    server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+
+  // Ask the server event handler (if any) for a per-connection context.
+  serverEventHandler_ = server_->getEventHandler();
+  if (serverEventHandler_ != NULL) {
+    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
+  } else {
+    connectionContext_ = NULL;
+  }
+
+  // Finally obtain the processor for this connection.
+  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+}
+
+/**
+ * Performs one non-blocking socket step for the current socketState_ and calls
+ * transition() when that step completes; closes the connection on errors or EOF.
+ */
+void TNonblockingServer::TConnection::workSocket() {
+  int got=0, left=0, sent=0;
+  uint32_t fetch = 0;
+
+  switch (socketState_) {
+  case SOCKET_RECV_FRAMING:
+    union {
+      uint8_t buf[sizeof(uint32_t)];
+      uint32_t size;
+    } framing;
+
+    // if we've already received some bytes we kept them here
+    framing.size = readWant_;
+    // determine size of this frame
+    try {
+      // Read from the socket
+      fetch = tSocket_->read(&framing.buf[readBufferPos_],
+                             uint32_t(sizeof(framing.size) - readBufferPos_));
+      if (fetch == 0) {
+        // Whenever we get here it means a remote disconnect
+        close();
+        return;
+      }
+      readBufferPos_ += fetch;
+    } catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+      return;
+    }
+
+    if (readBufferPos_ < sizeof(framing.size)) {
+      // more needed before frame size is known -- save what we have so far
+      readWant_ = framing.size;
+      return;
+    }
+
+    readWant_ = ntohl(framing.size);
+    if (readWant_ > server_->getMaxFrameSize()) {
+      // Don't allow giant frame sizes: bad clients must not make us allocate a
+      // giant buffer. (The spaces around PRIu32 are required since C++11.)
+      GlobalOutput.printf("TNonblockingServer: frame size too large "
+                          "(%" PRIu32 " > %zu) from client %s. remote side not "
+                          "using TFramedTransport?",
+                          readWant_, server_->getMaxFrameSize(),
+                          tSocket_->getSocketInfo().c_str());
+      close();
+      return;
+    }
+    // size known; now get the rest of the frame
+    transition();
+    return;
+
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    try {
+      // Read from the socket
+      fetch = readWant_ - readBufferPos_;
+      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+    }
+    catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+      return;
+    }
+
+    if (got > 0) {
+      // Move along in the buffer
+      readBufferPos_ += got;
+
+      // Check that we did not overdo it
+      assert(readBufferPos_ <= readWant_);
+
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+    return;
+
+  case SOCKET_SEND:
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      GlobalOutput("WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    try {
+      left = writeBufferSize_ - writeBufferPos_;
+      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+    }
+    catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+      close();
+      return;
+    }
+
+    writeBufferPos_ += sent;
+
+    // Did we overdo it?
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+    return;
+
+  default:
+    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TNonblockingServer::TConnection::transition() {
+  // ensure this connection is active right now
+  assert(ioThread_);
+  assert(server_);
+
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    outputTransport_->resetBuffer();
+    // Prepend four bytes of blank space to the buffer so we can
+    // write the frame size there later.
+    outputTransport_->getWritePtr(4);
+    outputTransport_->wroteBytes(4);
+
+    server_->incrementActiveProcessors();
+
+    if (server_->isThreadPoolProcessing()) {
+      // Hand the work to a Task on the thread manager and wait for it
+      boost::shared_ptr<Runnable> task =
+        boost::shared_ptr<Runnable>(new Task(processor_,
+                                             inputProtocol_,
+                                             outputProtocol_,
+                                             this));
+      // The application is now waiting on the task to finish
+      appState_ = APP_WAIT_TASK;
+
+      try {
+        server_->addTask(task);
+      } catch (IllegalStateException & ise) {
+        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+        // Task never ran: undo the increment; close() may free this object, so stop.
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      }
+
+      // Set this connection idle so that libevent doesn't process more data on
+      // it while we're still waiting for the threadmanager to finish this task
+      setIdle();
+      return;
+    } else {
+      try {
+        // Invoke the processor
+        processor_->process(inputProtocol_, outputProtocol_,
+                            connectionContext_);
+      } catch (const TTransportException &ttx) {
+        GlobalOutput.printf("TNonblockingServer transport error in "
+                            "process(): %s", ttx.what());
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      } catch (const std::exception &x) {
+        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
+                            typeid(x).name(), x.what());
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      } catch (...) {
+        GlobalOutput.printf("Server::process() unknown exception");
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      }
+    }
+
+    // Intentionally fall through here, the call to process has written into
+    // the writeBuffer_
+
+  case APP_WAIT_TASK:
+    // We have now finished processing a task and the result has been written
+    // into the outputTransport_, so we grab its contents and place them into
+    // the writeBuffer_ for actual writing by the libevent thread
+
+    server_->decrementActiveProcessors();
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    // 4 bytes were reserved for frame size
+    if (writeBufferSize_ > 4) {
+
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+
+      // Put the frame size into the write buffer
+      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+      memcpy(writeBuffer_, &frameSize, 4);
+
+      // Socket into write mode
+      appState_ = APP_SEND_RESULT;
+      setWrite();
+
+      // Try to work the socket immediately
+      // workSocket();
+
+      return;
+    }
+
+    // In this case, the request was oneway and we should fall through
+    // right back into the read frame header state
+    goto LABEL_APP_INIT;
+
+  case APP_SEND_RESULT:
+    // it's now safe to perform buffer size housekeeping.
+    if (writeBufferSize_ > largestWriteBufferSize_) {
+      largestWriteBufferSize_ = writeBufferSize_;
+    }
+    if (server_->getResizeBufferEveryN() > 0
+        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+                              server_->getIdleWriteBufferLimit());
+      callsForResize_ = 0;
+    }
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  LABEL_APP_INIT:
+  case APP_INIT:
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV_FRAMING;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    readBufferPos_ = 0;
+
+    // Register read event
+    setRead();
+
+    // Try to work the socket right away
+    // workSocket();
+
+    return;
+
+  case APP_READ_FRAME_SIZE:
+    // We just read the request length
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      if (readBufferSize_ == 0) {
+        readBufferSize_ = 1;
+      }
+      uint32_t newSize = readBufferSize_;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
+
+      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+      if (newBuffer == NULL) {
+        // nothing else to be done...
+        throw std::bad_alloc();
+      }
+      readBuffer_ = newBuffer;
+      readBufferSize_ = newSize;
+    }
+
+    readBufferPos_= 0;
+
+    // Move into read request state
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_REQUEST;
+
+    // Work the socket right away
+    // workSocket();
+
+    return;
+
+  case APP_CLOSE_CONNECTION:
+    server_->decrementActiveProcessors();
+    close();
+    return;
+
+  default:
+    GlobalOutput.printf("Unexpected Application State %d", appState_);
+    assert(0);
+  }
+}
+
+/**
+ * Registers this connection with libevent for the given event flags,
+ * replacing whatever registration was in effect before. A call with the
+ * current flags is a no-op; if removing the old registration fails, the
+ * failure is logged and the old flags are kept.
+ *
+ * @param eventFlags flags to pass to libevent; 0 leaves the connection
+ *                   unregistered.
+ */
+void TNonblockingServer::TConnection::setFlags(short eventFlags) {
+  // Nothing to do when the requested flags are already in effect.
+  const bool unchanged = (eventFlags == eventFlags_);
+  if (unchanged) {
+    return;
+  }
+
+  // Drop the current registration first; give up if libevent refuses.
+  // (eventFlags_ == 0 means that no event is registered right now.)
+  const bool registered = (eventFlags_ != 0);
+  if (registered && event_del(&event_) == -1) {
+    GlobalOutput("TConnection::setFlags event_del");
+    return;
+  }
+
+  // Remember the new flags; with no flags there is nothing to register.
+  eventFlags_ = eventFlags;
+  if (eventFlags_ == 0) {
+    return;
+  }
+
+  /*
+   * Notes on event_set():
+   *
+   * - It prepares the event structure for later event_add() / event_del()
+   *   calls; eventHandler is invoked for activity on the socket descriptor.
+   * - The flags may be EV_READ, EV_WRITE, or both, i.e. tell us when the
+   *   descriptor can be read or written without blocking.
+   * - The handler receives the descriptor that triggered the event and the
+   *   event type: one of EV_TIMEOUT, EV_SIGNAL, EV_READ, EV_WRITE.
+   * - EV_PERSIST keeps an event_add() in effect until event_del() is called.
+   *
+   * - An initialized event can be reused with event_add() / event_del()
+   *   without reinitializing it, unless the handler or its argument changes.
+   * - Once added, the structure must stay alive until the event fires
+   *   (without EV_PERSIST) or until event_del() removes it.
+   * - Each monitored descriptor needs its own event structure.
+   *
+   */
+  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
+            TConnection::eventHandler, this);
+  event_base_set(ioThread_->getEventBase(), &event_);
+
+  // Activate the registration.
+  const int addResult = event_add(&event_, 0);
+  if (addResult == -1) {
+    GlobalOutput("TConnection::setFlags(): could not event_add");
+  }
+}
+
+/**
+ * Closes a connection and, as the last step, gives the object back to the server
+ */
+void TNonblockingServer::TConnection::close() {
+ // Remove our event from libevent; a failure is only logged
+ if (event_del(&event_) == -1) {
+ GlobalOutput.perror("TConnection::close() event_del", errno);
+ }
+
+ if (serverEventHandler_ != NULL) {
+ serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
+ }
+ ioThread_ = NULL; // transition() asserts on this to check the connection is active
+
+ // Close the socket
+ tSocket_->close();
+
+ // close any factory produced transports
+ factoryInputTransport_->close();
+ factoryOutputTransport_->close();
+
+ // Give this object back to the server that owns it; it may delete us, so this must stay last
+ server_->returnConnection(this);
+}
+
+/// Drops oversized idle buffers; a limit of 0 disables the corresponding check.
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+    size_t readLimit, size_t writeLimit) {
+  const bool readTooBig = (readLimit > 0) && (readBufferSize_ > readLimit);
+  if (readTooBig) {
+    free(readBuffer_);
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
+  }
+  const bool writeTooBig = (writeLimit > 0) && (largestWriteBufferSize_ > writeLimit);
+  if (writeTooBig) {
+    outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
+    largestWriteBufferSize_ = 0;
+  }
+}
+
+TNonblockingServer::~TNonblockingServer() {
+  // TODO: active TConnection objects are currently leaked.
+  // The event_base is destroyed during shutdown, so those objects never get
+  // another callback. (A callback would be harmful anyway: each connection
+  // still holds a pointer to this server, which is going away.)
+  // Only the idle objects parked in connectionStack_ are released below.
+
+  // Delete the pooled connections one by one, taking each off the stack first.
+  while (!connectionStack_.empty()) {
+    TConnection* idle = connectionStack_.top();
+    connectionStack_.pop();
+    delete idle;
+  }
+}
+
+/**
+ * Hands out a TConnection for a freshly accepted socket: a pooled object is
+ * re-initialized when one is available, otherwise a new one is allocated.
+ */
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+    int socket, const sockaddr* addr, socklen_t addrLen) {
+  // Held while we touch the IO thread cursor, the pool and the counter.
+  Guard lock(connMutex_);
+
+  // Round-robin choice of the IO thread that will own the connection.
+  assert(nextIOThread_ < ioThreads_.size());
+  const int chosenIdx = nextIOThread_;
+  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+  TNonblockingIOThread* const owner = ioThreads_[chosenIdx].get();
+
+  // Prefer recycling an idle connection object from the pool.
+  if (!connectionStack_.empty()) {
+    TConnection* recycled = connectionStack_.top();
+    connectionStack_.pop();
+    recycled->init(socket, owner, addr, addrLen);
+    return recycled;
+  }
+
+  // Pool is empty: allocate a brand new connection and count it.
+  TConnection* fresh = new TConnection(socket, owner, addr, addrLen);
+  ++numTConnections_;
+  return fresh;
+}
+
+/// Takes back a closed connection: pooled for reuse unless the pool is full.
+void TNonblockingServer::returnConnection(TConnection* connection) {
+  Guard lock(connMutex_);
+
+  // A limit of zero means the pool may grow without bound.
+  const bool poolFull = connectionStackLimit_ &&
+                        (connectionStack_.size() >= connectionStackLimit_);
+  if (!poolFull) {
+    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
+    connectionStack_.push(connection);
+    return;
+  }
+  delete connection;
+  --numTConnections_;
+}
+
+/**
+ * Server socket had something happen. We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+  (void) which;
+  // Make sure that libevent didn't mess up the socket handles
+  assert(fd == serverSocket_);
+
+  // Server socket accepted a new connection
+  socklen_t addrLen;
+  sockaddr_storage addrStorage;
+  sockaddr* addrp = (sockaddr*)&addrStorage;
+  addrLen = sizeof(addrStorage);
+
+  // Going to accept a new client socket
+  int clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one, this helps us to avoid having to go back into the libevent engine so
+  // many times
+  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
+    // If we're overloaded, take action here
+    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+      Guard g(connMutex_);
+      nConnectionsDropped_++;
+      nTotalConnectionsDropped_++;
+      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+        ::close(clientSocket);
+        return;
+      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+        if (!drainPendingTask()) {
+          // Nothing left to discard, so we drop connection instead.
+          ::close(clientSocket);
+          return;
+        }
+      }
+    }
+
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
+      ::close(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, addrp, addrLen);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+      ::close(clientSocket);
+      return;
+    }
+
+    /*
+     * Either notify the ioThread that is assigned this connection to
+     * start processing, or if it is us, we'll just ask this
+     * connection to do its initial state change here. A failed
+     * notification is logged (nothing else will start this connection).
+     * (We need to avoid writing to our own notification pipe, to
+     * avoid possible deadlocks if the pipe is full.)
+     *
+     * The IO thread #0 is the only one that handles these listen
+     * events, so unless the connection has been assigned to thread #0
+     * we know it's not on our thread.
+     */
+    if (clientConnection->getIOThreadNumber() == 0) {
+      clientConnection->transition();
+    } else if (!clientConnection->notifyIOThread()) {
+      GlobalOutput.perror("thriftServerEventHandler: notifyIOThread() failed on fresh connection ", errno);
+    }
+
+    // addrLen is written by the accept() call, so needs to be set before the next call.
+    addrLen = sizeof(addrStorage);
+  }
+
+
+  // Done looping accept, now we have to make sure the error is due to
+  // blocking. Any other error is a problem
+  if (errno != EAGAIN && errno != EWOULDBLOCK) {
+    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
+  }
+}
+
+/**
+ * Creates a socket, binds it to port_ on the wildcard address and passes it to listenSocket(); throws on failure.
+ */
+void TNonblockingServer::createAndListenOnSocket() {
+  int s;
+
+  struct addrinfo hints, *res, *res0;
+  int error;
+
+  char port[sizeof("65536") + 1];
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  sprintf(port, "%d", port_);  // NOTE(review): unchecked; assumes port_ is a valid 16-bit port
+
+  // Wildcard address
+  error = getaddrinfo(NULL, port, &hints, &res0);
+  if (error) {
+    throw TException("TNonblockingServer::serve() getaddrinfo " +
+                     string(gai_strerror(error)));
+  }
+
+  // Pick the ipv6 address first since ipv4 addresses can be mapped
+  // into ipv6 space.
+  for (res = res0; res; res = res->ai_next) {
+    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+      break;
+  }
+
+  // Create the server socket
+  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (s == -1) {
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() socket() -1");
+  }
+
+  #ifdef IPV6_V6ONLY
+  if (res->ai_family == AF_INET6) {
+    int zero = 0;
+    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
+      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+    }
+  }
+  #endif // #ifdef IPV6_V6ONLY
+
+
+  int one = 1;
+
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
+
+  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+    const int bindErrno = errno;  // the cleanup calls below may overwrite errno
+    ::close(s);
+    freeaddrinfo(res0);
+    throw TTransportException(TTransportException::NOT_OPEN,
+                              "TNonblockingServer::serve() bind", bindErrno);
+  }
+
+  // Done with the addr info
+  freeaddrinfo(res0);
+
+  // Set up this file descriptor for listening
+  listenSocket(s);
+}
+
+/**
+ * Takes a socket (e.g. the one bound by createAndListenOnSocket()), makes it
+ * non-blocking, sets options and starts listening; closes s and throws on failure.
+ */
+void TNonblockingServer::listenSocket(int s) {
+ // Set socket to nonblocking mode
+ int flags;
+ if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
+ fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
+ ::close(s);
+ throw TException("TNonblockingServer::serve() O_NONBLOCK");
+ }
+
+ int one = 1;
+ struct linger ling = {0, 0};
+
+ // Keepalive to ensure full result flushing (setsockopt results are not checked)
+ setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
+
+ // Turn linger off to avoid hung sockets
+ setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
+
+ // Set TCP nodelay if available, MAC OS X Hack
+ // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+ #ifndef TCP_NOPUSH
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
+ #endif
+
+ #ifdef TCP_LOW_MIN_RTO
+ if (TSocket::getUseLowMinRto()) {
+ setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
+ }
+ #endif
+
+ if (listen(s, LISTEN_BACKLOG) == -1) {
+ ::close(s);
+ throw TException("TNonblockingServer::serve() listen");
+ }
+
+ // Cool, this socket is good to go, set it as the serverSocket_
+ serverSocket_ = s;
+}
+
+void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+  threadManager_ = threadManager;
+  if (threadManager != NULL) {
+    threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
+    threadPoolProcessing_ = true;
+    return;
+  }
+  threadPoolProcessing_ = false;  // no thread manager: no thread-pool processing
+}
+
+/**
+ * Re-evaluates the overload state and returns it.
+ *
+ * The server becomes overloaded as soon as either the number of active
+ * processors or the number of active connections exceeds its maximum. It
+ * leaves the overloaded state only once both figures have fallen to at most
+ * overloadHysteresis_ times their maximum; at that point the per-episode
+ * drop counter is reset.
+ */
+bool TNonblockingServer::serverOverloaded() {
+  const size_t activeConnections = numTConnections_ - connectionStack_.size();
+  const bool overLimit = numActiveProcessors_ > maxActiveProcessors_ ||
+                         activeConnections > maxConnections_;
+
+  if (overLimit) {
+    // Only report the transition into the overloaded state once
+    if (!overloaded_) {
+      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+      overloaded_ = true;
+    }
+    return overloaded_;
+  }
+
+  // Below the hard limits: leave the overloaded state only when both
+  // figures are also under the hysteresis thresholds
+  if (overloaded_ &&
+      (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+      (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+    GlobalOutput.printf("TNonblockingServer: overload ended; "
+                        "%u dropped (%llu total)",
+                        nConnectionsDropped_, nTotalConnectionsDropped_);
+    nConnectionsDropped_ = 0;
+    overloaded_ = false;
+  }
+
+  return overloaded_;
+}
+
+/**
+ * Removes the next pending task from the thread manager, if there is one,
+ * and force-closes the connection that task belongs to.
+ *
+ * @return true if a task was removed and its connection closed, false if
+ *         there is no thread manager or no pending task.
+ */
+bool TNonblockingServer::drainPendingTask() {
+  if (!threadManager_) {
+    return false;
+  }
+
+  boost::shared_ptr<Runnable> pending = threadManager_->removeNextPending();
+  if (!pending) {
+    return false;
+  }
+
+  TConnection* conn =
+    static_cast<TConnection::Task*>(pending.get())->getTConnection();
+  assert(conn && conn->getServer()
+         && conn->getState() == APP_WAIT_TASK);
+  conn->forceClose();
+  return true;
+}
+
+/**
+ * Callback registered with the thread manager in setThreadManager():
+ * force-closes the connection that the given task belongs to.
+ */
+void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
+  TConnection::Task* expiredTask = static_cast<TConnection::Task*>(task.get());
+  TConnection* conn = expiredTask->getTConnection();
+  assert(conn && conn->getServer() &&
+         conn->getState() == APP_WAIT_TASK);
+  conn->forceClose();
+}
+
+/**
+ * Asks every IO thread to stop, which breaks the event loop in all of them
+ * so that they end ASAP.
+ */
+void TNonblockingServer::stop() {
+  for (size_t idx = 0; idx < ioThreads_.size(); ++idx) {
+    ioThreads_[idx]->stop();
+  }
+}
+
+/**
+ * Main workhorse function: starts up the server listening on a port,
+ * creates the IO threads and runs the libevent loop of the first one in the
+ * calling thread. Returns only after every IO thread has been joined.
+ */
+void TNonblockingServer::serve() {
+  // init listen socket
+  createAndListenOnSocket();
+
+  // set up the IO threads
+  assert(ioThreads_.empty());
+  if (!numIOThreads_) {
+    numIOThreads_ = DEFAULT_IO_THREADS;
+  }
+
+  for (uint32_t id = 0; id < numIOThreads_; ++id) {
+    // the first IO thread also does the listening on server socket
+    int listenFd = (id == 0 ? serverSocket_ : -1);
+
+    shared_ptr<TNonblockingIOThread> thread(
+      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+    ioThreads_.push_back(thread);
+  }
+
+  // Notify handler of the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  // Start all of our helper IO threads. Note that the threads run forever,
+  // only terminating if stop() is called.
+  assert(ioThreads_.size() == numIOThreads_);
+  assert(ioThreads_.size() > 0);
+
+  // size() does not return an int; convert explicitly so that the argument
+  // matches the "%d" conversion of this variadic call.
+  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+                      port_, static_cast<int>(ioThreads_.size()));
+
+  // Launch all the secondary IO threads in separate threads
+  if (ioThreads_.size() > 1) {
+    ioThreadFactory_.reset(new PlatformThreadFactory(
+#ifndef USE_BOOST_THREAD
+      PlatformThreadFactory::OTHER,  // scheduler
+      PlatformThreadFactory::NORMAL, // priority
+      1,                             // stack size (MB)
+#endif
+      false                          // detached
+    ));
+
+    assert(ioThreadFactory_.get());
+
+    // intentionally starting at thread 1, not 0
+    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
+      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+      ioThreads_[i]->setThread(thread);
+      thread->start();
+    }
+  }
+
+  // Run the primary (listener) IO thread loop in our main thread; this will
+  // only return when the server is shutting down.
+  ioThreads_[0]->run();
+
+  // Ensure all threads are finished before exiting serve()
+  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->join();
+    GlobalOutput.printf("TNonblocking: join done for IO thread #%d",
+                        static_cast<int>(i));
+  }
+}
+
+/**
+ * Stores the construction parameters. The event base starts out as NULL and
+ * both notification pipe descriptors start out as -1.
+ */
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+                                           int number,
+                                           int listenSocket,
+                                           bool useHighPriority)
+  : server_(server),
+    number_(number),
+    listenSocket_(listenSocket),
+    useHighPriority_(useHighPriority),
+    eventBase_(NULL) {
+  for (int end = 0; end < 2; ++end) {
+    notificationPipeFDs_[end] = -1;
+  }
+}
+
+/**
+ * Joins the associated thread, frees the event base if one exists, and
+ * closes the listen socket and both notification pipe descriptors when they
+ * hold non-negative values. A failing close() is logged, not propagated.
+ */
+TNonblockingIOThread::~TNonblockingIOThread() {
+  // make sure our associated thread is fully finished
+  join();
+
+  if (eventBase_) {
+    event_base_free(eventBase_);
+  }
+
+  if (listenSocket_ >= 0) {
+    const int rc = ::close(listenSocket_);
+    if (rc != 0) {
+      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+                          errno);
+    }
+    listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
+  }
+
+  for (int end = 0; end < 2; ++end) {
+    if (notificationPipeFDs_[end] < 0) {
+      continue;
+    }
+    const int rc = ::close(notificationPipeFDs_[end]);
+    if (rc != 0) {
+      GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+                          errno);
+    }
+    notificationPipeFDs_[end] = TNonblockingServer::INVALID_SOCKET_VALUE;
+  }
+}
+
+/**
+ * Creates the socket pair stored in notificationPipeFDs_ and makes both
+ * ends non-blocking and close-on-exec.
+ *
+ * Throws TException if the pair cannot be created or configured. When
+ * configuration fails, both descriptors are closed and reset to -1, so the
+ * destructor (which closes every descriptor >= 0) cannot close them a second
+ * time.
+ */
+void TNonblockingIOThread::createNotificationPipe() {
+  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+    throw TException("can't create notification pipe");
+  }
+
+  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 ||
+      evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
+    ::close(notificationPipeFDs_[0]);
+    ::close(notificationPipeFDs_[1]);
+    // Forget the closed descriptors so nobody closes them again
+    notificationPipeFDs_[0] = -1;
+    notificationPipeFDs_[1] = -1;
+    throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
+  }
+
+  for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+    // Set FD_CLOEXEC by hand on this libevent version
+    const int flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0);
+    const bool cloexecFailed =
+      flags < 0 ||
+      fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0;
+#else
+    const bool cloexecFailed =
+      evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0;
+#endif
+    if (cloexecFailed) {
+      ::close(notificationPipeFDs_[0]);
+      ::close(notificationPipeFDs_[1]);
+      // Forget the closed descriptors so nobody closes them again
+      notificationPipeFDs_[0] = -1;
+      notificationPipeFDs_[1] = -1;
+      throw TException("TNonblockingServer::createNotificationPipe() "
+                       "FD_CLOEXEC");
+    }
+  }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ *
+ * If this thread owns a listen socket, a persistent read event for it is
+ * registered first. Then the notification pipe is created and a persistent
+ * read event for its receiving end is registered. Throws TException if
+ * either event_add() call fails.
+ */
+void TNonblockingIOThread::registerEvents() {
+  if (listenSocket_ >= 0) {
+    // Register the server event
+    event_set(&serverEvent_,
+              listenSocket_,
+              EV_READ | EV_PERSIST,
+              TNonblockingIOThread::listenHandler,
+              server_);
+    event_base_set(eventBase_, &serverEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(&serverEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): "
+                       "event_add() failed on server listen event");
+    }
+    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+                        number_);
+  }
+
+  createNotificationPipe();
+
+  // Create an event to be notified when a task finishes
+  event_set(&notificationEvent_,
+            getNotificationRecvFD(),
+            EV_READ | EV_PERSIST,
+            TNonblockingIOThread::notifyHandler,
+            this);
+
+  // Attach to the base
+  event_base_set(eventBase_, &notificationEvent_);
+
+  // Add the event and start up the server
+  if (-1 == event_add(&notificationEvent_, 0)) {
+    throw TException("TNonblockingServer::serve(): "
+                     "event_add() failed on task-done notification event");
+  }
+  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+                      number_);
+}
+
+/**
+ * Writes the value of the given connection pointer to the sending end of
+ * the notification pipe.
+ *
+ * @return false if there is no valid send descriptor or if send() did not
+ *         write exactly sizeof(conn) bytes, true otherwise.
+ */
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+  const int sendFd = getNotificationSendFD();
+  if (sendFd < 0) {
+    return false;
+  }
+
+  const int kSize = sizeof(conn);
+  return send(sendFd, const_cast_sockopt(&conn), kSize, 0) == kSize;
+}
+
+/**
+ * libevent callback for the receiving end of the notification pipe.
+ *
+ * Reads connection pointers from fd until no more data is available and
+ * calls transition() on each. A NULL pointer is the command to stop and
+ * ends the handler immediately. A short read or an unexpected recv() error
+ * is reported through breakLoop(true).
+ */
+/* static */
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
+  TNonblockingIOThread* ioThread = static_cast<TNonblockingIOThread*>(v);
+  assert(ioThread);
+  (void)which;
+
+  for (;;) {
+    TNonblockingServer::TConnection* connection = 0;
+    const int kSize = sizeof(connection);
+    const int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
+
+    if (nBytes == kSize) {
+      if (connection == NULL) {
+        // this is the command to stop our thread, exit the handler!
+        return;
+      }
+      connection->transition();
+      continue;
+    }
+
+    if (nBytes > 0) {
+      // a partial pointer cannot be used: report it and break the loop
+      // with the error flag set
+      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+                          nBytes, kSize);
+      ioThread->breakLoop(true);
+      return;
+    }
+
+    if (nBytes == 0) {
+      GlobalOutput.printf("notifyHandler: Notify socket closed!");
+      return;
+    }
+
+    // nBytes < 0: running out of data simply ends the handler, anything
+    // else is an error
+    if (errno != EWOULDBLOCK && errno != EAGAIN) {
+      GlobalOutput.perror(
+        "TNonblocking: notifyHandler read() failed: ", errno);
+      ioThread->breakLoop(true);
+    }
+    return;
+  }
+}
+
+/**
+ * Makes this thread's event loop exit.
+ *
+ * With error set, the condition is logged and the whole process is aborted.
+ * Otherwise the loop is flagged to break, and a NULL notification is sent
+ * to wake it up when the caller is a different thread.
+ */
+void TNonblockingIOThread::breakLoop(bool error) {
+  if (error) {
+    GlobalOutput.printf(
+      "TNonblockingServer: IO thread #%d exiting with error.", number_);
+    // TODO: figure out something better to do here, but for now kill the
+    // whole process.
+    GlobalOutput.printf("TNonblockingServer: aborting process.");
+    ::abort();
+  }
+
+  // sets a flag so that the loop exits on the next event
+  event_base_loopbreak(eventBase_);
+
+  // event_base_loopbreak() only takes effect the next time the loop wakes
+  // up, so the loop has to be woken in case it has no real events to
+  // process.
+  //
+  // From inside the loop's own thread the notify(NULL) mechanism cannot be
+  // used, but it is not needed there either: if we are running in that
+  // thread, it cannot be blocking in the event loop.
+  const bool calledFromOtherThread = !Thread::is_current(threadId_);
+  if (calledFromOtherThread) {
+    notify(NULL);
+  }
+}
+
+/**
+ * Switches the calling thread between the SCHED_FIFO scheduler (value ==
+ * true) and the default SCHED_OTHER scheduler (value == false). Does
+ * nothing when HAVE_SCHED_H is not defined.
+ *
+ * The outcome is only logged; a failure is not reported to the caller.
+ */
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
+  // Start out with a standard, low-priority setup for the sched params.
+  struct sched_param sp;
+  bzero((void*) &sp, sizeof(sp));
+  int policy = SCHED_OTHER;
+
+  // If desired, set up high-priority sched params structure.
+  if (value) {
+    // FIFO scheduler, ranked above default SCHED_OTHER queue
+    policy = SCHED_FIFO;
+    // The priority only compares us to other SCHED_FIFO threads, so we
+    // just pick a priority halfway between min & max.
+    const int priority = (sched_get_priority_max(policy) +
+                          sched_get_priority_min(policy)) / 2;
+
+    sp.sched_priority = priority;
+  }
+
+  // Actually set the sched params for the current thread.
+  // pthread_setschedparam() reports failure through its return value and
+  // does not set errno, so the return value is what gets logged.
+  const int rc = pthread_setschedparam(pthread_self(), policy, &sp);
+  if (rc != 0) {
+    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", rc);
+  } else if (value) {
+    GlobalOutput.printf(
+      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+  } else {
+    // Do not claim high priority when it has just been switched off.
+    GlobalOutput.printf(
+      "TNonblocking: IO Thread #%d using default scheduler.", number_);
+  }
+#else
+  (void)value;
+#endif
+}
+
+/**
+ * Body of the IO thread: creates the event base, registers the events,
+ * runs the libevent loop and cleans up the events once the loop has ended.
+ * When high priority was requested, it is switched on around the loop only.
+ */
+void TNonblockingIOThread::run() {
+  threadId_ = Thread::get_current();
+
+  assert(eventBase_ == 0);
+  eventBase_ = event_base_new();
+
+  // Only IO thread #0 prints the libevent stats
+  const bool isFirstThread = (number_ == 0);
+  if (isFirstThread) {
+    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+                        event_get_version(),
+                        event_base_get_method(eventBase_));
+  }
+
+  registerEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+                      number_);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(true);
+  }
+
+  // Run the libevent engine; control comes back here only after the loop
+  // has ended
+  event_base_loop(eventBase_, 0);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(false);
+  }
+
+  // cleans up our registered events
+  cleanupEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+                      number_);
+}
+
+/**
+ * Removes the events added by registerEvents(): the listen event when this
+ * thread owns a listen socket, and the notification event. A failure to
+ * remove the listen event is logged; the result of removing the
+ * notification event is not checked.
+ */
+void TNonblockingIOThread::cleanupEvents() {
+  // stop the listen socket, if any
+  if (listenSocket_ >= 0) {
+    if (event_del(&serverEvent_) == -1) {
+      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
+    }
+  }
+
+  event_del(&notificationEvent_);
+}
+
+
+/**
+ * Requests a regular (non-error) exit of this thread's event loop.
+ */
+void TNonblockingIOThread::stop() {
+  // breakLoop(false) should make the thread fall out of its event loop ASAP
+  breakLoop(false);
+}
+
+/**
+ * Joins the thread object attached to this IO thread, if there is one.
+ * Anything thrown by the underlying join() is swallowed.
+ */
+void TNonblockingIOThread::join() {
+  // Only threads created by a factory (not the thread that called serve())
+  // have a thread object; there is nothing to join otherwise.
+  if (!thread_) {
+    return;
+  }
+
+  try {
+    // Note that it is safe to both join() ourselves twice, as well as join
+    // the current thread as the pthread implementation checks for deadlock.
+    thread_->join();
+  } catch(...) {
+    // swallow everything
+  }
+}
+
+}}} // apache::thrift::server