Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
new file mode 100644
index 0000000..45f635c
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TNonblockingServer.h"
+#include <concurrency/Exception.h>
+
+#include <iostream>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+
+/**
+ * Thread-pool job: runs the processor for one connection, then signals libevent.
+ */
+class TConnection::Task: public Runnable {
+ public:
+  Task(boost::shared_ptr<TProcessor> processor,
+       boost::shared_ptr<TProtocol> input,
+       boost::shared_ptr<TProtocol> output,
+       int taskHandle) :
+    processor_(processor), input_(input), output_(output),
+    taskHandle_(taskHandle) {}
+
+  void run() {
+    try {
+      // Keep going while a call succeeds and more input is already buffered
+      while (processor_->process(input_, output_) &&
+             input_->getTransport()->peek()) {
+      }
+    } catch (TTransportException& ttx) {
+      cerr << "TNonblockingServer client died: " << ttx.what() << endl;
+    } catch (TException& x) {
+      cerr << "TNonblockingServer exception: " << x.what() << endl;
+    } catch (...) {
+      cerr << "TNonblockingServer uncaught exception." << endl;
+    }
+
+    // Wake the libevent thread by writing one byte to our socketpair end
+    const int8_t doneByte = 0;
+    if (send(taskHandle_, &doneByte, sizeof(doneByte), 0) == -1) {
+      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
+    }
+    if (::close(taskHandle_) == -1) {
+      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
+    }
+  }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocol> input_;
+  boost::shared_ptr<TProtocol> output_;
+  int taskHandle_;
+};
+
+/**
+ * (Re)initializes this connection object for the given client socket.
+ */
+void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+  server_ = s;
+  socket_ = socket;
+  taskHandle_ = -1;
+  appState_ = APP_INIT;
+  socketState_ = SOCKET_RECV;
+
+  // Nothing has been read yet
+  readWant_ = 0;
+  readBufferPos_ = 0;
+
+  // No response is pending
+  writeBuffer_ = NULL;
+  writeBufferPos_ = 0;
+  writeBufferSize_ = 0;
+
+  // Start from "no flags" so that setFlags() registers the requested event
+  eventFlags_ = 0;
+  setFlags(eventFlags);
+
+  // Wrap the memory buffers using the server's transport and protocol factories
+  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
+  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+}
+
+/**
+ * Performs one non-blocking recv() or send() on the client socket, depending
+ * on socketState_, and calls transition() once the wanted bytes have moved.
+ */
+void TConnection::workSocket() {
+  int flags=0, got=0, left=0, sent=0;
+  uint32_t fetch = 0;
+
+  switch (socketState_) {
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      uint32_t newSize = readBufferSize_ ? readBufferSize_ : 1024;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
+      // Commit only on success: a failed realloc leaves the old block allocated
+      void* grown = std::realloc(readBuffer_, newSize);
+      if (grown == NULL) {
+        GlobalOutput("TConnection::workSocket() realloc");
+        close();
+        return;
+      }
+      readBuffer_ = (uint8_t*)grown;
+      readBufferSize_ = newSize;
+    }
+
+    // Read from the socket
+    fetch = readWant_ - readBufferPos_;
+    got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
+
+    if (got > 0) {
+      // Move along in the buffer and check that we did not overdo it
+      readBufferPos_ += got;
+      assert(readBufferPos_ <= readWant_);
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    } else if (got == -1) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != ECONNRESET) {
+        GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
+      }
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+    return;
+
+  case SOCKET_SEND:
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      GlobalOutput("WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    #ifdef MSG_NOSIGNAL
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    flags |= MSG_NOSIGNAL;
+    #endif // ifdef MSG_NOSIGNAL
+
+    left = writeBufferSize_ - writeBufferPos_;
+    sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
+
+    if (sent <= 0) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != EPIPE) {
+        GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
+      }
+      close();
+      return;
+    }
+
+    // Advance; we must never have sent more than was left
+    writeBufferPos_ += sent;
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+    return;
+
+  default:
+    GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to. After close() has
+ * been called the object belongs to the server again and must not be touched.
+ */
+void TConnection::transition() {
+  int sz = 0;
+
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    // If we've used these transport buffers enough times, reset them to avoid bloating
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    ++numReadsSinceReset_;
+    if (numWritesSinceReset_ < 512) {
+      outputTransport_->resetBuffer();
+    } else {
+      // reset the capacity of the output transport if we used it enough times that it might be bloated
+      try {
+        outputTransport_->resetBuffer(true);
+        numWritesSinceReset_ = 0;
+      } catch (TTransportException &ttx) {
+        GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
+        close();
+        return;
+      }
+    }
+
+    // Prepend four bytes of blank space to the buffer so we can
+    // write the frame size there later.
+    outputTransport_->getWritePtr(4);
+    outputTransport_->wroteBytes(4);
+
+    if (server_->isThreadPoolProcessing()) {
+      // We are setting up a Task to do this work and we will wait on it
+      int sv[2];
+      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
+        // Now we will fall through to the APP_WAIT_TASK block with no response
+      } else {
+        // Create task and dispatch to the thread manager
+        boost::shared_ptr<Runnable> task =
+          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+                                               inputProtocol_,
+                                               outputProtocol_,
+                                               sv[1]));
+        // The application is now waiting on the task to finish
+        appState_ = APP_WAIT_TASK;
+
+        // Create an event to be notified when the task finishes
+        event_set(&taskEvent_,
+                  taskHandle_ = sv[0],
+                  EV_READ,
+                  TConnection::taskHandler,
+                  this);
+
+        // Attach to the base
+        event_base_set(server_->getEventBase(), &taskEvent_);
+
+        // Add the event so that we hear about the task's completion
+        if (-1 == event_add(&taskEvent_, 0)) {
+          GlobalOutput("TConnection::transition(): could not event_add");
+          // The task is never queued, so nobody else will release the socketpair
+          ::close(sv[0]);
+          ::close(sv[1]);
+          close();
+          return;
+        }
+        try {
+          server_->addTask(task);
+        } catch (IllegalStateException & ise) {
+          // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+          GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+          // The task will not run: drop its event and socketpair, and stop here
+          // because close() hands this object back to the server
+          event_del(&taskEvent_);
+          ::close(sv[0]);
+          ::close(sv[1]);
+          close();
+          return;
+        }
+
+        // Set this connection idle so that libevent doesn't process more
+        // data on it while we're still waiting for the threadmanager to
+        // finish this task
+        setIdle();
+        return;
+      }
+    } else {
+      try {
+        // Invoke the processor
+        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+      } catch (TTransportException &ttx) {
+        GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+        close();
+        return;
+      } catch (TException &x) {
+        GlobalOutput.printf("TException: Server::process() %s", x.what());
+        close();
+        return;
+      } catch (...) {
+        GlobalOutput.printf("Server::process() unknown exception");
+        close();
+        return;
+      }
+    }
+
+    // Intentionally fall through here, the call to process has written into
+    // the writeBuffer_
+
+  case APP_WAIT_TASK:
+    // We have now finished processing a task and the result has been written
+    // into the outputTransport_, so we grab its contents and place them into
+    // the writeBuffer_ for actual writing by the libevent thread
+
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    // 4 bytes were reserved for frame size
+    if (writeBufferSize_ > 4) {
+
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+
+      // Put the frame size into the write buffer
+      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+      memcpy(writeBuffer_, &frameSize, 4);
+
+      // Socket into write mode
+      appState_ = APP_SEND_RESULT;
+      setWrite();
+
+      return;
+    }
+
+    // In this case, the request was oneway and we should fall through
+    // right back into the read frame header state
+    goto LABEL_APP_INIT;
+
+  case APP_SEND_RESULT:
+
+    ++numWritesSinceReset_;
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  LABEL_APP_INIT:
+  case APP_INIT:
+
+    // reset the input buffer if we used it enough times that it might be bloated
+    if (numReadsSinceReset_ > 512)
+    {
+      void * new_buffer = std::realloc(readBuffer_, 1024);
+      if (new_buffer == NULL) {
+        GlobalOutput("TConnection::transition() realloc");
+        close();
+        return;
+      }
+      readBuffer_ = (uint8_t*) new_buffer;
+      readBufferSize_ = 1024;
+      numReadsSinceReset_ = 0;
+    }
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Set up read buffer for getting 4 bytes
+    readBufferPos_ = 0;
+    readWant_ = 4;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    // Register read event
+    setRead();
+
+    return;
+
+  case APP_READ_FRAME_SIZE:
+    // We just read the request length, deserialize it (memcpy avoids a
+    // type-punned load from the byte buffer)
+    memcpy(&sz, readBuffer_, sizeof(sz));
+    sz = (int32_t)ntohl(sz);
+
+    if (sz <= 0) {
+      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
+      close();
+      return;
+    }
+
+    // Reset the read buffer
+    readWant_ = (uint32_t)sz;
+    readBufferPos_= 0;
+
+    // Move into read request state
+    appState_ = APP_READ_REQUEST;
+
+    return;
+
+  default:
+    GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
+    assert(0);
+  }
+}
+
+/**
+ * Switches the libevent registration of the client socket to eventFlags
+ * (0 unregisters it). Does nothing when the flags are already in effect.
+ */
+void TConnection::setFlags(short eventFlags) {
+  // Nothing to change
+  if (eventFlags == eventFlags_) {
+    return;
+  }
+
+  // Delete a previously existing event
+  if (eventFlags_ != 0 && event_del(&event_) == -1) {
+    GlobalOutput("TConnection::setFlags event_del");
+    return;
+  }
+
+  // Remember the new flags; with no flags there is nothing to register
+  eventFlags_ = eventFlags;
+  if (eventFlags_ == 0) {
+    return;
+  }
+
+  /**
+   * event_set:
+   *
+   * Prepares the event structure &event to be used in future calls to
+   * event_add() and event_del().  The event will be prepared to call the
+   * eventHandler using the 'sock' file descriptor to monitor events.
+   *
+   * The events can be either EV_READ, EV_WRITE, or both, indicating
+   * that an application can read or write from the file respectively without
+   * blocking.
+   *
+   * The eventHandler will be called with the file descriptor that triggered
+   * the event and the type of event which will be one of: EV_TIMEOUT,
+   * EV_SIGNAL, EV_READ, EV_WRITE.
+   *
+   * The additional flag EV_PERSIST makes an event_add() persistent until
+   * event_del() has been called.
+   *
+   * Once initialized, the &event struct can be used repeatedly with
+   * event_add() and event_del() and does not need to be reinitialized unless
+   * the eventHandler and/or the argument to it are to be changed.  However,
+   * when an ev structure has been added to libevent using event_add() the
+   * structure must persist until the event occurs (assuming EV_PERSIST
+   * is not set) or is removed using event_del().  You may not reuse the same
+   * ev structure for multiple monitored descriptors; each descriptor needs
+   * its own ev.
+   */
+  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
+  event_base_set(server_->getEventBase(), &event_);
+
+  // Register the freshly prepared event with libevent
+  if (-1 == event_add(&event_, 0)) {
+    GlobalOutput("TConnection::setFlags(): could not event_add");
+  }
+}
+
+/**
+ * Closes the connection and hands this object back to the owning server
+ */
+void TConnection::close() {
+  // Delete the registered libevent
+  if (event_del(&event_) == -1) {
+    GlobalOutput("TConnection::close() event_del");
+  }
+
+  // Close the socket; 0 is a valid descriptor, so -1 marks "no socket"
+  if (socket_ >= 0) {
+    ::close(socket_);
+  }
+  socket_ = -1;
+
+  // close any factory produced transports
+  factoryInputTransport_->close();
+  factoryOutputTransport_->close();
+
+  // Give this object back to the server that owns it (it may delete us)
+  server_->returnConnection(this);
+}
+
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+  // limit 0 is ignored (workSocket() regrows the buffer by doubling). If the
+  // shrink fails the old buffer stays in use; close() must not be called here
+  // because it reaches this method again through returnConnection().
+  void* shrunk = NULL;
+  if (limit != 0 && readBufferSize_ > limit &&
+      (shrunk = std::realloc(readBuffer_, limit)) != NULL) {
+    readBuffer_ = (uint8_t*)shrunk;
+    readBufferSize_ = limit;
+  }
+}
+
+/**
+ * Hands out a TConnection for the given socket: an idle object from the
+ * connection stack is re-initialized if there is one, else one is allocated.
+ */
+TConnection* TNonblockingServer::createConnection(int socket, short flags) {
+  // Nothing pooled: build a brand new connection
+  if (connectionStack_.empty()) {
+    return new TConnection(socket, flags, this);
+  }
+  // Recycle the connection on top of the stack
+  TConnection* recycled = connectionStack_.top();
+  connectionStack_.pop();
+  recycled->init(socket, flags, this);
+  return recycled;
+}
+
+/**
+ * Takes back a connection: deleted if the stack is at its limit, else pooled
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+  const bool poolFull = connectionStackLimit_ && connectionStack_.size() >= connectionStackLimit_;
+  if (poolFull) {
+    delete connection;
+    return;
+  }
+  connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+  connectionStack_.push(connection);
+}
+
+/**
+ * Server socket had something happen.  We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ * The event type in `which` is not inspected.
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+  // Make sure that libevent handed us the listening socket
+  assert(fd == serverSocket_);
+
+  // The peer address is never used, so it is not requested from accept().
+  // (A value-result length that is not reset per call, and a plain sockaddr
+  // that cannot hold an IPv6 address, were both hazards of asking for it.)
+
+  // Going to accept a new client socket
+  int clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one, this helps us to avoid having to go back into the libevent engine so
+  // many times
+  while ((clientSocket = accept(fd, NULL, NULL)) != -1) {
+
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
+      close(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, EV_READ | EV_PERSIST);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+      close(clientSocket);
+      return;
+    }
+
+    // Put this client connection into the proper state
+    clientConnection->transition();
+  }
+
+  // Done looping accept, now we have to make sure the error is due to
+  // blocking. Any other error is a problem
+  if (errno != EAGAIN && errno != EWOULDBLOCK) {
+    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
+  }
+}
+
+/**
+ * Creates a socket to listen on and binds it to the local port.
+ * Throws TException if the address lookup, socket() or bind() fails.
+ */
+void TNonblockingServer::listenSocket() {
+  struct addrinfo hints, *res, *res0;
+
+  char port[sizeof("65536") + 1];
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  // Bounded write: port_ is a plain int and may not fit the small buffer
+  snprintf(port, sizeof(port), "%d", port_);
+
+  // Wildcard address
+  const int error = getaddrinfo(NULL, port, &hints, &res0);
+  if (error) {
+    // Without an address there is no socket to serve on: fail like socket()/bind() do
+    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
+    GlobalOutput(errStr.c_str());
+    throw TException(errStr);
+  }
+
+  // Pick the ipv6 address first since ipv4 addresses can be mapped
+  // into ipv6 space.
+  for (res = res0; res; res = res->ai_next) {
+    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+      break;
+  }
+
+  // Create the server socket
+  const int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (s == -1) {
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() socket() -1");
+  }
+
+  #ifdef IPV6_V6ONLY
+  // Only applies to an IPv6 socket: let it accept IPv4-mapped clients too
+  int zero = 0;
+  if (res->ai_family == AF_INET6 &&
+      -1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
+    GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+  }
+  #endif // #ifdef IPV6_V6ONLY
+
+  int one = 1;
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+
+  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+    close(s);
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() bind");
+  }
+
+  // Done with the addr info
+  freeaddrinfo(res0);
+  // Set up this file descriptor for listening
+  listenSocket(s);
+}
+
+/**
+ * Prepares the given socket for serving (non-blocking mode, socket options,
+ * listen()) and stores it as serverSocket_; throws TException on failure.
+ */
+void TNonblockingServer::listenSocket(int s) {
+  // Set socket to nonblocking mode
+  const int flags = fcntl(s, F_GETFL, 0);
+  if (flags < 0 || fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
+    close(s);
+    throw TException("TNonblockingServer::serve() O_NONBLOCK");
+  }
+
+  // The results of the setsockopt() calls below are not checked
+  int enable = 1;
+  struct linger noLinger = {0, 0};
+
+  // Keepalive to ensure full result flushing
+  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
+
+  // Turn linger off to avoid hung sockets
+  setsockopt(s, SOL_SOCKET, SO_LINGER, &noLinger, sizeof(noLinger));
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+  #ifndef TCP_NOPUSH
+  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
+  #endif
+
+  if (listen(s, LISTEN_BACKLOG) == -1) {
+    close(s);
+    throw TException("TNonblockingServer::serve() listen");
+  }
+
+  // Cool, this socket is good to go, set it as the serverSocket_
+  serverSocket_ = s;
+}
+
+/**
+ * Binds this server to the given libevent base and registers the read event
+ * for the listening socket. Throws TException if the event cannot be added.
+ */
+void TNonblockingServer::registerEvents(event_base* base) {
+  // Needs a listening socket, and no event base may have been set before
+  assert(serverSocket_ != -1);
+  assert(!eventBase_);
+
+  eventBase_ = base;
+
+  // Print some libevent stats
+  GlobalOutput.printf("libevent %s method %s",
+                      event_get_version(), event_get_method());
+
+  // Persistent read event on the server socket, dispatched to eventHandler
+  const short serverFlags = EV_READ | EV_PERSIST;
+  event_set(&serverEvent_, serverSocket_, serverFlags,
+            TNonblockingServer::eventHandler, this);
+  event_base_set(eventBase_, &serverEvent_);
+
+  // Add the event and start up the server
+  if (event_add(&serverEvent_, 0) == -1) {
+    throw TException("TNonblockingServer::serve(): coult not event_add");
+  }
+}
+
+/**
+ * Entry point of the server: opens the listening socket, sets up libevent,
+ * runs the preServe hook (if an event handler is set) and enters the loop.
+ */
+void TNonblockingServer::serve() {
+  listenSocket();
+
+  // Create the libevent base and hook the server socket into it
+  event_base* base = static_cast<event_base*>(event_init());
+  registerEvents(base);
+
+  // Give the event handler its chance to run before serving starts
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  // Hand control to libevent, which invokes the registered handlers
+  event_base_loop(eventBase_, 0);
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
new file mode 100644
index 0000000..1684b64
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include <Thrift.h>
+#include <server/TServer.h>
+#include <transport/TBufferTransports.h>
+#include <concurrency/ThreadManager.h>
+#include <stack>
+#include <string>
+#include <errno.h>
+#include <cstdlib>
+#include <event.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::concurrency::Runnable;
+using apache::thrift::concurrency::ThreadManager;
+
+// Forward declaration of class
+class TConnection;
+
+/**
+ * This is a non-blocking server in C++ for high performance that operates a
+ * single IO thread. It assumes that all incoming requests are framed with a
+ * 4 byte length indicator and writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with libevent.
+ *
+ */
+class TNonblockingServer : public TServer {
+ private:
+
+  // Listen backlog
+  static const int LISTEN_BACKLOG = 1024;
+
+  // Default limit on size of idle connection pool
+  static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+  // Maximum size of buffer allocated to idle connection
+  static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
+
+  // Server socket file descriptor, -1 until listenSocket() has succeeded
+  int serverSocket_;
+
+  // Port server runs on
+  int port_;
+
+  // For processing via thread pool, may be NULL
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  // Is thread pool processing?
+  bool threadPoolProcessing_;
+
+  // The event base for libevent
+  event_base* eventBase_;
+
+  // Event struct, for use with eventBase_
+  struct event serverEvent_;
+
+  // Number of TConnection object we've created
+  size_t numTConnections_;
+
+  // Limit for how many TConnection objects to cache
+  size_t connectionStackLimit_;
+
+  /**
+   * Max read buffer size for an idle connection.  When we place an idle
+   * TConnection into connectionStack_, we ensure that its read buffer is
+   * reduced to this size to ensure that idle connections don't hog memory.
+   */
+  uint32_t idleBufferMemLimit_;
+
+  /**
+   * This is a stack of all the objects that have been created but that
+   * are NOT currently in use. When we close a connection, we place it on this
+   * stack so that the object can be reused later, rather than freeing the
+   * memory and reallocating a new object later.
+   */
+  std::stack<TConnection*> connectionStack_;
+
+  void handleEvent(int fd, short which);
+
+ public:
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                     int port) :
+    TServer(processor),
+    serverSocket_(-1),
+    port_(port),
+    threadPoolProcessing_(false),
+    eventBase_(NULL),
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
+
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                     boost::shared_ptr<TProtocolFactory> protocolFactory,
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
+    TServer(processor),
+    serverSocket_(-1),
+    port_(port),
+    threadManager_(threadManager),
+    eventBase_(NULL),
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+    setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(protocolFactory);
+    setOutputProtocolFactory(protocolFactory);
+    setThreadManager(threadManager);
+  }
+
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                     boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                     boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                     boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                     boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
+    TServer(processor),
+    serverSocket_(-1),  // "no socket yet", as in the other constructors (0 is a valid descriptor)
+    port_(port),
+    threadManager_(threadManager),
+    eventBase_(NULL),
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+    setInputTransportFactory(inputTransportFactory);
+    setOutputTransportFactory(outputTransportFactory);
+    setInputProtocolFactory(inputProtocolFactory);
+    setOutputProtocolFactory(outputProtocolFactory);
+    setThreadManager(threadManager);
+  }
+
+  ~TNonblockingServer() {}
+
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+    threadManager_ = threadManager;
+    threadPoolProcessing_ = (threadManager != NULL);
+  }
+
+  boost::shared_ptr<ThreadManager> getThreadManager() {
+    return threadManager_;
+  }
+
+  /**
+   * Get the maximum number of unused TConnection we will hold in reserve.
+   *
+   * @return the current limit on TConnection pool size.
+   */
+  size_t getConnectionStackLimit() const {
+    return connectionStackLimit_;
+  }
+
+  /**
+   * Set the maximum number of unused TConnection we will hold in reserve.
+   *
+   * @param sz the new limit for TConnection pool size.
+   */
+  void setConnectionStackLimit(size_t sz) {
+    connectionStackLimit_ = sz;
+  }
+
+  bool isThreadPoolProcessing() const {
+    return threadPoolProcessing_;
+  }
+
+  void addTask(boost::shared_ptr<Runnable> task) {
+    threadManager_->add(task);
+  }
+
+  event_base* getEventBase() const {
+    return eventBase_;
+  }
+
+  void incrementNumConnections() {
+    ++numTConnections_;
+  }
+
+  void decrementNumConnections() {
+    --numTConnections_;
+  }
+
+  size_t getNumConnections() {
+    return numTConnections_;
+  }
+
+  size_t getNumIdleConnections() {
+    return connectionStack_.size();
+  }
+
+  /**
+   * Get the maximum limit of memory allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will shrink buffers when idle.
+   */
+  size_t getIdleBufferMemLimit() const {
+    return idleBufferMemLimit_;
+  }
+
+  /**
+   * Set the maximum limit of memory allocated to idle TConnection objects.
+   * If a TConnection object goes idle with more than this much memory
+   * allocated to its buffer, we shrink it to this value.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when idle.
+   */
+  void setIdleBufferMemLimit(size_t limit) {
+    idleBufferMemLimit_ = limit;
+  }
+
+  TConnection* createConnection(int socket, short flags);
+
+  void returnConnection(TConnection* connection);
+
+  static void eventHandler(int fd, short which, void* v) {
+    ((TNonblockingServer*)v)->handleEvent(fd, which);
+  }
+
+  void listenSocket();
+
+  void listenSocket(int fd);
+
+  void registerEvents(event_base* base);
+
+  void serve();
+};
+
+/**
+ * Two states for sockets, recv and send mode; selects what workSocket() does
+ */
+enum TSocketState {
+  SOCKET_RECV,  // workSocket() reads from the client socket
+  SOCKET_SEND   // workSocket() writes the pending response
+};
+
+/**
+ * Application states of a connection in the nonblocking server:
+ *  1) initialize                 2) read 4 byte frame size
+ *  3) read frame of data         4) wait for a thread pool task (if used)
+ *  5) send back data (if any)
+ * TConnection::transition() moves a connection from one state to the next.
+ */
+enum TAppState {
+  APP_INIT,
+  APP_READ_FRAME_SIZE,
+  APP_READ_REQUEST,
+  APP_WAIT_TASK,
+  APP_SEND_RESULT
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TConnection {
+ private:
+  class Task;
+
+  // Server handle
+  TNonblockingServer* server_;
+
+  // Socket handle
+  int socket_;
+
+  // Libevent object
+  struct event event_;
+
+  // Libevent flags
+  short eventFlags_;
+
+  // Socket mode
+  TSocketState socketState_;
+
+  // Application state
+  TAppState appState_;
+
+  // How much data needed to read
+  uint32_t readWant_;
+
+  // Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  // Read buffer
+  uint8_t* readBuffer_;
+
+  // Read buffer size
+  uint32_t readBufferSize_;
+
+  // Write buffer
+  uint8_t* writeBuffer_;
+
+  // Write buffer size
+  uint32_t writeBufferSize_;
+
+  // How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  // How many times have we read since our last buffer reset?
+  uint32_t numReadsSinceReset_;
+
+  // How many times have we written since our last buffer reset?
+  uint32_t numWritesSinceReset_;
+
+  // Task handle
+  int taskHandle_;
+
+  // Task event
+  struct event taskEvent_;
+
+  // Transport to read from
+  boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+  // Transport that processor writes to
+  boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+  // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  boost::shared_ptr<TTransport> factoryInputTransport_;
+  boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+  // Protocol decoder
+  boost::shared_ptr<TProtocol> inputProtocol_;
+
+  // Protocol encoder
+  boost::shared_ptr<TProtocol> outputProtocol_;
+
+  // Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  // Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  // Set socket idle
+  void setIdle() {
+    setFlags(0);
+  }
+
+  // Set event flags
+  void setFlags(short eventFlags);
+
+  // Libevent handlers
+  void workSocket();
+
+  // Close this client and reset
+  void close();
+
+ public:
+  // Constructor
+  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+    readBuffer_ = (uint8_t*)std::malloc(1024);
+    if (readBuffer_ == NULL) {
+      // Thrown by value so that catch (TException&) handlers can see it
+      throw apache::thrift::TException("Out of memory.");
+    }
+    readBufferSize_ = 1024;
+
+    numReadsSinceReset_ = 0;
+    numWritesSinceReset_ = 0;
+
+    // Allocate input and output transports
+    // these only need to be allocated once per TConnection (they don't need to be
+    // reallocated on init() call)
+    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+
+    init(socket, eventFlags, s);
+    server_->incrementNumConnections();
+  }
+
+  ~TConnection() {
+    // The read buffer is malloc'ed/realloc'ed by this class and nothing else frees it
+    std::free(readBuffer_);
+    server_->decrementNumConnections();
+  }
+
+  /**
+   * Check read buffer against a given limit and shrink it if exceeded.
+   *
+   * @param limit we limit buffer size to.
+   */
+  void checkIdleBufferMemLimit(uint32_t limit);
+
+  // Initialize
+  void init(int socket, short eventFlags, TNonblockingServer *s);
+
+  // Transition into a new state
+  void transition();
+
+  // Handler wrapper
+  static void eventHandler(int fd, short /* which */, void* v) {
+    assert(fd == ((TConnection*)v)->socket_);
+    ((TConnection*)v)->workSocket();
+  }
+
+  // Handler wrapper for task block
+  static void taskHandler(int fd, short /* which */, void* v) {
+    assert(fd == ((TConnection*)v)->taskHandle_);
+    if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
+      GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
+    }
+    ((TConnection*)v)->transition();
+  }
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
diff --git a/lib/cpp/src/server/TServer.cpp b/lib/cpp/src/server/TServer.cpp
new file mode 100644
index 0000000..6b692ab
--- /dev/null
+++ b/lib/cpp/src/server/TServer.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <unistd.h>
+
+namespace apache { namespace thrift { namespace server {
+
+// Asks for RLIMIT_NOFILE = max_fds, halving the request after each refusal; returns the last value requested.
+int increase_max_fds(int max_fds=(1<<24))  {
+  struct rlimit limits;
+  limits.rlim_cur = limits.rlim_max = max_fds;
+  while (max_fds != 0 && setrlimit(RLIMIT_NOFILE, &limits) < 0) {
+    // Refused: retry with half of the previous request (gives up once it reaches 0)
+    max_fds /= 2;
+    limits.rlim_cur = limits.rlim_max = max_fds;
+  }
+  return  limits.rlim_cur;
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
new file mode 100644
index 0000000..5c4c588
--- /dev/null
+++ b/lib/cpp/src/server/TServer.h
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TSERVER_H_
+#define _THRIFT_SERVER_TSERVER_H_ 1
+
+#include <TProcessor.h>
+#include <transport/TServerTransport.h>
+#include <protocol/TBinaryProtocol.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportFactory;
+
+/**
+ * Virtual interface class that can handle events from the server core. To
+ * use this you should subclass it and override the methods that you care
+ * about; every hook defaults to a no-op. Your subclass can also store local
+ * data, such as additional "arguments" to these methods (kept in the object
+ * instance's state).
+ */
+class TServerEventHandler {
+ public:
+
+  virtual ~TServerEventHandler() {}
+
+  /**
+   * Called once by serve(), after listen() succeeds and before clients are accepted.
+   */
+  virtual void preServe() {}
+
+  /**
+   * Called when a new client has connected and is about to begin processing.
+   */
+  virtual void clientBegin(boost::shared_ptr<TProtocol> /* input */,
+                           boost::shared_ptr<TProtocol> /* output */) {}
+
+  /**
+   * Called when a client has finished making requests, before its transports are closed.
+   */
+  virtual void clientEnd(boost::shared_ptr<TProtocol> /* input */,
+                         boost::shared_ptr<TProtocol> /* output */) {}
+
+ protected:
+
+  /**
+   * Prevent direct instantiation; only subclasses can construct a handler.
+   */
+  TServerEventHandler() {}
+
+};
+
+/**
+ * Thrift server.
+ *
+ * Base class of the concrete servers: it holds the processor, the server
+ * transport, the input/output transport and protocol factories and an
+ * optional event handler. Subclasses implement serve(); run() forwards to
+ * serve() so that a server can be used as a Runnable.
+ *
+ * The constructors are protected, so servers are created through subclasses.
+ */
+class TServer : public concurrency::Runnable {
+ public:
+
+  virtual ~TServer() {}
+
+  // Runs the server; every concrete server supplies its own implementation
+  virtual void serve() = 0;
+
+  // Hook for asking the server to stop; the base implementation does nothing
+  virtual void stop() {}
+
+  // Allows running the server as a Runnable thread
+  virtual void run() { serve(); }
+
+  // Accessors; each one returns a copy of the stored shared_ptr
+  boost::shared_ptr<TProcessor> getProcessor() { return processor_; }
+
+  boost::shared_ptr<TServerTransport> getServerTransport() { return serverTransport_; }
+
+  boost::shared_ptr<TTransportFactory> getInputTransportFactory() { return inputTransportFactory_; }
+
+  boost::shared_ptr<TTransportFactory> getOutputTransportFactory() { return outputTransportFactory_; }
+
+  boost::shared_ptr<TProtocolFactory> getInputProtocolFactory() { return inputProtocolFactory_; }
+
+  boost::shared_ptr<TProtocolFactory> getOutputProtocolFactory() { return outputProtocolFactory_; }
+
+  // Null unless a handler was installed with setServerEventHandler()
+  boost::shared_ptr<TServerEventHandler> getEventHandler() { return eventHandler_; }
+
+protected:
+  // Processor only: default transport factories, binary protocol, no server transport
+  TServer(boost::shared_ptr<TProcessor> processor):
+    processor_(processor),
+    inputTransportFactory_(new TTransportFactory()),
+    outputTransportFactory_(new TTransportFactory()),
+    inputProtocolFactory_(new TBinaryProtocolFactory()),
+    outputProtocolFactory_(new TBinaryProtocolFactory()) {}
+
+  // Processor and server transport: default transport factories, binary protocol
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(new TTransportFactory()),
+    outputTransportFactory_(new TTransportFactory()),
+    inputProtocolFactory_(new TBinaryProtocolFactory()),
+    outputProtocolFactory_(new TBinaryProtocolFactory()) {}
+
+  // One transport factory and one protocol factory, shared by input and output
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport,
+          boost::shared_ptr<TTransportFactory> transportFactory,
+          boost::shared_ptr<TProtocolFactory> protocolFactory):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(transportFactory),
+    outputTransportFactory_(transportFactory),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory) {}
+
+  // Separate factories for the input side and the output side
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport,
+          boost::shared_ptr<TTransportFactory> inputTransportFactory,
+          boost::shared_ptr<TTransportFactory> outputTransportFactory,
+          boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+          boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(inputTransportFactory),
+    outputTransportFactory_(outputTransportFactory),
+    inputProtocolFactory_(inputProtocolFactory),
+    outputProtocolFactory_(outputProtocolFactory) {}
+
+
+  // Class variables
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TServerTransport> serverTransport_;
+
+  // Factories applied to each accepted client transport
+  boost::shared_ptr<TTransportFactory> inputTransportFactory_;
+  boost::shared_ptr<TTransportFactory> outputTransportFactory_;
+
+  // Factories that build the protocols on top of those transports
+  boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+  // Optional callbacks (preServe/clientBegin/clientEnd); null until set
+  boost::shared_ptr<TServerEventHandler> eventHandler_;
+
+public:
+  // Setters; each one replaces the stored shared_ptr
+  void setInputTransportFactory(boost::shared_ptr<TTransportFactory> inputTransportFactory) {
+    inputTransportFactory_ = inputTransportFactory;
+  }
+
+  void setOutputTransportFactory(boost::shared_ptr<TTransportFactory> outputTransportFactory) {
+    outputTransportFactory_ = outputTransportFactory;
+  }
+
+  void setInputProtocolFactory(boost::shared_ptr<TProtocolFactory> inputProtocolFactory) {
+    inputProtocolFactory_ = inputProtocolFactory;
+  }
+
+  void setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory> outputProtocolFactory) {
+    outputProtocolFactory_ = outputProtocolFactory;
+  }
+
+  void setServerEventHandler(boost::shared_ptr<TServerEventHandler> eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+};
+
+/**
+ * Helper function to increase the max file descriptors limit
+ * for the current process and all of its children.
+ * By default, tries to increase it to as much as 2^24, halving the request each
+ * time setrlimit() refuses it. Returns the limit last requested (0 if all failed).
+ */
+ int increase_max_fds(int max_fds=(1<<24));
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
new file mode 100644
index 0000000..394ce21
--- /dev/null
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TSimpleServer.h"
+#include "transport/TTransportException.h"
+#include <string>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using boost::shared_ptr;
+
+/**
+ * A simple single-threaded application server. Perfect for unit tests!
+ * Listens, then accepts and fully serves one client at a time until stop_ is set.
+ */
+void TSimpleServer::serve() {
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    cerr << "TSimpleServer::run() listen(): " << ttx.what() << endl;
+    return;
+  }
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  while (!stop_) {
+    // Fresh handles per client: after a failed accept() the handlers must not re-close the previous client
+    shared_ptr<TTransport> client;
+    shared_ptr<TTransport> inputTransport;
+    shared_ptr<TTransport> outputTransport;
+    shared_ptr<TProtocol> inputProtocol;
+    shared_ptr<TProtocol> outputProtocol;
+    try {
+      client = serverTransport_->accept();
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+      if (eventHandler_ != NULL) {
+        eventHandler_->clientBegin(inputProtocol, outputProtocol);
+      }
+      try {
+        while (processor_->process(inputProtocol, outputProtocol)) {
+          // Peek ahead, is the remote side closed?
+          if (!inputTransport->peek()) {
+            break;
+          }
+        }
+      } catch (TTransportException& ttx) {
+        cerr << "TSimpleServer client died: " << ttx.what() << endl;
+      } catch (TException& tx) {
+        cerr << "TSimpleServer exception: " << tx.what() << endl;
+      }
+      if (eventHandler_ != NULL) {
+        eventHandler_->clientEnd(inputProtocol, outputProtocol);
+      }
+      inputTransport->close();
+      outputTransport->close();
+      client->close();
+    } catch (TTransportException& ttx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {  // an interrupt while stopping is expected
+        cerr << "TServerTransport died on accept: " << ttx.what() << endl;
+      }
+      continue;
+    } catch (TException& tx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      cerr << "Some kind of accept exception: " << tx.what() << endl;
+      continue;
+    } catch (const string& s) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      cerr << "TSimpleServer: Unknown exception: " << s << endl;  // a thrown string ends the accept loop
+      break;
+    }
+  }
+
+  if (stop_) {
+    try {
+      serverTransport_->close();
+    } catch (TTransportException &ttx) {
+      cerr << "TServerTransport failed on close: " << ttx.what() << endl;
+    }
+    stop_ = false;
+  }
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
new file mode 100644
index 0000000..c4fc91c
--- /dev/null
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1
+
+#include "server/TServer.h"
+#include "transport/TServerTransport.h"
+
+namespace apache { namespace thrift { namespace server {
+
+/**
+ * This is the most basic simple server. It is single-threaded and runs a
+ * continuous loop of accepting a single connection, processing requests on
+ * that connection until it closes, and then repeating. It is a good example
+ * of how to extend the TServer interface.
+ * stop() sets the stop flag and interrupts the server transport.
+ */
+class TSimpleServer : public TServer {
+ public:
+  TSimpleServer(boost::shared_ptr<TProcessor> processor,
+                boost::shared_ptr<TServerTransport> serverTransport,
+                boost::shared_ptr<TTransportFactory> transportFactory,
+                boost::shared_ptr<TProtocolFactory> protocolFactory) :
+    TServer(processor, serverTransport, transportFactory, protocolFactory),
+    stop_(false) {}
+
+  TSimpleServer(boost::shared_ptr<TProcessor> processor,
+                boost::shared_ptr<TServerTransport> serverTransport,
+                boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
+    TServer(processor, serverTransport,
+            inputTransportFactory, outputTransportFactory,
+            inputProtocolFactory, outputProtocolFactory),
+    stop_(false) {}
+
+  ~TSimpleServer() {}
+
+  void serve();
+  // serve() only tests stop_ between clients, so also interrupt the transport in case it is waiting in accept()
+  void stop() {
+    stop_ = true;
+    serverTransport_->interrupt();
+  }
+
+ protected:
+  bool stop_;  // set by stop(); checked by serve() before each accept
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
new file mode 100644
index 0000000..0894cfa
--- /dev/null
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TThreadPoolServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/Thread.h"
+#include "concurrency/ThreadManager.h"
+#include <string>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace server {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::concurrency;
+using namespace apache::thrift::protocol;;
+using namespace apache::thrift::transport;
+
+// Runnable that serves one accepted client until it disconnects or processing fails.
+class TThreadPoolServer::Task : public Runnable {
+public:
+  Task(TThreadPoolServer &server,
+       shared_ptr<TProcessor> processor,
+       shared_ptr<TProtocol> input,
+       shared_ptr<TProtocol> output) :
+    server_(server),
+    processor_(processor),
+    input_(input),
+    output_(output) {
+  }
+
+  ~Task() {}
+
+  void run() {
+    boost::shared_ptr<TServerEventHandler> eventHandler =
+      server_.getEventHandler();
+    if (eventHandler != NULL) {
+      eventHandler->clientBegin(input_, output_);
+    }
+    try {
+      while (processor_->process(input_, output_)) {
+        if (!input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (TTransportException& ttx) {
+      // This is reasonably expected, client didn't send a full request so just
+      // ignore him
+      // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
+      // GlobalOutput(errStr.c_str());
+    } catch (TException& x) {
+      string errStr = string("TThreadPoolServer exception: ") + x.what();
+      GlobalOutput(errStr.c_str());
+    } catch (std::exception &x) {
+      string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
+      GlobalOutput(errStr.c_str());
+    } catch (...) {
+      // Anything else: log it here so that clientEnd() and the close() calls below still run
+      GlobalOutput("TThreadPoolServer uncaught exception.");
+    }
+
+    if (eventHandler != NULL) {
+      eventHandler->clientEnd(input_, output_);
+    }
+
+    try {
+      input_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    try {
+      output_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+  }
+
+ private:
+  TServer& server_;
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
+};
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> transportFactory,
+                                     shared_ptr<TProtocolFactory> protocolFactory,
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor, serverTransport, transportFactory, protocolFactory),  // same factories for input and output
+  threadManager_(threadManager),
+  stop_(false), timeout_(0) {}  // starts not stopped, with a task timeout of 0
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> inputTransportFactory,
+                                     shared_ptr<TTransportFactory> outputTransportFactory,
+                                     shared_ptr<TProtocolFactory> inputProtocolFactory,
+                                     shared_ptr<TProtocolFactory> outputProtocolFactory,
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+          inputProtocolFactory, outputProtocolFactory),  // separate factories per direction
+  threadManager_(threadManager),
+  stop_(false), timeout_(0) {}  // starts not stopped, with a task timeout of 0
+
+
+TThreadPoolServer::~TThreadPoolServer() {}  // empty body: no explicit cleanup is performed here
+
+// Accept loop: listens, then turns every accepted client into a Task handed to threadManager_.
+// Leaves the loop when stop_ is set or when a std::string is thrown.
+void TThreadPoolServer::serve() {
+  shared_ptr<TTransport> client;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
+
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    GlobalOutput((string("TThreadPoolServer::run() listen(): ") + ttx.what()).c_str());
+    return;
+  }
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  while (!stop_) {
+    try {
+      // Drop whatever the previous iteration left behind
+      client.reset();
+      inputTransport.reset();
+      outputTransport.reset();
+      inputProtocol.reset();
+      outputProtocol.reset();
+
+      // Fetch client from server
+      client = serverTransport_->accept();
+
+      // Make IO transports
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+      // Add to threadmanager pool
+      shared_ptr<TThreadPoolServer::Task> task(
+        new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol));
+      threadManager_->add(task, timeout_);
+
+    } catch (TTransportException& ttx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      // Stay quiet only for an interruption that arrives while the server is stopping
+      if (!(stop_ && ttx.getType() == TTransportException::INTERRUPTED)) {
+        GlobalOutput((string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what()).c_str());
+      }
+      continue;
+    } catch (TException& tx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      GlobalOutput((string("TThreadPoolServer: Caught TException: ") + tx.what()).c_str());
+      continue;
+    } catch (string s) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      GlobalOutput(("TThreadPoolServer: Unknown exception: " + s).c_str());
+      break;
+    }
+  }
+
+  // If stopped manually, join the existing threads
+  if (stop_) {
+    try {
+      serverTransport_->close();
+      threadManager_->join();
+    } catch (TException &tx) {
+      GlobalOutput((string("TThreadPoolServer: Exception shutting down: ") + tx.what()).c_str());
+    }
+    stop_ = false;
+  }
+}
+
+int64_t TThreadPoolServer::getTimeout() const {  // the value serve() passes to threadManager_->add() with each task
+  return timeout_;
+}
+
+void TThreadPoolServer::setTimeout(int64_t value) {  // stored as-is; serve() reads it each time it adds a task
+  timeout_ = value;
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
new file mode 100644
index 0000000..7b7e906
--- /dev/null
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
+#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1
+
+#include <concurrency/ThreadManager.h>
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransportFactory;
+
+class TThreadPoolServer : public TServer {  // server that hands one Task per accepted client to a ThreadManager
+ public:
+  class Task;  // per-client Runnable; only forward-declared here
+  // Same transport factory and protocol factory for input and output
+  TThreadPoolServer(boost::shared_ptr<TProcessor> processor,
+                    boost::shared_ptr<TServerTransport> serverTransport,
+                    boost::shared_ptr<TTransportFactory> transportFactory,
+                    boost::shared_ptr<TProtocolFactory> protocolFactory,
+                    boost::shared_ptr<ThreadManager> threadManager);
+  // Separate input and output transport/protocol factories
+  TThreadPoolServer(boost::shared_ptr<TProcessor> processor,
+                    boost::shared_ptr<TServerTransport> serverTransport,
+                    boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                    boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                    boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                    boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+                    boost::shared_ptr<ThreadManager> threadManager);
+
+  virtual ~TThreadPoolServer();
+
+  virtual void serve();
+  // Timeout value passed along to ThreadManager::add() for every new task
+  virtual int64_t getTimeout() const;
+
+  virtual void setTimeout(int64_t value);
+  // Raises the stop flag, then interrupts the server transport
+  virtual void stop() {
+    stop_ = true;
+    serverTransport_->interrupt();
+  }
+
+ protected:
+
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  volatile bool stop_;  // set by stop(), cleared again at the end of serve()
+
+  volatile int64_t timeout_;  // initialised to 0 by both constructors
+
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
new file mode 100644
index 0000000..cc30f8f
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TThreadedServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/PosixThreadFactory.h"
+
+#include <string>
+#include <iostream>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+
+// Runnable that serves one accepted client and, when done, removes itself
+// from the owning server's task set.
+class TThreadedServer::Task: public Runnable {
+public:
+
+  Task(TThreadedServer& server,
+       shared_ptr<TProcessor> processor,
+       shared_ptr<TProtocol> input,
+       shared_ptr<TProtocol> output) :
+    server_(server),
+    processor_(processor),
+    input_(input),
+    output_(output) {
+  }
+
+  ~Task() {}
+
+  // Serves the client, then closes its transports and unregisters from the server
+  void run() {
+    boost::shared_ptr<TServerEventHandler> eventHandler =
+      server_.getEventHandler();
+    if (eventHandler != NULL) {
+      eventHandler->clientBegin(input_, output_);
+    }
+
+    try {
+      // Keep handling requests until process() or the follow-up peek() returns false
+      while (processor_->process(input_, output_) && input_->getTransport()->peek()) {
+      }
+    } catch (TTransportException& ttx) {
+      GlobalOutput((string("TThreadedServer client died: ") + ttx.what()).c_str());
+    } catch (TException& x) {
+      GlobalOutput((string("TThreadedServer exception: ") + x.what()).c_str());
+    } catch (...) {
+      GlobalOutput("TThreadedServer uncaught exception.");
+    }
+
+    if (eventHandler != NULL) {
+      eventHandler->clientEnd(input_, output_);
+    }
+
+    // Close both directions; a failing close is only logged
+    try {
+      input_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      GlobalOutput((string("TThreadedServer input close failed: ") + ttx.what()).c_str());
+    }
+    try {
+      output_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      GlobalOutput((string("TThreadedServer output close failed: ") + ttx.what()).c_str());
+    }
+
+    // Remove this task from parent bookkeeping
+    {
+      Synchronized s(server_.tasksMonitor_);
+      server_.tasks_.erase(this);
+      // Wake serve(), which waits for the set to drain when the server is stopping
+      if (server_.tasks_.empty()) {
+        server_.tasksMonitor_.notify();
+      }
+    }
+  }
+
+ private:
+  // Owning server; run() touches its tasks_ and tasksMonitor_ members
+  TThreadedServer& server_;
+  friend class TThreadedServer;
+
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
+};
+
+
+// Four-argument form: threads are created by a default-constructed PosixThreadFactory.
+TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
+                                 shared_ptr<TServerTransport> serverTransport,
+                                 shared_ptr<TTransportFactory> transportFactory,
+                                 shared_ptr<TProtocolFactory> protocolFactory):
+  TServer(processor, serverTransport, transportFactory, protocolFactory),
+  threadFactory_(new PosixThreadFactory()),
+  stop_(false) {}
+
+// Five-argument form: the caller supplies the ThreadFactory used for the per-client threads.
+TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
+                                 shared_ptr<TServerTransport> serverTransport,
+                                 shared_ptr<TTransportFactory> transportFactory,
+                                 shared_ptr<TProtocolFactory> protocolFactory,
+                                 shared_ptr<ThreadFactory> threadFactory):
+  TServer(processor, serverTransport, transportFactory, protocolFactory),
+  threadFactory_(threadFactory),
+  stop_(false) {}
+
+TThreadedServer::~TThreadedServer() {}  // empty body: no explicit cleanup is performed here
+
+// Accept loop: listens, then starts one thread (from threadFactory_) per accepted client.
+// On a manual stop it closes the server transport and waits for all tasks to finish.
+void TThreadedServer::serve() {
+  shared_ptr<TTransport> client;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
+
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
+    GlobalOutput(errStr.c_str());
+    return;
+  }
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  while (!stop_) {
+    try {
+      client.reset();
+      inputTransport.reset();
+      outputTransport.reset();
+      inputProtocol.reset();
+      outputProtocol.reset();
+
+      // Fetch client from server
+      client = serverTransport_->accept();
+
+      // Make IO transports
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+      // Create a task; tasks_ tracks it by raw pointer while runnable owns it
+      TThreadedServer::Task* task =
+        new TThreadedServer::Task(*this, processor_, inputProtocol, outputProtocol);
+      shared_ptr<Runnable> runnable(task);
+      // Create a thread for this task
+      shared_ptr<Thread> thread(threadFactory_->newThread(runnable));
+      // Insert thread into the set of threads
+      {
+        Synchronized s(tasksMonitor_);
+        tasks_.insert(task);
+      }
+
+      // Start the thread! A task that never runs never erases itself, so take it
+      // back out if start() throws; otherwise the stop path below would wait forever.
+      try {
+        thread->start();
+      } catch (...) {
+        Synchronized s(tasksMonitor_);
+        tasks_.erase(task);
+        throw;
+      }
+
+    } catch (TTransportException& ttx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+        string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
+      continue;
+    } catch (TException& tx) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+      continue;
+    } catch (string s) {
+      if (inputTransport != NULL) { inputTransport->close(); }
+      if (outputTransport != NULL) { outputTransport->close(); }
+      if (client != NULL) { client->close(); }
+      string errStr = "TThreadedServer: Unknown exception: " + s;
+      GlobalOutput(errStr.c_str());
+      break;
+    }
+  }
+
+  // If stopped manually, make sure to close server transport
+  if (stop_) {
+    try {
+      serverTransport_->close();
+    } catch (TException &tx) {
+      string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    try {
+      Synchronized s(tasksMonitor_);
+      while (!tasks_.empty()) {
+        tasksMonitor_.wait();
+      }
+    } catch (TException &tx) {
+      string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    stop_ = false;
+  }
+}
+
+}}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TThreadedServer.h b/lib/cpp/src/server/TThreadedServer.h
new file mode 100644
index 0000000..4d0811a
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
+#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::ThreadFactory;
+
+class TThreadedServer : public TServer {
+  // TServer subclass; per the .cpp in this patch, its serve() loop starts a new thread per iteration (presumably one per client).
+ public:
+  class Task;  // Only forward-declared here; tasks_ below holds pointers to it.
+  // Overload without a ThreadFactory argument (how threadFactory_ is then chosen is not visible in this header).
+  TThreadedServer(boost::shared_ptr<TProcessor> processor,
+                  boost::shared_ptr<TServerTransport> serverTransport,
+                  boost::shared_ptr<TTransportFactory> transportFactory,
+                  boost::shared_ptr<TProtocolFactory> protocolFactory);
+  // Overload taking an explicit ThreadFactory (presumably stored in threadFactory_ - TODO confirm).
+  TThreadedServer(boost::shared_ptr<TProcessor> processor,
+                  boost::shared_ptr<TServerTransport> serverTransport,
+                  boost::shared_ptr<TTransportFactory> transportFactory,
+                  boost::shared_ptr<TProtocolFactory> protocolFactory,
+                  boost::shared_ptr<ThreadFactory> threadFactory);
+  // Destructor is virtual and defined out of line.
+  virtual ~TThreadedServer();
+  // Runs the server; once stop_ is set, the implementation closes serverTransport_ and waits until tasks_ is empty.
+  virtual void serve();
+  // Requests shutdown: raises stop_ first, then interrupts the server transport.
+  void stop() {
+    stop_ = true;
+    serverTransport_->interrupt();  // serverTransport_ is not declared in this class; presumably inherited from TServer.
+  }
+  // Protected state; stop_, tasksMonitor_ and tasks_ are all used by the shutdown code in the .cpp.
+ protected:
+  boost::shared_ptr<ThreadFactory> threadFactory_;
+  volatile bool stop_;  // Set by stop(); reset to false after shutdown in the .cpp. NOTE(review): volatile alone does not synchronize threads.
+  // The .cpp waits on tasksMonitor_ while tasks_ is non-empty; the code that removes entries and notifies is not shown here.
+  Monitor tasksMonitor_;
+  std::set<Task*> tasks_;  // NOTE(review): this header does not include <set> directly - confirm it arrives transitively.
+
+};
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_