Adding threaded server to Thrift
Summary: Spawns a new thread for each client connection
Reviewed By: marc
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664965 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index e7e7148..7eb87d9 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -14,8 +14,7 @@
using namespace boost;
class TThreadPoolServer : public TServer {
-public:
-
+ public:
class Task;
TThreadPoolServer(shared_ptr<TProcessor> processor,
@@ -36,7 +35,7 @@
virtual void serve();
-protected:
+ protected:
shared_ptr<ThreadManager> threadManager_;
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
new file mode 100644
index 0000000..a06117e
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -0,0 +1,110 @@
+#include "server/TThreadedServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/PosixThreadFactory.h"
+
+#include <string>
+#include <iostream>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace facebook { namespace thrift { namespace server {
+
+using namespace std;
+using namespace facebook::thrift;
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::concurrency;
+
+class TThreadedServer::Task: public Runnable {
+
+public:
+
+ Task(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocol> input,
+ shared_ptr<TProtocol> output) :
+ processor_(processor),
+ input_(input),
+ output_(output) {
+ }
+
+ ~Task() {}
+
+ void run() {
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadedServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TThreadedServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TThreadedServer uncaught exception." << endl;
+ }
+ input_->getTransport()->close();
+ output_->getTransport()->close();
+ }
+
+ private:
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TProtocol> input_;
+ shared_ptr<TProtocol> output_;
+
+};
+
+
+TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> transportFactory,
+ shared_ptr<TProtocolFactory> protocolFactory):
+ TServer(processor, serverTransport, transportFactory, protocolFactory) {
+ threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+}
+
+TThreadedServer::~TThreadedServer() {}
+
+void TThreadedServer::serve() {
+
+ shared_ptr<TTransport> client;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
+
+ try {
+ // Start the server listening
+ serverTransport_->listen();
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadedServer::run() listen(): " << ttx.what() << endl;
+ return;
+ }
+
+ while (true) {
+ try {
+ // Fetch client from server
+ client = serverTransport_->accept();
+ // Make IO transports
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+ TThreadedServer::Task* t = new TThreadedServer::Task(processor_,
+ inputProtocol,
+ outputProtocol);
+
+ // Create a thread for this task
+ shared_ptr<Thread> thread =
+ shared_ptr<Thread>(threadFactory_->newThread(shared_ptr<Runnable>(t)));
+
+ // Start the thread!
+ thread->start();
+
+ } catch (TTransportException& ttx) {
+ break;
+ }
+ }
+}
+
+}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadedServer.h b/lib/cpp/src/server/TThreadedServer.h
new file mode 100644
index 0000000..fb7ef64
--- /dev/null
+++ b/lib/cpp/src/server/TThreadedServer.h
@@ -0,0 +1,37 @@
+#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
+#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace server {
+
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::concurrency;
+using namespace boost;
+
+class TThreadedServer : public TServer {
+
+ public:
+ class Task;
+
+ TThreadedServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> transportFactory,
+ shared_ptr<TProtocolFactory> protocolFactory);
+
+ virtual ~TThreadedServer();
+
+ virtual void serve();
+
+ protected:
+ shared_ptr<ThreadFactory> threadFactory_;
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_