Change Thrift C++ to the new protocol-wrapping-transport model
Summary: Also consolidated the excessive small .h/.cpp files into Utils files
Reviewed By: aditya
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664838 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index bc39877..75a209e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -153,7 +153,7 @@
try {
// Invoke the processor
- server_->getProcessor()->process(inputTransport_, outputTransport_);
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &x) {
fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
close();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index ec024c0..faa572f 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -1,9 +1,9 @@
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
-#include "Thrift.h"
-#include "server/TServer.h"
-#include "transport/TMemoryBuffer.h"
+#include <Thrift.h>
+#include <server/TServer.h>
+#include <transport/TTransportUtils.h>
#include <stack>
#include <event.h>
@@ -50,10 +50,8 @@
void handleEvent(int fd, short which);
public:
- TNonblockingServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerOptions> options,
- int port) :
- TServer(processor, options),
+ TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
+ TServer(processor),
serverSocket_(0),
port_(port),
frameResponses_(true) {}
@@ -157,6 +155,12 @@
// Transport that processor writes to
shared_ptr<TMemoryBuffer> outputTransport_;
+ // Protocol encoder
+ shared_ptr<TProtocol> outputProtocol_;
+
+ // Protocol decoder
+ shared_ptr<TProtocol> inputProtocol_;
+
// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
@@ -190,6 +194,12 @@
inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+ // Create protocol
+ std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
+ inputProtocol_ = iop.first;
+ outputProtocol_ = iop.second;
+
init(socket, eventFlags, s);
}
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 293aa28..4b20432 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -3,7 +3,7 @@
#include <TProcessor.h>
#include <transport/TServerTransport.h>
-#include <transport/TTransportFactory.h>
+#include <protocol/TBinaryProtocol.h>
#include <concurrency/Thread.h>
#include <boost/shared_ptr.hpp>
@@ -14,8 +14,6 @@
using namespace facebook::thrift::transport;
using namespace boost;
-class TServerOptions;
-
/**
* Thrift server.
*
@@ -24,45 +22,61 @@
class TServer : public concurrency::Runnable {
public:
virtual ~TServer() {}
+
virtual void serve() = 0;
// Allows running the server as a Runnable thread
- virtual void run() { serve(); }
+ virtual void run() {
+ serve();
+ }
shared_ptr<TProcessor> getProcessor() {
return processor_;
}
+ shared_ptr<TProtocolFactory> getProtocolFactory() {
+ return protocolFactory_;
+ }
+
protected:
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<TServerOptions> options) :
+ shared_ptr<TProtocolFactory> protocolFactory) :
processor_(processor),
serverTransport_(serverTransport),
transportFactory_(transportFactory),
- options_(options) {}
+ protocolFactory_(protocolFactory) {}
TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerOptions> options) :
- processor_(processor), options_(options) {}
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> transportFactory) :
+ processor_(processor),
+ serverTransport_(serverTransport),
+ transportFactory_(transportFactory) {
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport) :
+ processor_(processor),
+ serverTransport_(serverTransport) {
+ transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
+
+ TServer(shared_ptr<TProcessor> processor) :
+ processor_(processor) {
+ transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
shared_ptr<TProcessor> processor_;
shared_ptr<TServerTransport> serverTransport_;
shared_ptr<TTransportFactory> transportFactory_;
- shared_ptr<TServerOptions> options_;
+ shared_ptr<TProtocolFactory> protocolFactory_;
};
-/**
- * Class to encapsulate all generic server options.
- */
-class TServerOptions {
- public:
- // TODO(mcslee): Fill in getters/setters here
- protected:
- // TODO(mcslee): Fill data members in here
-};
-
}}} // facebook::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 63b6c6b..3eb035e 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -14,7 +14,8 @@
void TSimpleServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+ pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
try {
// Start the server listening
@@ -28,14 +29,15 @@
try {
while (true) {
client = serverTransport_->accept();
- io = transportFactory_->getIOTransports(client);
+ iot = transportFactory_->getIOTransports(client);
+ iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
try {
- while (processor_->process(io.first, io.second)) {}
+ while (processor_->process(iop.first, iop.second)) {}
} catch (TTransportException& ttx) {
cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
}
- io.first->close();
- io.second->close();
+ iot.first->close();
+ iot.second->close();
client->close();
}
} catch (TTransportException& ttx) {
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index a0d22a7..6470519 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -19,8 +19,8 @@
TSimpleServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<TServerOptions> options) :
- TServer(processor, serverTransport, transportFactory, options) {}
+ shared_ptr<TProtocolFactory> protocolFactory) :
+ TServer(processor, serverTransport, transportFactory, protocolFactory) {}
~TSimpleServer() {}
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 43f7463..7885f0f 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -16,8 +16,8 @@
public:
Task(shared_ptr<TProcessor> processor,
- shared_ptr<TTransport> input,
- shared_ptr<TTransport> output) :
+ shared_ptr<TProtocol> input,
+ shared_ptr<TProtocol> output) :
processor_(processor),
input_(input),
output_(output) {
@@ -35,23 +35,24 @@
break;
}
}
- input_->close();
- output_->close();
+ input_->getInputTransport()->close();
+ output_->getOutputTransport()->close();
}
private:
shared_ptr<TProcessor> processor_;
- shared_ptr<TTransport> input_;
- shared_ptr<TTransport> output_;
+ shared_ptr<TProtocol> input_;
+ shared_ptr<TProtocol> output_;
};
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<ThreadManager> threadManager,
- shared_ptr<TServerOptions> options) :
- TServer(processor, serverTransport, transportFactory, options),
+ shared_ptr<TProtocolFactory> protocolFactory,
+
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, serverTransport, transportFactory, protocolFactory),
threadManager_(threadManager) {
}
@@ -60,7 +61,8 @@
void TThreadPoolServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+ pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
try {
// Start the server listening
@@ -75,9 +77,11 @@
// Fetch client from server
client = serverTransport_->accept();
// Make IO transports
- io = transportFactory_->getIOTransports(client);
+ iot = transportFactory_->getIOTransports(client);
+ iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
} catch (TTransportException& ttx) {
break;
}
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index b8f8b47..5c5899e 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -21,8 +21,8 @@
TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<ThreadManager> threadManager,
- shared_ptr<TServerOptions> options);
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<ThreadManager> threadManager);
virtual ~TThreadPoolServer();