Adding threaded server to Thrift

Summary: Spawns a new thread for each client connection

Reviewed By: marc


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664965 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index e7e7148..7eb87d9 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -14,8 +14,7 @@
 using namespace boost;
 
 class TThreadPoolServer : public TServer {
-public:
-
+ public:
   class Task;
   
   TThreadPoolServer(shared_ptr<TProcessor> processor,
@@ -36,7 +35,7 @@
 
   virtual void serve();
 
-protected:
+ protected:
 
   shared_ptr<ThreadManager> threadManager_;
   
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
new file mode 100644
index 0000000..a06117e
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -0,0 +1,110 @@
+#include "server/TThreadedServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/PosixThreadFactory.h"
+
+#include <string>
+#include <iostream>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace facebook { namespace thrift { namespace server { 
+
+using namespace std;
+using namespace facebook::thrift;
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::concurrency;
+// Runnable that serves a single client connection until it is finished.
+class TThreadedServer::Task: public Runnable {
+ public:
+  Task(shared_ptr<TProcessor> processor, shared_ptr<TProtocol> input,
+       shared_ptr<TProtocol> output) :
+    processor_(processor), input_(input), output_(output) {}
+  ~Task() {}
+
+  // Serves requests until process() or the input transport's peek() returns
+  // false, then closes both transports; failures are logged, never rethrown.
+  void run() {
+    try {
+      while (processor_->process(input_, output_)) {
+        if (!input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (TTransportException& ttx) {
+      cerr << "TThreadedServer client died: " << ttx.what() << endl;
+    } catch (TException& x) {
+      cerr << "TThreadedServer exception: " << x.what() << endl;
+    } catch (...) {
+      cerr << "TThreadedServer uncaught exception." << endl;
+    }
+    try {
+      input_->getTransport()->close();
+    } catch (...) {
+      cerr << "TThreadedServer input close failed." << endl;
+    }
+    try {
+      output_->getTransport()->close();
+    } catch (...) {
+      cerr << "TThreadedServer output close failed." << endl;
+    }
+  }
+ private:
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
+};
+// Builds the server around a fresh PosixThreadFactory for per-client threads.
+TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
+                                 shared_ptr<TServerTransport> serverTransport,
+                                 shared_ptr<TTransportFactory> transportFactory,
+                                 shared_ptr<TProtocolFactory> protocolFactory):
+  TServer(processor, serverTransport, transportFactory, protocolFactory),
+  threadFactory_(new PosixThreadFactory()) {
+}
+// Nothing is released explicitly here; the destructor body is empty.
+TThreadedServer::~TThreadedServer() {}
+// Accept loop, run on the calling thread: every accepted client is handed to a
+// Task that runs on a newly started thread. Returns after logging when listen()
+// fails or when the loop catches a TTransportException.
+void TThreadedServer::serve() {
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    cerr << "TThreadedServer::serve() listen(): " << ttx.what() << endl;
+    return;
+  }
+
+  while (true) {
+    try {
+      // Fetch client from server
+      shared_ptr<TTransport> client = serverTransport_->accept();
+
+      // Make IO transports and protocols. They are scoped to this iteration so
+      // the loop holds no reference to an earlier client while it waits in
+      // accept().
+      shared_ptr<TTransport> inputTransport =
+        inputTransportFactory_->getTransport(client);
+      shared_ptr<TTransport> outputTransport =
+        outputTransportFactory_->getTransport(client);
+      shared_ptr<TProtocol> inputProtocol =
+        inputProtocolFactory_->getProtocol(inputTransport);
+      shared_ptr<TProtocol> outputProtocol =
+        outputProtocolFactory_->getProtocol(outputTransport);
+
+      // Give the task to a shared_ptr as soon as it is allocated
+      shared_ptr<Runnable> task(
+        new TThreadedServer::Task(processor_, inputProtocol, outputProtocol));
+
+      // Create a thread for this task and start it
+      shared_ptr<Thread> thread(threadFactory_->newThread(task));
+      thread->start();
+    } catch (TTransportException& ttx) {
+      // Say why the server is stopping instead of leaving the loop silently
+      cerr << "TThreadedServer::serve() stopping: " << ttx.what() << endl;
+      break;
+    }
+  }
+}
+
+}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadedServer.h b/lib/cpp/src/server/TThreadedServer.h
new file mode 100644
index 0000000..fb7ef64
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.h
@@ -0,0 +1,37 @@
+#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
+#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace server { 
+
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::concurrency;
+using namespace boost;
+// TServer implementation that starts a new thread for each client connection.
+class TThreadedServer : public TServer {
+
+ public:
+  class Task;  // per-connection Runnable, defined in TThreadedServer.cpp
+  // All four arguments are passed through to the TServer constructor.
+  TThreadedServer(shared_ptr<TProcessor> processor,
+                  shared_ptr<TServerTransport> serverTransport,
+                  shared_ptr<TTransportFactory> transportFactory,
+                  shared_ptr<TProtocolFactory> protocolFactory);
+
+  virtual ~TThreadedServer();
+  // Listens on the server transport, then accepts clients and starts threads.
+  virtual void serve();
+
+ protected:
+  shared_ptr<ThreadFactory> threadFactory_;  // creates the per-client threads
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_