Change Thrift c++ to new protocol wrapping transport model

Summary: Also cleaned up excessive .h/.cpp files into Utils files

Reviewed By: aditya


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664838 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index bc39877..75a209e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -153,7 +153,7 @@
 
     try {
       // Invoke the processor
-      server_->getProcessor()->process(inputTransport_, outputTransport_);
+      server_->getProcessor()->process(inputProtocol_, outputProtocol_);
     } catch (TTransportException &x) {
       fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
       close();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index ec024c0..faa572f 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -1,9 +1,9 @@
 #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
 #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
 
-#include "Thrift.h"
-#include "server/TServer.h"
-#include "transport/TMemoryBuffer.h"
+#include <Thrift.h>
+#include <server/TServer.h>
+#include <transport/TTransportUtils.h>
 #include <stack>
 #include <event.h>
 
@@ -50,10 +50,8 @@
   void handleEvent(int fd, short which);
 
  public:
-  TNonblockingServer(shared_ptr<TProcessor> processor,
-                     shared_ptr<TServerOptions> options,
-                     int port) :
-    TServer(processor, options),
+  TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
+    TServer(processor),
     serverSocket_(0),
     port_(port),
     frameResponses_(true) {}
@@ -157,6 +155,12 @@
   // Transport that processor writes to
   shared_ptr<TMemoryBuffer> outputTransport_;
 
+  // Protocol encoder
+  shared_ptr<TProtocol> outputProtocol_;
+
+  // Protocol decoder
+  shared_ptr<TProtocol> inputProtocol_;
+  
   // Go into read mode
   void setRead() {
     setFlags(EV_READ | EV_PERSIST);
@@ -190,6 +194,12 @@
     inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
     outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
     
+    // Create protocol
+    std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+    iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
+    inputProtocol_ = iop.first;
+    outputProtocol_ = iop.second;
+
     init(socket, eventFlags, s);
   }
 
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 293aa28..4b20432 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -3,7 +3,7 @@
 
 #include <TProcessor.h>
 #include <transport/TServerTransport.h>
-#include <transport/TTransportFactory.h>
+#include <protocol/TBinaryProtocol.h>
 #include <concurrency/Thread.h>
 
 #include <boost/shared_ptr.hpp>
@@ -14,8 +14,6 @@
 using namespace facebook::thrift::transport;
 using namespace boost;
 
-class TServerOptions;
-
 /**
  * Thrift server.
  *
@@ -24,45 +22,61 @@
 class TServer : public concurrency::Runnable {
 public:
   virtual ~TServer() {}
+
   virtual void serve() = 0;
 
   // Allows running the server as a Runnable thread
-  virtual void run() { serve(); }
+  virtual void run() {
+    serve();
+  }
   
   shared_ptr<TProcessor> getProcessor() {
     return processor_;
   }
   
+  shared_ptr<TProtocolFactory> getProtocolFactory() {
+    return protocolFactory_;
+  }
+
 protected:
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
           shared_ptr<TTransportFactory> transportFactory,
-          shared_ptr<TServerOptions> options) :
+          shared_ptr<TProtocolFactory> protocolFactory) :
     processor_(processor),
     serverTransport_(serverTransport),
     transportFactory_(transportFactory),
-    options_(options) {}
+    protocolFactory_(protocolFactory) {}
 
   TServer(shared_ptr<TProcessor> processor,
-          shared_ptr<TServerOptions> options) :
-    processor_(processor), options_(options) {}
+          shared_ptr<TServerTransport> serverTransport,
+          shared_ptr<TTransportFactory> transportFactory) :
+    processor_(processor),
+    serverTransport_(serverTransport),
+    transportFactory_(transportFactory) {
+  protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
+
+  TServer(shared_ptr<TProcessor> processor,
+          shared_ptr<TServerTransport> serverTransport) :
+    processor_(processor),
+    serverTransport_(serverTransport) {
+    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+  }
+
+  TServer(shared_ptr<TProcessor> processor) :
+    processor_(processor) {
+    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+  }
  
   shared_ptr<TProcessor> processor_;
   shared_ptr<TServerTransport> serverTransport_;
   shared_ptr<TTransportFactory> transportFactory_;
-  shared_ptr<TServerOptions> options_;
+  shared_ptr<TProtocolFactory> protocolFactory_;
 };
   
-/**
- * Class to encapsulate all generic server options.
- */
-class TServerOptions {
- public:
-  // TODO(mcslee): Fill in getters/setters here
- protected:
-  // TODO(mcslee): Fill data members in here
-};
-
 }}} // facebook::thrift::server
 
 #endif // #ifndef _THRIFT_SERVER_TSERVER_H_
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 63b6c6b..3eb035e 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -14,7 +14,8 @@
 void TSimpleServer::serve() {
 
   shared_ptr<TTransport> client;
-  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
+  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
 
   try {
     // Start the server listening
@@ -28,14 +29,15 @@
   try {
     while (true) {
       client = serverTransport_->accept();
-      io = transportFactory_->getIOTransports(client);
+      iot = transportFactory_->getIOTransports(client);
+      iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
       try {
-        while (processor_->process(io.first, io.second)) {}
+        while (processor_->process(iop.first, iop.second)) {}
       } catch (TTransportException& ttx) {
         cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
       }
-      io.first->close();
-      io.second->close();
+      iot.first->close();
+      iot.second->close();
       client->close();
     }
   } catch (TTransportException& ttx) {
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index a0d22a7..6470519 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -19,8 +19,8 @@
   TSimpleServer(shared_ptr<TProcessor> processor,
                 shared_ptr<TServerTransport> serverTransport,
                 shared_ptr<TTransportFactory> transportFactory,
-                shared_ptr<TServerOptions> options) :
-    TServer(processor, serverTransport, transportFactory, options) {}
+                shared_ptr<TProtocolFactory> protocolFactory) :
+    TServer(processor, serverTransport, transportFactory, protocolFactory) {}
     
   ~TSimpleServer() {}
 
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 43f7463..7885f0f 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -16,8 +16,8 @@
 public:
     
   Task(shared_ptr<TProcessor> processor,
-       shared_ptr<TTransport> input,
-       shared_ptr<TTransport> output) :
+       shared_ptr<TProtocol> input,
+       shared_ptr<TProtocol> output) :
     processor_(processor),
     input_(input),
     output_(output) {
@@ -35,23 +35,24 @@
         break;
       }
     }
