Thrift now a TLP - INFRA-3116
git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
new file mode 100644
index 0000000..45f635c
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TNonblockingServer.h"
+#include <concurrency/Exception.h>
+
+#include <iostream>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+
+class TConnection::Task: public Runnable {
+ public:
+  // Packages one request for a worker thread; taskHandle is the worker's end
+  // of the socketpair used to report completion to the libevent thread.
+  Task(boost::shared_ptr<TProcessor> processor,
+       boost::shared_ptr<TProtocol> input,
+       boost::shared_ptr<TProtocol> output,
+       int taskHandle) :
+    processor_(processor), input_(input), output_(output),
+    taskHandle_(taskHandle) {}
+
+  void run() {
+    try {
+      // Keep going while the processor succeeds and more input is buffered
+      bool more = processor_->process(input_, output_);
+      while (more && input_->getTransport()->peek()) {
+        more = processor_->process(input_, output_);
+      }
+    } catch (TTransportException& ttx) {
+      cerr << "TNonblockingServer client died: " << ttx.what() << endl;
+    } catch (TException& x) {
+      cerr << "TNonblockingServer exception: " << x.what() << endl;
+    } catch (...) {
+      cerr << "TNonblockingServer uncaught exception." << endl;
+    }
+
+    // Signal completion to the libevent thread via the socketpair, then release it
+    const int8_t doneByte = 0;
+    if (send(taskHandle_, &doneByte, sizeof(doneByte), 0) == -1) {
+      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
+    }
+    if (::close(taskHandle_) == -1) {
+      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
+    }
+  }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocol> input_;
+  boost::shared_ptr<TProtocol> output_;
+  int taskHandle_;
+};
+
+void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+  // Bind this (possibly recycled) object to its socket and owning server
+  server_ = s;
+  socket_ = socket;
+
+  // Restart the state machine: application in INIT, socket in receive mode
+  appState_ = APP_INIT;
+  socketState_ = SOCKET_RECV;
+
+  // Nothing read or wanted yet, no pending response, no outstanding task
+  readWant_ = 0;
+  readBufferPos_ = 0;
+  writeBuffer_ = NULL;
+  writeBufferPos_ = 0;
+  writeBufferSize_ = 0;
+  taskHandle_ = -1;
+
+  // Zero the stored flags so setFlags() sees no prior event and registers anew
+  eventFlags_ = 0;
+  setFlags(eventFlags);
+
+  // Wrap the memory buffers using the server's transport factories
+  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
+
+  // Build the protocols on top of the factory-produced transports
+  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+}
+
+// Called from the libevent handler: receives into readBuffer_ or sends from
+// writeBuffer_ depending on socketState_, calling transition() when complete.
+void TConnection::workSocket() {
+  int flags=0, got=0, left=0, sent=0;
+  uint32_t fetch = 0;
+
+  switch (socketState_) {
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    // Double the buffer size until it is big enough (a size of 0 never grows)
+    if (readWant_ > readBufferSize_) {
+      uint32_t newSize = (readBufferSize_ > 0) ? readBufferSize_ : 1;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
+      // Use a temporary: on failure realloc leaves the old buffer allocated
+      void* grown = std::realloc(readBuffer_, newSize);
+      if (grown == NULL) {
+        GlobalOutput("TConnection::workSocket() realloc");
+        close();
+        return;
+      }
+      readBuffer_ = (uint8_t*)grown;
+      readBufferSize_ = newSize;
+    }
+
+    // Read from the socket
+    fetch = readWant_ - readBufferPos_;
+    got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
+
+    if (got > 0) {
+      // Move along in the buffer
+      readBufferPos_ += got;
+      // Check that we did not overdo it
+      assert(readBufferPos_ <= readWant_);
+
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    } else if (got == -1) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != ECONNRESET) {
+        GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
+      }
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+    return;
+
+  case SOCKET_SEND:
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      GlobalOutput("WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    #ifdef MSG_NOSIGNAL
+    // Suppress SIGPIPE; we check for EPIPE below and close the socket instead
+    flags |= MSG_NOSIGNAL;
+    #endif // ifdef MSG_NOSIGNAL
+
+    left = writeBufferSize_ - writeBufferPos_;
+    sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
+
+    if (sent <= 0) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != EPIPE) {
+        GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
+      }
+      close();
+      return;
+    }
+
+    writeBufferPos_ += sent;
+
+    // Did we overdo it?
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+    return;
+
+  default:
+    GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TConnection::transition() {
+  int sz = 0;
+
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    // If we've used these transport buffers enough times, reset them to avoid bloating
+
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    ++numReadsSinceReset_;
+    if (numWritesSinceReset_ < 512) {
+      outputTransport_->resetBuffer();
+    } else {
+      // reset the capacity of the output transport if we used it enough times that it might be bloated
+      try {
+        outputTransport_->resetBuffer(true);
+        numWritesSinceReset_ = 0;
+      } catch (TTransportException &ttx) {
+        GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
+        close();
+        return;
+      }
+    }
+
+    // Prepend four bytes of blank space to the buffer so we can
+    // write the frame size there later.
+    outputTransport_->getWritePtr(4);
+    outputTransport_->wroteBytes(4);
+
+    if (server_->isThreadPoolProcessing()) {
+      // We are setting up a Task to do this work and we will wait on it
+      int sv[2];
+      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
+        // Now we will fall through to the APP_WAIT_TASK block with no response
+      } else {
+        // Create task and dispatch to the thread manager
+        boost::shared_ptr<Runnable> task =
+          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+                                               inputProtocol_,
+                                               outputProtocol_,
+                                               sv[1]));
+        // The application is now waiting on the task to finish
+        appState_ = APP_WAIT_TASK;
+
+        // Create an event to be notified when the task finishes
+        event_set(&taskEvent_,
+                  taskHandle_ = sv[0],
+                  EV_READ,
+                  TConnection::taskHandler,
+                  this);
+
+        // Attach to the base
+        event_base_set(server_->getEventBase(), &taskEvent_);
+
+        // Register the completion event, then hand the task to the thread manager
+        bool dispatched = false;
+        if (-1 == event_add(&taskEvent_, 0)) {
+          GlobalOutput("TNonblockingServer::serve(): coult not event_add");
+        } else {
+          try {
+            server_->addTask(task);
+            dispatched = true;
+          } catch (IllegalStateException & ise) {
+            // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+            GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+            event_del(&taskEvent_);
+          }
+        }
+        if (!dispatched) {
+          // The task will never run (assuming a throwing addTask() did not
+          // queue it), so nothing else would close the socketpair or wake us:
+          // release both ends and drop the connection. close() may delete
+          // this object, so return at once rather than reach setIdle() below.
+          ::close(sv[0]);
+          ::close(sv[1]);
+          taskHandle_ = -1;
+          close();
+          return;
+        }
+
+        // Set this connection idle so that libevent doesn't process more
+        // data on it while we're still waiting for the threadmanager to
+        // finish this task
+        setIdle();
+        return;
+      }
+    } else {
+      try {
+        // Invoke the processor
+        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+      } catch (TTransportException &ttx) {
+        GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+        close();
+        return;
+      } catch (TException &x) {
+        GlobalOutput.printf("TException: Server::process() %s", x.what());
+        close();
+        return;
+      } catch (...) {
+        GlobalOutput.printf("Server::process() unknown exception");
+        close();
+        return;
+      }
+    }
+
+    // Intentionally fall through here, the call to process has written into
+    // the writeBuffer_
+
+  case APP_WAIT_TASK:
+    // We have now finished processing a task and the result has been written
+    // into the outputTransport_, so we grab its contents and place them into
+    // the writeBuffer_ for actual writing by the libevent thread
+
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    // 4 bytes were reserved for frame size
+    if (writeBufferSize_ > 4) {
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+
+      // Put the frame size into the write buffer
+      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+      memcpy(writeBuffer_, &frameSize, 4);
+
+      // Socket into write mode
+      appState_ = APP_SEND_RESULT;
+      setWrite();
+
+      return;
+    }
+
+    // In this case, the request was oneway and we should fall through
+    // right back into the read frame header state
+    goto LABEL_APP_INIT;
+
+  case APP_SEND_RESULT:
+    ++numWritesSinceReset_;
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  LABEL_APP_INIT:
+  case APP_INIT:
+    // reset the input buffer if we used it enough times that it might be bloated
+    if (numReadsSinceReset_ > 512)
+    {
+      void * new_buffer = std::realloc(readBuffer_, 1024);
+      if (new_buffer == NULL) {
+        GlobalOutput("TConnection::transition() realloc");
+        close();
+        return;
+      }
+      readBuffer_ = (uint8_t*) new_buffer;
+      readBufferSize_ = 1024;
+      numReadsSinceReset_ = 0;
+    }
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Set up read buffer for getting 4 bytes
+    readBufferPos_ = 0;
+    readWant_ = 4;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    // Register read event
+    setRead();
+
+    return;
+
+  case APP_READ_FRAME_SIZE:
+    // We just read the request length, deserialize it
+    sz = *(int32_t*)readBuffer_;
+    sz = (int32_t)ntohl(sz);
+
+    if (sz <= 0) {
+      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
+      close();
+      return;
+    }
+
+    // Reset the read buffer
+    readWant_ = (uint32_t)sz;
+    readBufferPos_= 0;
+
+    // Move into read request state
+    appState_ = APP_READ_REQUEST;
+
+    return;
+
+  default:
+    GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
+    assert(0);
+  }
+}
+
+void TConnection::setFlags(short eventFlags) {
+  // Nothing to do when the requested flags are already in effect
+  if (eventFlags == eventFlags_) {
+    return;
+  }
+
+  // A nonzero stored value means an event is registered; remove it first
+  if (eventFlags_ != 0 && event_del(&event_) == -1) {
+    GlobalOutput("TConnection::setFlags event_del");
+    return;
+  }
+
+  // Remember the new flags
+  eventFlags_ = eventFlags;
+
+  // With no flags there is nothing to register with libevent
+  if (eventFlags_ == 0) {
+    return;
+  }
+
+  /**
+   * event_set:
+   *
+   * Initializes the event structure &event_ for later use with event_add()
+   * and event_del(). The event is set up to invoke eventHandler whenever
+   * the monitored 'sock' file descriptor becomes ready.
+   *
+   * The requested events may be EV_READ, EV_WRITE, or both, meaning that
+   * the application can read from or write to the descriptor without
+   * blocking.
+   *
+   * eventHandler is invoked with the file descriptor that triggered the
+   * event and the type of event, which is one of: EV_TIMEOUT, EV_SIGNAL,
+   * EV_READ, EV_WRITE.
+   *
+   * The extra flag EV_PERSIST keeps an event_add() in force until
+   * event_del() has been called.
+   *
+   * After initialization the event struct may be used repeatedly with
+   * event_add() and event_del(); it only needs to be reinitialized when
+   * the eventHandler and/or its argument change. However, once an ev
+   * structure has been handed to libevent via event_add(), the structure
+   * must stay alive until the event fires (assuming EV_PERSIST is not
+   * set) or until it is removed with event_del(). One ev structure must
+   * not be shared by several monitored descriptors; every descriptor
+   * needs its own ev.
+   *
+   * We re-run event_set() here because the flags themselves are changing.
+   */
+  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
+  event_base_set(server_->getEventBase(), &event_);
+
+  // Add the event
+  if (event_add(&event_, 0) == -1) {
+    GlobalOutput("TConnection::setFlags(): could not event_add");
+  }
+}
+
+/**
+ * Closes a connection
+ */
+void TConnection::close() {
+  // Delete the registered libevent
+  if (event_del(&event_) == -1) {
+    GlobalOutput("TConnection::close() event_del");
+  }
+
+  // Close the socket; fd 0 is valid, so -1 (not 0) is the "no socket" marker
+  if (socket_ >= 0) {
+    ::close(socket_);
+  }
+  socket_ = -1;
+
+  // close any factory produced transports
+  factoryInputTransport_->close();
+  factoryOutputTransport_->close();
+
+  // Give this object back to the server that owns it (which may delete it)
+  server_->returnConnection(this);
+}
+
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+  // Best-effort shrink (limit 0 = never shrink); the old buffer is kept on failure.
+  // close() leads here via returnConnection(), so it must not be called.
+  void* shrunk = NULL;
+  if (limit > 0 && readBufferSize_ > limit &&
+      (shrunk = std::realloc(readBuffer_, limit)) != NULL) {
+    readBuffer_ = (uint8_t*)shrunk;
+    readBufferSize_ = limit;
+  }
+}
+
+/**
+ * Creates a new connection either by reusing an object off the stack or
+ * by allocating a new one entirely
+ */
+TConnection* TNonblockingServer::createConnection(int socket, short flags) {
+  // Nothing pooled: build a brand new connection object
+  if (connectionStack_.empty()) {
+    return new TConnection(socket, flags, this);
+  }
+  // Otherwise recycle the most recently returned object
+  TConnection* recycled = connectionStack_.top();
+  connectionStack_.pop();
+  recycled->init(socket, flags, this);
+  return recycled;
+}
+
+/**
+ * Returns a connection to the stack
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+  // Pool the object unless a nonzero limit has already been reached
+  if (connectionStackLimit_ == 0 || connectionStack_.size() < connectionStackLimit_) {
+    connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+    connectionStack_.push(connection);
+  } else {
+    delete connection;
+  }
+}
+
+/**
+ * Server socket had something happen. We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+  // Make sure that libevent handed us the server socket
+  assert(fd == serverSocket_);
+
+  // Peer address storage; sockaddr_storage is large enough for IPv6 peers
+  struct sockaddr_storage addr;
+  socklen_t addrLen = sizeof(addr);
+
+  // Going to accept a new client socket
+  int clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one, this helps us to avoid having to go back into the libevent engine so
+  // many times
+  while ((clientSocket = accept(fd, (struct sockaddr*)&addr, &addrLen)) != -1) {
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
+      close(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, EV_READ | EV_PERSIST);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+      close(clientSocket);
+      return;
+    }
+
+    // Put this client connection into the proper state
+    clientConnection->transition();
+    // accept() overwrote addrLen with the peer's length; restore the capacity
+    addrLen = sizeof(addr);
+  }
+
+  // Done looping accept, now we have to make sure the error is due to
+  // blocking. Any other error is a problem
+  if (errno != EAGAIN && errno != EWOULDBLOCK) {
+    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
+  }
+}
+
+/**
+ * Creates a socket to listen on and binds it to the local port.
+ */
+void TNonblockingServer::listenSocket() {
+  int s;
+  struct addrinfo hints, *res, *res0;
+  int error;
+
+  // Room for any int; snprintf bounds the write even for an out-of-range port_
+  char port[16];
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  snprintf(port, sizeof(port), "%d", port_);
+
+  // Wildcard address
+  error = getaddrinfo(NULL, port, &hints, &res0);
+  if (error) {
+    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
+    GlobalOutput(errStr.c_str());
+    // Fail like the other setup errors instead of returning with no socket
+    throw TException(errStr);
+  }
+
+  // Pick the ipv6 address first since ipv4 addresses can be mapped
+  // into ipv6 space.
+  for (res = res0; res; res = res->ai_next) {
+    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+      break;
+  }
+
+  // Create the server socket
+  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (s == -1) {
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() socket() -1");
+  }
+
+  #ifdef IPV6_V6ONLY
+  int zero = 0;
+  if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
+    GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+  }
+  #endif // #ifdef IPV6_V6ONLY
+
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  int one = 1;
+  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+
+  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+    close(s);
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() bind");
+  }
+
+  // Done with the addr info
+  freeaddrinfo(res0);
+
+  // Set up this file descriptor for listening
+  listenSocket(s);
+}
+
+/**
+ * Takes a socket created by listenSocket() and sets various options on it
+ * to prepare for use in the server.
+ */
+void TNonblockingServer::listenSocket(int s) {
+  // Add O_NONBLOCK to whatever file status flags the socket already has
+  const int curFlags = fcntl(s, F_GETFL, 0);
+  if (curFlags < 0 || fcntl(s, F_SETFL, curFlags | O_NONBLOCK) < 0) {
+    close(s);
+    throw TException("TNonblockingServer::serve() O_NONBLOCK");
+  }
+
+  const int enable = 1;
+  struct linger noLinger = {0, 0};
+
+  // Enable keepalive on the socket
+  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
+
+  // Switch SO_LINGER off (l_onoff = 0)
+  setsockopt(s, SOL_SOCKET, SO_LINGER, &noLinger, sizeof(noLinger));
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+  #ifndef TCP_NOPUSH
+  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
+  #endif
+
+  // Start listening; on failure the descriptor is closed before throwing
+  if (listen(s, LISTEN_BACKLOG) == -1) {
+    close(s);
+    throw TException("TNonblockingServer::serve() listen");
+  }
+
+  // The socket is fully set up, so record it as serverSocket_
+  serverSocket_ = s;
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingServer::registerEvents(event_base* base) {
+  // Preconditions: a listening socket exists and no base is attached yet
+  assert(serverSocket_ != -1);
+  assert(!eventBase_);
+  eventBase_ = base;
+
+  // Report which libevent version and method are in use
+  GlobalOutput.printf("libevent %s method %s",
+                      event_get_version(),
+                      event_get_method());
+
+  // Watch the server socket for readability; EV_PERSIST keeps the event
+  // registered until event_del() is called
+  event_set(&serverEvent_, serverSocket_, EV_READ | EV_PERSIST,
+            TNonblockingServer::eventHandler, this);
+  event_base_set(eventBase_, &serverEvent_);
+
+  // Add the event; the server cannot accept connections without it
+  const int added = event_add(&serverEvent_, 0);
+  if (added == -1) {
+    throw TException("TNonblockingServer::serve(): coult not event_add");
+  }
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+  // Create, bind and listen on the server socket
+  listenSocket();
+
+  // Initialize libevent and register the server socket with its base
+  event_base* base = static_cast<event_base*>(event_init());
+  registerEvents(base);
+
+  // Give the optional event handler its preServe() callback
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+  // Run the libevent loop, which invokes the registered eventHandler callbacks
+  event_base_loop(eventBase_, 0);
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
new file mode 100644
index 0000000..1684b64
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include <Thrift.h>
+#include <server/TServer.h>
+#include <transport/TBufferTransports.h>
+#include <concurrency/ThreadManager.h>
+#include <stack>
+#include <string>
+#include <errno.h>
+#include <cstdlib>
+#include <event.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::concurrency::Runnable;
+using apache::thrift::concurrency::ThreadManager;
+
+// Forward declaration of class
+class TConnection;
+
+/**
+ * This is a non-blocking server in C++ for high performance that operates a
+ * single IO thread. It assumes that all incoming requests are framed with a
+ * 4 byte length indicator and writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with libevent.
+ *
+ */
+class TNonblockingServer : public TServer {
+ private:
+
+ // Listen backlog
+ static const int LISTEN_BACKLOG = 1024;
+
+ // Default limit on size of idle connection pool
+ static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+ // Maximum size of buffer allocated to idle connection
+ static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
+
+ // Server socket file descriptor
+ int serverSocket_;
+
+ // Port server runs on
+ int port_;
+
+ // For processing via thread pool, may be NULL
+ boost::shared_ptr<ThreadManager> threadManager_;
+
+ // Is thread pool processing?
+ bool threadPoolProcessing_;
+
+ // The event base for libevent
+ event_base* eventBase_;
+
+ // Event struct, for use with eventBase_
+ struct event serverEvent_;
+
+ // Number of TConnection object we've created
+ size_t numTConnections_;
+
+ // Limit for how many TConnection objects to cache
+ size_t connectionStackLimit_;
+
+ /**
+ * Max read buffer size for an idle connection. When we place an idle
+ * TConnection into connectionStack_, we ensure that its read buffer is
+ * reduced to this size to ensure that idle connections don't hog memory.
+ */
+ uint32_t idleBufferMemLimit_;
+
+ /**
+ * This is a stack of all the objects that have been created but that
+ * are NOT currently in use. When we close a connection, we place it on this
+ * stack so that the object can be reused later, rather than freeing the
+ * memory and reallocating a new object later.
+ */
+ std::stack<TConnection*> connectionStack_;
+
+ void handleEvent(int fd, short which);
+
+ public:
+ TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+ int port) :
+ TServer(processor),
+ serverSocket_(-1),
+ port_(port),
+ threadPoolProcessing_(false),
+ eventBase_(NULL),
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
+
+ TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ int port,
+ boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
+ TServer(processor),
+ serverSocket_(-1),
+ port_(port),
+ threadManager_(threadManager),
+ eventBase_(NULL),
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+ setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(protocolFactory);
+ setOutputProtocolFactory(protocolFactory);
+ setThreadManager(threadManager);
+ }
+
+ TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                    boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                    boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                    boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                    boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+                    int port,
+                    boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
+   TServer(processor),
+   serverSocket_(-1), // -1 = no socket yet, as in the other ctors and registerEvents()
+   port_(port),
+   threadManager_(threadManager),
+   eventBase_(NULL),
+   numTConnections_(0),
+   connectionStackLimit_(CONNECTION_STACK_LIMIT),
+   idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+   setInputTransportFactory(inputTransportFactory);
+   setOutputTransportFactory(outputTransportFactory);
+   setInputProtocolFactory(inputProtocolFactory);
+   setOutputProtocolFactory(outputProtocolFactory);
+   setThreadManager(threadManager); // also sets threadPoolProcessing_
+ }
+
+ ~TNonblockingServer() {}
+
+ void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+ threadManager_ = threadManager;
+ threadPoolProcessing_ = (threadManager != NULL);
+ }
+
+ boost::shared_ptr<ThreadManager> getThreadManager() {
+ return threadManager_;
+ }
+
+ /**
+ * Get the maximum number of unused TConnection we will hold in reserve.
+ *
+ * @return the current limit on TConnection pool size.
+ */
+ size_t getConnectionStackLimit() const {
+ return connectionStackLimit_;
+ }
+
+ /**
+ * Set the maximum number of unused TConnection we will hold in reserve.
+ *
+ * @param sz the new limit for TConnection pool size.
+ */
+ void setConnectionStackLimit(size_t sz) {
+ connectionStackLimit_ = sz;
+ }
+
+ bool isThreadPoolProcessing() const {
+ return threadPoolProcessing_;
+ }
+
+ void addTask(boost::shared_ptr<Runnable> task) {
+ threadManager_->add(task);
+ }
+
+ event_base* getEventBase() const {
+ return eventBase_;
+ }
+
+ void incrementNumConnections() {
+ ++numTConnections_;
+ }
+
+ void decrementNumConnections() {
+ --numTConnections_;
+ }
+
+ size_t getNumConnections() {
+ return numTConnections_;
+ }
+
+ size_t getNumIdleConnections() {
+ return connectionStack_.size();
+ }
+
+ /**
+ * Get the maximum limit of memory allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will shrink buffers when idle.
+ */
+ size_t getIdleBufferMemLimit() const {
+ return idleBufferMemLimit_;
+ }
+
+ /**
+ * Set the maximum limit of memory allocated to idle TConnection objects.
+ * If a TConnection object goes idle with more than this much memory
+ * allocated to its buffer, we shrink it to this value.
+ *
+ * @param limit of bytes beyond which we will shrink buffers when idle.
+ */
+ void setIdleBufferMemLimit(size_t limit) {
+ idleBufferMemLimit_ = limit;
+ }
+
+ TConnection* createConnection(int socket, short flags);
+
+ void returnConnection(TConnection* connection);
+
+ static void eventHandler(int fd, short which, void* v) {
+ ((TNonblockingServer*)v)->handleEvent(fd, which);
+ }
+
+ void listenSocket();
+
+ void listenSocket(int fd);
+
+ void registerEvents(event_base* base);
+
+ void serve();
+};
+
+/**
+ * Two states for sockets, recv and send mode
+ */
+enum TSocketState {
+ SOCKET_RECV, // workSocket() receives from the socket into readBuffer_
+ SOCKET_SEND // workSocket() sends writeBuffer_ out on the socket
+};
+
+/**
+ * Five states for the nonblocking server's per-connection state machine,
+ * advanced by TConnection::transition():
+ * initialize, read the 4 byte frame size, read the frame of data,
+ * wait for a dispatched task (thread pool processing only), and
+ * send back data (if any).
+ */
+enum TAppState {
+ APP_INIT,
+ APP_READ_FRAME_SIZE,
+ APP_READ_REQUEST,
+ APP_WAIT_TASK,
+ APP_SEND_RESULT
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TConnection {
+ private:
+
+ class Task;
+
+ // Server handle
+ TNonblockingServer* server_;
+
+ // Socket handle
+ int socket_;
+
+ // Libevent object
+ struct event event_;
+
+ // Libevent flags
+ short eventFlags_;
+
+ // Socket mode
+ TSocketState socketState_;
+
+ // Application state
+ TAppState appState_;
+
+ // How much data needed to read
+ uint32_t readWant_;
+
+ // Where in the read buffer are we
+ uint32_t readBufferPos_;
+
+ // Read buffer
+ uint8_t* readBuffer_;
+
+ // Read buffer size
+ uint32_t readBufferSize_;
+
+ // Write buffer
+ uint8_t* writeBuffer_;
+
+ // Write buffer size
+ uint32_t writeBufferSize_;
+
+ // How far through writing are we?
+ uint32_t writeBufferPos_;
+
+ // How many times have we read since our last buffer reset?
+ uint32_t numReadsSinceReset_;
+
+ // How many times have we written since our last buffer reset?
+ uint32_t numWritesSinceReset_;
+
+ // Task handle
+ int taskHandle_;
+
+ // Task event
+ struct event taskEvent_;
+
+ // Transport to read from
+ boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+ // Transport that processor writes to
+ boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+ // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+ boost::shared_ptr<TTransport> factoryInputTransport_;
+ boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+ // Protocol decoder
+ boost::shared_ptr<TProtocol> inputProtocol_;
+
+ // Protocol encoder
+ boost::shared_ptr<TProtocol> outputProtocol_;
+
+ // Go into read mode
+ void setRead() {
+ setFlags(EV_READ | EV_PERSIST);
+ }
+
+ // Go into write mode
+ void setWrite() {
+ setFlags(EV_WRITE | EV_PERSIST);
+ }
+
+ // Set socket idle
+ void setIdle() {
+ setFlags(0);
+ }
+
+ // Set event flags
+ void setFlags(short eventFlags);
+
+ // Libevent handlers
+ void workSocket();
+
+ // Close this client and reset
+ void close();
+
+ public:
+
+ // Constructor
+ TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+   readBuffer_ = (uint8_t*)std::malloc(1024);
+   if (readBuffer_ == NULL) {
+     throw apache::thrift::TException("Out of memory."); // by value, not a pointer
+   }
+   readBufferSize_ = 1024;
+
+   numReadsSinceReset_ = 0;
+   numWritesSinceReset_ = 0;
+
+   // Allocate the input and output transports once per TConnection; init()
+   // does not reallocate them when a pooled connection is reused
+   inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+   outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+
+   init(socket, eventFlags, s);
+   server_->incrementNumConnections();
+ }
+
+ ~TConnection() {
+   std::free(readBuffer_); // we malloc/realloc it ourselves; nothing else frees it
+   server_->decrementNumConnections();
+ }
+
+ /**
+ * Check read buffer against a given limit and shrink it if exceeded.
+ *
+ * @param limit we limit buffer size to.
+ */
+ void checkIdleBufferMemLimit(uint32_t limit);
+
+ // Initialize
+ void init(int socket, short eventFlags, TNonblockingServer *s);
+
+ // Transition into a new state
+ void transition();
+
+ // Handler wrapper
+ static void eventHandler(int fd, short /* which */, void* v) {
+ assert(fd == ((TConnection*)v)->socket_);
+ ((TConnection*)v)->workSocket();
+ }
+
+ // Handler wrapper for task block
+ static void taskHandler(int fd, short /* which */, void* v) {
+ assert(fd == ((TConnection*)v)->taskHandle_);
+ if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
+ GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
+ }
+ ((TConnection*)v)->transition();
+ }
+
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
diff --git a/lib/cpp/src/server/TServer.cpp b/lib/cpp/src/server/TServer.cpp
new file mode 100644
index 0000000..6b692ab
--- /dev/null
+++ b/lib/cpp/src/server/TServer.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <unistd.h>
+
+namespace apache { namespace thrift { namespace server {
+
+/**
+ * Tries to set both the soft and the hard RLIMIT_NOFILE limit to max_fds.
+ * After every failed setrlimit() call the request is halved and retried,
+ * until a call succeeds or the request has shrunk to zero.
+ *
+ * @param max_fds first limit to request
+ * @return the rlim_cur value of the last request that was prepared
+ *         (0 when max_fds is 0 or every attempt failed)
+ */
+int increase_max_fds(int max_fds=(1<<24)) {
+  struct rlimit fdmaxrl;
+  fdmaxrl.rlim_cur = max_fds;
+  fdmaxrl.rlim_max = max_fds;
+
+  while (max_fds && (setrlimit(RLIMIT_NOFILE, &fdmaxrl) < 0)) {
+    // Request rejected: retry with half the previous value.
+    max_fds /= 2;
+    fdmaxrl.rlim_cur = max_fds;
+    fdmaxrl.rlim_max = max_fds;
+  }
+
+  return fdmaxrl.rlim_cur;
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
new file mode 100644
index 0000000..5c4c588
--- /dev/null
+++ b/lib/cpp/src/server/TServer.h
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TSERVER_H_
+#define _THRIFT_SERVER_TSERVER_H_ 1
+
+#include <TProcessor.h>
+#include <transport/TServerTransport.h>
+#include <protocol/TBinaryProtocol.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportFactory;
+
+/**
+ * Virtual interface class that can handle events from the server core. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ *
+ * Every hook has an empty default implementation, so a subclass only has to
+ * override the events it is interested in.
+ */
+class TServerEventHandler {
+ public:
+
+ virtual ~TServerEventHandler() {}
+
+ /**
+ * Called before the server begins.
+ */
+ virtual void preServe() {}
+
+ /**
+ * Called when a new client has connected and is about to begin processing.
+ * Receives the input and output protocols created for that client.
+ */
+ virtual void clientBegin(boost::shared_ptr<TProtocol> /* input */,
+ boost::shared_ptr<TProtocol> /* output */) {}
+
+ /**
+ * Called when a client has finished making requests.
+ * Receives the same input and output protocols as clientBegin().
+ */
+ virtual void clientEnd(boost::shared_ptr<TProtocol> /* input */,
+ boost::shared_ptr<TProtocol> /* output */) {}
+
+ protected:
+
+ /**
+ * Prevent direct instantiation.
+ */
+ TServerEventHandler() {}
+
+};
+
+/**
+ * Abstract base of all Thrift servers.
+ *
+ * Holds the processor, the server transport, the transport and protocol
+ * factories for both directions and an optional event handler. Concrete
+ * servers implement serve(); run() simply forwards to serve() so a server
+ * can be used as a Runnable.
+ */
+class TServer : public concurrency::Runnable {
+ public:
+
+  virtual ~TServer() {}
+
+  /** Server main loop; implemented by concrete servers. */
+  virtual void serve() = 0;
+
+  /** Asks the server to stop; does nothing in this base class. */
+  virtual void stop() {}
+
+  /** Runnable entry point: allows running the server in a thread. */
+  virtual void run() {
+    serve();
+  }
+
+  // ---- Accessors -----------------------------------------------------------
+
+  boost::shared_ptr<TProcessor> getProcessor() {
+    return processor_;
+  }
+
+  boost::shared_ptr<TServerTransport> getServerTransport() {
+    return serverTransport_;
+  }
+
+  boost::shared_ptr<TTransportFactory> getInputTransportFactory() {
+    return inputTransportFactory_;
+  }
+
+  boost::shared_ptr<TTransportFactory> getOutputTransportFactory() {
+    return outputTransportFactory_;
+  }
+
+  boost::shared_ptr<TProtocolFactory> getInputProtocolFactory() {
+    return inputProtocolFactory_;
+  }
+
+  boost::shared_ptr<TProtocolFactory> getOutputProtocolFactory() {
+    return outputProtocolFactory_;
+  }
+
+  boost::shared_ptr<TServerEventHandler> getEventHandler() {
+    return eventHandler_;
+  }
+
+protected:
+  /**
+   * Processor only: default TTransportFactory and TBinaryProtocolFactory
+   * instances are created for both directions; no server transport is set.
+   */
+  TServer(boost::shared_ptr<TProcessor> processor):
+    processor_(processor),
+    inputTransportFactory_(new TTransportFactory()),
+    outputTransportFactory_(new TTransportFactory()),
+    inputProtocolFactory_(new TBinaryProtocolFactory()),
+    outputProtocolFactory_(new TBinaryProtocolFactory()) {}
+
+  /**
+   * Processor and server transport: default TTransportFactory and
+   * TBinaryProtocolFactory instances are created for both directions.
+   */
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(new TTransportFactory()),
+    outputTransportFactory_(new TTransportFactory()),
+    inputProtocolFactory_(new TBinaryProtocolFactory()),
+    outputProtocolFactory_(new TBinaryProtocolFactory()) {}
+
+  /**
+   * One transport factory and one protocol factory, each shared by the
+   * input and the output side.
+   */
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport,
+          boost::shared_ptr<TTransportFactory> transportFactory,
+          boost::shared_ptr<TProtocolFactory> protocolFactory):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(transportFactory),
+    outputTransportFactory_(transportFactory),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory) {}
+
+  /** Separate factories for the input and the output side. */
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport,
+          boost::shared_ptr<TTransportFactory> inputTransportFactory,
+          boost::shared_ptr<TTransportFactory> outputTransportFactory,
+          boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+          boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(inputTransportFactory),
+    outputTransportFactory_(outputTransportFactory),
+    inputProtocolFactory_(inputProtocolFactory),
+    outputProtocolFactory_(outputProtocolFactory) {}
+
+
+  // Class variables
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TServerTransport> serverTransport_;
+
+  boost::shared_ptr<TTransportFactory> inputTransportFactory_;
+  boost::shared_ptr<TTransportFactory> outputTransportFactory_;
+
+  boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+  // Optional; stays empty until setServerEventHandler() is called.
+  boost::shared_ptr<TServerEventHandler> eventHandler_;
+
+public:
+  // ---- Mutators ------------------------------------------------------------
+
+  void setInputTransportFactory(boost::shared_ptr<TTransportFactory> inputTransportFactory) {
+    inputTransportFactory_ = inputTransportFactory;
+  }
+
+  void setOutputTransportFactory(boost::shared_ptr<TTransportFactory> outputTransportFactory) {
+    outputTransportFactory_ = outputTransportFactory;
+  }
+
+  void setInputProtocolFactory(boost::shared_ptr<TProtocolFactory> inputProtocolFactory) {
+    inputProtocolFactory_ = inputProtocolFactory;
+  }
+
+  void setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory> outputProtocolFactory) {
+    outputProtocolFactory_ = outputProtocolFactory;
+  }
+
+  void setServerEventHandler(boost::shared_ptr<TServerEventHandler> eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+};
+
+/**
+ * Helper function to increase the max file descriptors limit
+ * for the current process and all of its children.
+ * By default, tries to increase it to as much as 2^24.
+ */
+ int increase_max_fds(int max_fds=(1<<24));
+
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
new file mode 100644
index 0000000..394ce21
--- /dev/null
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TSimpleServer.h"
+#include "transport/TTransportException.h"
+#include <string>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using boost::shared_ptr;
+
+/**
+ * Accept loop of the simple single-threaded server. Perfect for unit tests!
+ *
+ * Listens on the server transport (returning early if that fails), fires the
+ * preServe() event and then handles one client at a time while stop_ is
+ * false. Each accepted client is wrapped in transports and protocols from
+ * the configured factories and processed until process() returns false,
+ * peek() reports that the peer is gone, or an exception ends the session.
+ * A caught std::string aborts the loop. If the loop ended because stop_ was
+ * set, the server transport is closed and stop_ is cleared again.
+ */
+void TSimpleServer::serve() {
+
+  shared_ptr<TTransport> client;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
+
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    cerr << "TSimpleServer::run() listen(): " << ttx.what() << endl;
+    return;
+  }
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  // Fetch client from server
+  while (!stop_) {
+    try {
+      // Forget the previous client first. Without this, a failing accept()
+      // would make the handlers below close the transports of a client that
+      // has already been served and closed.
+      client.reset();
+      inputTransport.reset();
+      outputTransport.reset();
+      inputProtocol.reset();
+      outputProtocol.reset();
+
+      client = serverTransport_->accept();
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+      if (eventHandler_ != NULL) {
+        eventHandler_->clientBegin(inputProtocol, outputProtocol);
+      }
+      try {
+        while (processor_->process(inputProtocol, outputProtocol)) {
+          // Peek ahead, is the remote side closed?
+          if (!inputTransport->peek()) {
+            break;
+          }
+        }
+      } catch (TTransportException& ttx) {
+        cerr << "TSimpleServer client died: " << ttx.what() << endl;
+      } catch (TException& tx) {
+        cerr << "TSimpleServer exception: " << tx.what() << endl;
+      }
+      if (eventHandler_ != NULL) {
+        eventHandler_->clientEnd(inputProtocol, outputProtocol);
+      }
+      inputTransport->close();
+      outputTransport->close();
+      client->close();
+    } catch (TTransportException& ttx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      cerr << "TServerTransport died on accept: " << ttx.what() << endl;
+      continue;
+    } catch (TException& tx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      cerr << "Some kind of accept exception: " << tx.what() << endl;
+      continue;
+    } catch (const string& s) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      // Message used to name TThreadPoolServer (copy-paste slip).
+      cerr << "TSimpleServer: Unknown exception: " << s << endl;
+      break;
+    }
+  }
+
+  // Only a requested stop closes the listening transport; the flag is then
+  // cleared so serve() can be called again.
+  if (stop_) {
+    try {
+      serverTransport_->close();
+    } catch (TTransportException &ttx) {
+      cerr << "TServerTransport failed on close: " << ttx.what() << endl;
+    }
+    stop_ = false;
+  }
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
new file mode 100644
index 0000000..c4fc91c
--- /dev/null
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1
+
+#include "server/TServer.h"
+#include "transport/TServerTransport.h"
+
+namespace apache { namespace thrift { namespace server {
+
+/**
+ * The most basic server: single-threaded, it loops accepting one connection,
+ * processing requests on that connection until it closes, and then repeating.
+ * It also serves as a good example of how to extend the TServer interface.
+ */
+class TSimpleServer : public TServer {
+ public:
+  /** Shares one transport factory and one protocol factory for both directions. */
+  TSimpleServer(boost::shared_ptr<TProcessor> processor,
+                boost::shared_ptr<TServerTransport> serverTransport,
+                boost::shared_ptr<TTransportFactory> transportFactory,
+                boost::shared_ptr<TProtocolFactory> protocolFactory) :
+    TServer(processor,
+            serverTransport,
+            transportFactory,
+            protocolFactory),
+    stop_(false) {
+  }
+
+  /** Takes separate factories for the input and the output side. */
+  TSimpleServer(boost::shared_ptr<TProcessor> processor,
+                boost::shared_ptr<TServerTransport> serverTransport,
+                boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                boost::shared_ptr<TProtocolFactory> outputProtocolFactory) :
+    TServer(processor,
+            serverTransport,
+            inputTransportFactory,
+            outputTransportFactory,
+            inputProtocolFactory,
+            outputProtocolFactory),
+    stop_(false) {
+  }
+
+  virtual ~TSimpleServer() {}
+
+  /** Runs the accept loop; defined in TSimpleServer.cpp. */
+  virtual void serve();
+
+  /** Raises the stop flag. */
+  virtual void stop() {
+    stop_ = true;
+  }
+
+ protected:
+  // Stop request flag; starts out false.
+  bool stop_;
+
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
new file mode 100644
index 0000000..0894cfa
--- /dev/null
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TThreadPoolServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/Thread.h"
+#include "concurrency/ThreadManager.h"
+#include <string>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace server {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::concurrency;
+using namespace apache::thrift::protocol;;
+using namespace apache::thrift::transport;
+
+/**
+ * Runnable that serves one client connection of a TThreadPoolServer.
+ *
+ * run() fires clientBegin(), processes requests until process() returns
+ * false, peek() reports no more data or an exception is thrown, then fires
+ * clientEnd() and closes the input and output transports.
+ */
+class TThreadPoolServer::Task : public Runnable {
+
+public:
+
+  Task(TThreadPoolServer &server,
+       shared_ptr<TProcessor> processor,
+       shared_ptr<TProtocol> input,
+       shared_ptr<TProtocol> output) :
+    server_(server),
+    processor_(processor),
+    input_(input),
+    output_(output) {
+  }
+
+  ~Task() {}
+
+  void run() {
+    boost::shared_ptr<TServerEventHandler> eventHandler =
+      server_.getEventHandler();
+    if (eventHandler != NULL) {
+      eventHandler->clientBegin(input_, output_);
+    }
+    try {
+      while (processor_->process(input_, output_)) {
+        if (!input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (TTransportException&) {
+      // This is reasonably expected, client didn't send a full request so just
+      // ignore him
+      // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
+      // GlobalOutput(errStr.c_str());
+    } catch (TException& x) {
+      string errStr = string("TThreadPoolServer exception: ") + x.what();
+      GlobalOutput(errStr.c_str());
+    } catch (std::exception &x) {
+      string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
+      GlobalOutput(errStr.c_str());
+    } catch (...) {
+      // Anything outside the std::exception hierarchy must not escape
+      // either: clientEnd() and the transport closes below still have to
+      // run for this client.
+      GlobalOutput("TThreadPoolServer uncaught exception.");
+    }
+
+    if (eventHandler != NULL) {
+      eventHandler->clientEnd(input_, output_);
+    }
+
+    // Close both directions independently so a failure on one side does not
+    // prevent closing the other.
+    try {
+      input_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    try {
+      output_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+
+  }
+
+ private:
+  TServer& server_;
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
+
+};
+
+/**
+ * Builds a pool server that uses one transport factory and one protocol
+ * factory for both directions. The stop flag starts cleared and the task
+ * timeout starts at 0.
+ */
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> transportFactory,
+                                     shared_ptr<TProtocolFactory> protocolFactory,
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor,
+          serverTransport,
+          transportFactory,
+          protocolFactory),
+  threadManager_(threadManager),
+  stop_(false),
+  timeout_(0) {
+}
+
+/**
+ * Builds a pool server with separate input and output factories. The stop
+ * flag starts cleared and the task timeout starts at 0.
+ */
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> inputTransportFactory,
+                                     shared_ptr<TTransportFactory> outputTransportFactory,
+                                     shared_ptr<TProtocolFactory> inputProtocolFactory,
+                                     shared_ptr<TProtocolFactory> outputProtocolFactory,
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor,
+          serverTransport,
+          inputTransportFactory,
+          outputTransportFactory,
+          inputProtocolFactory,
+          outputProtocolFactory),
+  threadManager_(threadManager),
+  stop_(false),
+  timeout_(0) {
+}
+
+
+TThreadPoolServer::~TThreadPoolServer() {
+  // Intentionally empty: no explicit cleanup is performed here.
+}
+
+/**
+ * Accept loop of the thread pool server.
+ *
+ * Listens on the server transport (returning early on failure), fires
+ * preServe(), then repeatedly accepts a client, wraps it in transports and
+ * protocols from the configured factories and queues a Task for it on the
+ * thread manager using the current timeout_. A caught std::string aborts the
+ * loop. When the loop ended because stop_ was set, the server transport is
+ * closed, the thread manager is joined and stop_ is cleared.
+ */
+void TThreadPoolServer::serve() {
+  shared_ptr<TTransport> client;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
+
+  // Start the server listening
+  try {
+    serverTransport_->listen();
+  } catch (TTransportException& listenError) {
+    string errStr("TThreadPoolServer::run() listen(): ");
+    errStr += listenError.what();
+    GlobalOutput(errStr.c_str());
+    return;
+  }
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  while (!stop_) {
+    try {
+      // Start every iteration without leftovers from the previous client.
+      client.reset();
+      inputTransport.reset();
+      outputTransport.reset();
+      inputProtocol.reset();
+      outputProtocol.reset();
+
+      // Fetch client from server
+      client = serverTransport_->accept();
+
+      // Make IO transports
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+      // Add to threadmanager pool
+      shared_ptr<TThreadPoolServer::Task> job(
+        new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol));
+      threadManager_->add(job, timeout_);
+
+    } catch (TTransportException& ttx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      // Stay quiet only for the interruption caused by a requested stop.
+      const bool interruptedByStop =
+        stop_ && ttx.getType() == TTransportException::INTERRUPTED;
+      if (!interruptedByStop) {
+        string errStr("TThreadPoolServer: TServerTransport died on accept: ");
+        errStr += ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
+      continue;
+    } catch (TException& tx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      string errStr("TThreadPoolServer: Caught TException: ");
+      errStr += tx.what();
+      GlobalOutput(errStr.c_str());
+      continue;
+    } catch (string s) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      string errStr("TThreadPoolServer: Unknown exception: ");
+      errStr += s;
+      GlobalOutput(errStr.c_str());
+      break;
+    }
+  }
+
+  // If stopped manually, join the existing threads
+  if (stop_) {
+    try {
+      serverTransport_->close();
+      threadManager_->join();
+    } catch (TException &tx) {
+      string errStr("TThreadPoolServer: Exception shutting down: ");
+      errStr += tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    stop_ = false;
+  }
+
+}
+
+/**
+ * @return the timeout value that serve() passes to the thread manager when
+ *         queuing a task
+ */
+int64_t TThreadPoolServer::getTimeout() const {
+  const int64_t current = timeout_;
+  return current;
+}
+
+/**
+ * Replaces the timeout value that serve() passes to the thread manager when
+ * queuing a task.
+ *
+ * @param value new timeout
+ */
+void TThreadPoolServer::setTimeout(int64_t value) {
+  this->timeout_ = value;
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
new file mode 100644
index 0000000..7b7e906
--- /dev/null
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
+#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1
+
+#include <concurrency/ThreadManager.h>
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransportFactory;
+
+/**
+ * Server that accepts connections in serve() and hands each one to a
+ * ThreadManager as a Task.
+ */
+class TThreadPoolServer : public TServer {
+ public:
+  class Task;
+
+  /** Shares one transport factory and one protocol factory for both directions. */
+  TThreadPoolServer(boost::shared_ptr<TProcessor> processor,
+                    boost::shared_ptr<TServerTransport> serverTransport,
+                    boost::shared_ptr<TTransportFactory> transportFactory,
+                    boost::shared_ptr<TProtocolFactory> protocolFactory,
+                    boost::shared_ptr<ThreadManager> threadManager);
+
+  /** Takes separate factories for the input and the output side. */
+  TThreadPoolServer(boost::shared_ptr<TProcessor> processor,
+                    boost::shared_ptr<TServerTransport> serverTransport,
+                    boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                    boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                    boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                    boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+                    boost::shared_ptr<ThreadManager> threadManager);
+
+  virtual ~TThreadPoolServer();
+
+  /** Runs the accept loop. */
+  virtual void serve();
+
+  /** @return the current timeout value */
+  virtual int64_t getTimeout() const;
+
+  /** Stores a new timeout value. */
+  virtual void setTimeout(int64_t value);
+
+  /**
+   * Raises the stop flag and then interrupts the server transport.
+   */
+  virtual void stop() {
+    this->stop_ = true;
+    this->serverTransport_->interrupt();
+  }
+
+ protected:
+
+  // Thread manager supplied at construction time.
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  // Stop request flag, set by stop().
+  volatile bool stop_;
+
+  // Timeout value exposed through getTimeout()/setTimeout().
+  volatile int64_t timeout_;
+
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
new file mode 100644
index 0000000..cc30f8f
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TThreadedServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/PosixThreadFactory.h"
+
+#include <string>
+#include <iostream>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+
+/**
+ * Runnable that serves one client connection of a TThreadedServer.
+ *
+ * run() fires clientBegin(), processes requests until process() returns
+ * false, peek() reports no more data or an exception is thrown, fires
+ * clientEnd(), closes both transports and finally removes itself from the
+ * server's task set, notifying the monitor when the set becomes empty.
+ */
+class TThreadedServer::Task: public Runnable {
+
+public:
+
+  Task(TThreadedServer& server,
+       shared_ptr<TProcessor> processor,
+       shared_ptr<TProtocol> input,
+       shared_ptr<TProtocol> output) :
+    server_(server),
+    processor_(processor),
+    input_(input),
+    output_(output) {
+  }
+
+  ~Task() {}
+
+  void run() {
+    boost::shared_ptr<TServerEventHandler> handler = server_.getEventHandler();
+    const bool haveHandler = (handler != NULL);
+
+    if (haveHandler) {
+      handler->clientBegin(input_, output_);
+    }
+
+    // Request loop: keep going while the processor succeeds and the peer
+    // still has data for us.
+    try {
+      for (;;) {
+        if (!processor_->process(input_, output_)) {
+          break;
+        }
+        if (!input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (TTransportException& ttx) {
+      string errStr("TThreadedServer client died: ");
+      errStr += ttx.what();
+      GlobalOutput(errStr.c_str());
+    } catch (TException& x) {
+      string errStr("TThreadedServer exception: ");
+      errStr += x.what();
+      GlobalOutput(errStr.c_str());
+    } catch (...) {
+      GlobalOutput("TThreadedServer uncaught exception.");
+    }
+
+    if (haveHandler) {
+      handler->clientEnd(input_, output_);
+    }
+
+    // Close each direction on its own so one failure does not skip the other.
+    try {
+      input_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr("TThreadedServer input close failed: ");
+      errStr += ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    try {
+      output_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr("TThreadedServer output close failed: ");
+      errStr += ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+
+    // Remove this task from parent bookkeeping
+    {
+      Synchronized guard(server_.tasksMonitor_);
+      server_.tasks_.erase(this);
+      if (server_.tasks_.empty()) {
+        server_.tasksMonitor_.notify();
+      }
+    }
+
+  }
+
+ private:
+  TThreadedServer& server_;
+  friend class TThreadedServer;
+
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
+};
+
+
+/**
+ * Builds a threaded server that creates its worker threads through a
+ * freshly allocated PosixThreadFactory. The stop flag starts cleared.
+ */
+TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
+                                 shared_ptr<TServerTransport> serverTransport,
+                                 shared_ptr<TTransportFactory> transportFactory,
+                                 shared_ptr<TProtocolFactory> protocolFactory):
+  TServer(processor, serverTransport, transportFactory, protocolFactory),
+  threadFactory_(new PosixThreadFactory()),
+  stop_(false) {
+}
+
+/**
+ * Builds a threaded server that creates its worker threads through the
+ * supplied thread factory. The stop flag starts cleared.
+ */
+TThreadedServer::TThreadedServer(boost::shared_ptr<TProcessor> processor,
+                                 boost::shared_ptr<TServerTransport> serverTransport,
+                                 boost::shared_ptr<TTransportFactory> transportFactory,
+                                 boost::shared_ptr<TProtocolFactory> protocolFactory,
+                                 boost::shared_ptr<ThreadFactory> threadFactory):
+  TServer(processor,
+          serverTransport,
+          transportFactory,
+          protocolFactory),
+  threadFactory_(threadFactory),
+  stop_(false) {
+}
+
+TThreadedServer::~TThreadedServer() {
+  // Intentionally empty: no explicit cleanup is performed here.
+}
+
+/**
+ * Accept loop of the thread-per-connection server.
+ *
+ * Listens on the server transport (returning early on failure), fires
+ * preServe(), then repeatedly accepts a client, wraps it in transports and
+ * protocols from the configured factories and starts a new thread running a
+ * Task for it. Every task is recorded in tasks_ and removes itself when it
+ * finishes. A caught std::string aborts the loop. When the loop ended
+ * because stop_ was set, the server transport is closed, the method waits
+ * until tasks_ is empty and stop_ is cleared.
+ */
+void TThreadedServer::serve() {
+
+  shared_ptr<TTransport> client;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
+
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
+    GlobalOutput(errStr.c_str());
+    return;
+  }
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  while (!stop_) {
+    try {
+      client.reset();
+      inputTransport.reset();
+      outputTransport.reset();
+      inputProtocol.reset();
+      outputProtocol.reset();
+
+      // Fetch client from server
+      client = serverTransport_->accept();
+
+      // Make IO transports
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+      TThreadedServer::Task* task = new TThreadedServer::Task(*this,
+                                                              processor_,
+                                                              inputProtocol,
+                                                              outputProtocol);
+
+      // Create a task
+      shared_ptr<Runnable> runnable =
+        shared_ptr<Runnable>(task);
+
+      // Create a thread for this task
+      shared_ptr<Thread> thread =
+        shared_ptr<Thread>(threadFactory_->newThread(runnable));
+
+      // Insert thread into the set of threads. This happens before start()
+      // so the task can always find and erase itself when it finishes.
+      {
+        Synchronized s(tasksMonitor_);
+        tasks_.insert(task);
+      }
+
+      // Start the thread!
+      try {
+        thread->start();
+      } catch (...) {
+        // The task will never run, so it can never erase itself. Leaving it
+        // registered would keep a pointer to an object that is about to be
+        // destroyed and make the shutdown wait below block forever.
+        Synchronized s(tasksMonitor_);
+        tasks_.erase(task);
+        throw;
+      }
+
+    } catch (TTransportException& ttx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+        string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
+      continue;
+    } catch (TException& tx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+      continue;
+    } catch (const string& s) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      string errStr = "TThreadedServer: Unknown exception: " + s;
+      GlobalOutput(errStr.c_str());
+      break;
+    }
+  }
+
+  // If stopped manually, make sure to close server transport
+  if (stop_) {
+    try {
+      serverTransport_->close();
+    } catch (TException &tx) {
+      string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    // Wait for the running tasks; the last one to finish notifies the monitor.
+    try {
+      Synchronized s(tasksMonitor_);
+      while (!tasks_.empty()) {
+        tasksMonitor_.wait();
+      }
+    } catch (TException &tx) {
+      string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    stop_ = false;
+  }
+
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TThreadedServer.h b/lib/cpp/src/server/TThreadedServer.h
new file mode 100644
index 0000000..4d0811a
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
+#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::ThreadFactory;
+/**
+ * TServer implementation whose serve() obtains threads from a ThreadFactory
+ * (it calls threadFactory_->newThread() and starts the result) and records
+ * each Task in tasks_ under tasksMonitor_.
+ *
+ * Shutdown as visible in serve(): once stop_ is set, serve() closes the
+ * server transport, waits on tasksMonitor_ until tasks_ is empty, and then
+ * resets stop_ to false.
+ *
+ * NOTE(review): std::set is used below but this header does not include
+ * <set> directly; presumably it arrives through one of the other includes.
+ * Confirm, or include it explicitly.
+ * NOTE(review): TProtocolFactory has no using-declaration in this header;
+ * presumably server/TServer.h makes it visible in this namespace. Confirm.
+ * NOTE(review): stop_ is volatile, which is not a synchronization
+ * primitive. It is written by stop() and read by serve(), presumably from
+ * different threads. Confirm this is acceptable for the supported targets.
+ */
+class TThreadedServer : public TServer {
+
+ public:
+ class Task; // Forward declaration only; the definition is not in this header.
+ /**
+ * Constructs a server without an explicit thread factory.
+ *
+ * Which ThreadFactory ends up in threadFactory_ is decided by the
+ * out-of-line definition, which is not visible here.
+ *
+ * @param processor request processor
+ * @param serverTransport transport the server listens on
+ * @param transportFactory factory for per-connection transports
+ * @param protocolFactory factory for per-connection protocols
+ * (parameter roles are presumed from their types; they are presumably
+ * forwarded to the TServer base. Confirm in the implementation file.)
+ */
+ TThreadedServer(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TServerTransport> serverTransport,
+ boost::shared_ptr<TTransportFactory> transportFactory,
+ boost::shared_ptr<TProtocolFactory> protocolFactory);
+ /**
+ * Same as the four-argument constructor, but additionally takes the
+ * ThreadFactory to use.
+ *
+ * @param threadFactory factory for the threads serve() starts (presumably
+ * stored in threadFactory_; confirm in the implementation file)
+ */
+ TThreadedServer(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TServerTransport> serverTransport,
+ boost::shared_ptr<TTransportFactory> transportFactory,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<ThreadFactory> threadFactory);
+ /** Destructor; defined out of line. */
+ virtual ~TThreadedServer();
+ /**
+ * Runs the server loop. For each unit of work it creates a thread via
+ * threadFactory_, records the Task in tasks_ and starts the thread. After
+ * the loop ends with stop_ set, it closes the server transport and blocks
+ * until tasks_ is empty.
+ */
+ virtual void serve();
+ /**
+ * Requests shutdown of a running serve() call by setting the stop flag and
+ * interrupting the server transport. Does not itself wait for serve() or
+ * for outstanding tasks to finish.
+ */
+ void stop() {
+ stop_ = true; // Set first: serve() checks stop_ when handling an INTERRUPTED TTransportException.
+ serverTransport_->interrupt();
+ }
+
+ protected:
+ boost::shared_ptr<ThreadFactory> threadFactory_; // serve() calls newThread() on this.
+ volatile bool stop_; // Set by stop(); reset to false by serve() after shutdown completes.
+ /**
+ * tasksMonitor_ is locked by serve() around access to tasks_ and is
+ * waited on while serve() drains tasks_ at shutdown. tasks_ holds raw
+ * Task pointers inserted by serve(); removal is not visible here.
+ * Presumably each Task erases itself and notifies the monitor when it
+ * finishes. Confirm in the Task definition.
+ */
+ Monitor tasksMonitor_;
+ std::set<Task*> tasks_;
+
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_