-    input_->close();
-    output_->close();
+    input_->getInputTransport()->close();
+    output_->getOutputTransport()->close();
   }
 
  private:
   shared_ptr<TProcessor> processor_;
-  shared_ptr<TTransport> input_;
-  shared_ptr<TTransport> output_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
 
 };
   
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
                                      shared_ptr<TServerTransport> serverTransport,
                                      shared_ptr<TTransportFactory> transportFactory,
-                                     shared_ptr<ThreadManager> threadManager,
-                                     shared_ptr<TServerOptions> options) :
-  TServer(processor, serverTransport, transportFactory, options), 
+                                     shared_ptr<TProtocolFactory> protocolFactory,
+
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor, serverTransport, transportFactory, protocolFactory), 
   threadManager_(threadManager) {
 }
 
@@ -60,7 +61,8 @@
 void TThreadPoolServer::serve() {
 
   shared_ptr<TTransport> client;
-  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
+  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
 
   try {
     // Start the server listening
@@ -75,9 +77,11 @@
       // Fetch client from server
       client = serverTransport_->accept();
       // Make IO transports
-      io = transportFactory_->getIOTransports(client);
+      iot = transportFactory_->getIOTransports(client);
+      iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
     } catch (TTransportException& ttx) {
       break;
     }
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index b8f8b47..5c5899e 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -21,8 +21,8 @@
   TThreadPoolServer(shared_ptr<TProcessor> processor,
 		    shared_ptr<TServerTransport> serverTransport,
 		    shared_ptr<TTransportFactory> transportFactory,
-		    shared_ptr<ThreadManager> threadManager,
-		    shared_ptr<TServerOptions> options);
+                    shared_ptr<TProtocolFactory> protocolFactory,
+		    shared_ptr<ThreadManager> threadManager);
 
   virtual ~TThreadPoolServer();