-- Change concept of protocol and transport factory
Summary:
- Transport factories now wrap around one transport
- Protocol factories now wrap around one transport (as opposed to a pair of input/output
transports)
- TServer now takes input/output transport and protocol factories
The motivation for this change is that you could concievably want to use a different protocol or
transport for input and output. An example is that incoming data is encoded using binary protocol
but outgoing data is encrypted XML (with encryption being done on the transport level).
This change should be mostly backwards compatible because the TServer classes have constructors
that take a transport factory and use that for both the input and transport factories. The only
change might be for anyone who is using the C++ client code directly i.e. instantiating
TBinaryProtocol() directly because the constructor now only accepts one transport.
Reviewed By: Slee
Test Plan: Everything compiles (for both thrift and search).
Notes:
I am going to make the same changes in all the supported languages after this...
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664940 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cpp b/lib/cpp/src/protocol/TBinaryProtocol.cpp
index 481012c..05de125 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.cpp
+++ b/lib/cpp/src/protocol/TBinaryProtocol.cpp
@@ -79,30 +79,30 @@
uint32_t TBinaryProtocol::writeBool(const bool value) {
uint8_t tmp = value ? 1 : 0;
- outputTransport_->write(&tmp, 1);
+ trans_->write(&tmp, 1);
return 1;
}
uint32_t TBinaryProtocol::writeByte(const int8_t byte) {
- outputTransport_->write((uint8_t*)&byte, 1);
+ trans_->write((uint8_t*)&byte, 1);
return 1;
}
uint32_t TBinaryProtocol::writeI16(const int16_t i16) {
int16_t net = (int16_t)htons(i16);
- outputTransport_->write((uint8_t*)&net, 2);
+ trans_->write((uint8_t*)&net, 2);
return 2;
}
uint32_t TBinaryProtocol::writeI32(const int32_t i32) {
int32_t net = (int32_t)htonl(i32);
- outputTransport_->write((uint8_t*)&net, 4);
+ trans_->write((uint8_t*)&net, 4);
return 4;
}
uint32_t TBinaryProtocol::writeI64(const int64_t i64) {
int64_t net = (int64_t)htonll(i64);
- outputTransport_->write((uint8_t*)&net, 8);
+ trans_->write((uint8_t*)&net, 8);
return 8;
}
@@ -117,14 +117,14 @@
b[5] = d[2];
b[6] = d[1];
b[7] = d[0];
- outputTransport_->write((uint8_t*)b, 8);
+ trans_->write((uint8_t*)b, 8);
return 8;
}
uint32_t TBinaryProtocol::writeString(const string& str) {
uint32_t result = writeI32(str.size());
- outputTransport_->write((uint8_t*)str.data(), str.size());
+ trans_->write((uint8_t*)str.data(), str.size());
return result + str.size();
}
@@ -233,21 +233,21 @@
uint32_t TBinaryProtocol::readBool(bool& value) {
uint8_t b[1];
- inputTransport_->readAll(b, 1);
+ trans_->readAll(b, 1);
value = *(int8_t*)b != 0;
return 1;
}
uint32_t TBinaryProtocol::readByte(int8_t& byte) {
uint8_t b[1];
- inputTransport_->readAll(b, 1);
+ trans_->readAll(b, 1);
byte = *(int8_t*)b;
return 1;
}
uint32_t TBinaryProtocol::readI16(int16_t& i16) {
uint8_t b[2];
- inputTransport_->readAll(b, 2);
+ trans_->readAll(b, 2);
i16 = *(int16_t*)b;
i16 = (int16_t)ntohs(i16);
return 2;
@@ -255,7 +255,7 @@
uint32_t TBinaryProtocol::readI32(int32_t& i32) {
uint8_t b[4];
- inputTransport_->readAll(b, 4);
+ trans_->readAll(b, 4);
i32 = *(int32_t*)b;
i32 = (int32_t)ntohl(i32);
return 4;
@@ -263,7 +263,7 @@
uint32_t TBinaryProtocol::readI64(int64_t& i64) {
uint8_t b[8];
- inputTransport_->readAll(b, 8);
+ trans_->readAll(b, 8);
i64 = *(int64_t*)b;
i64 = (int64_t)ntohll(i64);
return 8;
@@ -272,7 +272,7 @@
uint32_t TBinaryProtocol::readDouble(double& dub) {
uint8_t b[8];
uint8_t d[8];
- inputTransport_->readAll(b, 8);
+ trans_->readAll(b, 8);
d[0] = b[7];
d[1] = b[6];
d[2] = b[5];
@@ -294,7 +294,7 @@
// Use the heap here to prevent stack overflow for v. large strings
uint8_t *b = new uint8_t[size];
- inputTransport_->readAll(b, size);
+ trans_->readAll(b, size);
str = string((char*)b, size);
delete [] b;
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h
index de9a836..c0a1837 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.h
+++ b/lib/cpp/src/protocol/TBinaryProtocol.h
@@ -17,8 +17,8 @@
*/
class TBinaryProtocol : public TProtocol {
public:
- TBinaryProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
- TProtocol(in, out) {}
+ TBinaryProtocol(shared_ptr<TTransport> trans) :
+ TProtocol(trans) {}
~TBinaryProtocol() {}
@@ -137,9 +137,8 @@
virtual ~TBinaryProtocolFactory() {}
- std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) {
- boost::shared_ptr<TProtocol> prot(new TBinaryProtocol(in, out));
- return std::make_pair(prot, prot);
+ boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TProtocol>(new TBinaryProtocol(trans));
}
};
diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h
index 8077b27..39ee39e 100644
--- a/lib/cpp/src/protocol/TProtocol.h
+++ b/lib/cpp/src/protocol/TProtocol.h
@@ -282,22 +282,24 @@
}
}
- shared_ptr<TTransport> getInputTransport() {
- return inputTransport_;
+ inline shared_ptr<TTransport> getTransport() {
+ return trans_;
}
- shared_ptr<TTransport> getOutputTransport() {
- return outputTransport_;
+ // TODO: remove these two calls, they are for backwards
+ // compatibility
+ inline shared_ptr<TTransport> getInputTransport() {
+ return trans_;
+ }
+ inline shared_ptr<TTransport> getOutputTransport() {
+ return trans_;
}
protected:
- TProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
- inputTransport_(in),
- outputTransport_(out) {}
+ TProtocol(shared_ptr<TTransport> trans):
+ trans_(trans) {}
- shared_ptr<TTransport> inputTransport_;
-
- shared_ptr<TTransport> outputTransport_;
+ shared_ptr<TTransport> trans_;
private:
TProtocol() {}
@@ -312,7 +314,7 @@
virtual ~TProtocolFactory() {}
- virtual std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) = 0;
+ virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0;
};
}}} // facebook::thrift::protocol
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 5755514..25cd7b5 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -28,17 +28,13 @@
// Set flags, which also registers the event
setFlags(eventFlags);
- // TODO: this needs to be replaced by the new version of TTransportFactory
- factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
- // factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
+ // get input/transports
+ factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+ factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
// Create protocol
- std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
- iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
- outputTransport_);
- inputProtocol_ = iop.first;
- outputProtocol_ = iop.second;
-
+ inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+ outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
}
void TConnection::workSocket() {
@@ -353,7 +349,7 @@
// close any factory produced transports
factoryInputTransport_->close();
- // factoryOutputTransport_->close();
+ factoryOutputTransport_->close();
// Give this object back to the server that owns it
server_->returnConnection(this);
@@ -366,7 +362,7 @@
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
// Check the stack
if (connectionStack_.empty()) {
- return new TConnection(socket, flags, this, this->getTransportFactory());
+ return new TConnection(socket, flags, this);
} else {
TConnection* result = connectionStack_.top();
connectionStack_.pop();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 4ea3fa3..08ecec6 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -60,19 +60,31 @@
TNonblockingServer(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
int port) :
- TServer(processor, protocolFactory),
+ TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {}
+ frameResponses_(true) {
+ setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(protocolFactory);
+ setOutputProtocolFactory(protocolFactory);
+ }
- TNonblockingServer(shared_ptr<TProcessor> processor,
- shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TTransportFactory> transportFactory,
+ TNonblockingServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
int port) :
- TServer(processor, protocolFactory, transportFactory),
+ TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {}
+ frameResponses_(true) {
+ setInputTransportFactory(inputTransportFactory);
+ setOutputTransportFactory(outputTransportFactory);
+ setInputProtocolFactory(inputProtocolFactory);
+ setOutputProtocolFactory(outputProtocolFactory);
+ }
~TNonblockingServer() {}
@@ -175,13 +187,13 @@
// extra transport generated by transport factory (e.g. BufferedRouterTransport)
shared_ptr<TTransport> factoryInputTransport_;
- // shared_ptr<TTransport> factoryOutputTransport_;
-
- // Protocol encoder
- shared_ptr<TProtocol> outputProtocol_;
+ shared_ptr<TTransport> factoryOutputTransport_;
// Protocol decoder
shared_ptr<TProtocol> inputProtocol_;
+
+ // Protocol encoder
+ shared_ptr<TProtocol> outputProtocol_;
// Go into read mode
void setRead() {
@@ -205,8 +217,7 @@
public:
// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s,
- shared_ptr<TTransportFactory> transportFactory) {
+ TConnection(int socket, short eventFlags, TNonblockingServer *s) {
readBuffer_ = (uint8_t*)malloc(1024);
if (readBuffer_ == NULL) {
throw new facebook::thrift::TException("Out of memory.");
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index b9f4fca..ad9c291 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -38,73 +38,92 @@
return serverTransport_;
}
- shared_ptr<TTransportFactory> getTransportFactory() {
- return transportFactory_;
+ shared_ptr<TTransportFactory> getInputTransportFactory() {
+ return inputTransportFactory_;
+ }
+
+ shared_ptr<TTransportFactory> getOutputTransportFactory() {
+ return outputTransportFactory_;
}
- shared_ptr<TProtocolFactory> getProtocolFactory() {
- return protocolFactory_;
+ shared_ptr<TProtocolFactory> getInputProtocolFactory() {
+ return inputProtocolFactory_;
}
+ shared_ptr<TProtocolFactory> getOutputProtocolFactory() {
+ return outputProtocolFactory_;
+ }
protected:
+ TServer(shared_ptr<TProcessor> processor):
+ processor_(processor) {
+ setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ }
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport):
+ processor_(processor),
+ serverTransport_(serverTransport) {
+ setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ }
+
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<TProtocolFactory> protocolFactory) :
+ shared_ptr<TProtocolFactory> protocolFactory):
processor_(processor),
serverTransport_(serverTransport),
- transportFactory_(transportFactory),
- protocolFactory_(protocolFactory) {}
+ inputTransportFactory_(transportFactory),
+ outputTransportFactory_(transportFactory),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory) {}
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
- shared_ptr<TTransportFactory> transportFactory) :
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory):
processor_(processor),
serverTransport_(serverTransport),
- transportFactory_(transportFactory) {
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
+ inputTransportFactory_(inputTransportFactory),
+ outputTransportFactory_(outputTransportFactory),
+ inputProtocolFactory_(inputProtocolFactory),
+ outputProtocolFactory_(outputProtocolFactory) {}
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerTransport> serverTransport) :
- processor_(processor),
- serverTransport_(serverTransport) {
- transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
-
- TServer(shared_ptr<TProcessor> processor) :
- processor_(processor) {
- transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
-
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TTransportFactory> transportFactory) :
- processor_(processor),
- transportFactory_(transportFactory) {
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
-
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TProtocolFactory> protocolFactory) :
- processor_(processor) {
- transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
- protocolFactory_ = protocolFactory;
- }
-
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TTransportFactory> transportFactory):
- processor_(processor),
- transportFactory_(transportFactory),
- protocolFactory_(protocolFactory) {}
+ // Class variables
shared_ptr<TProcessor> processor_;
shared_ptr<TServerTransport> serverTransport_;
- shared_ptr<TTransportFactory> transportFactory_;
- shared_ptr<TProtocolFactory> protocolFactory_;
+
+ shared_ptr<TTransportFactory> inputTransportFactory_;
+ shared_ptr<TTransportFactory> outputTransportFactory_;
+
+ shared_ptr<TProtocolFactory> inputProtocolFactory_;
+ shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+ void setInputTransportFactory(shared_ptr<TTransportFactory> inputTransportFactory) {
+ inputTransportFactory_ = inputTransportFactory;
+ }
+
+ void setOutputTransportFactory(shared_ptr<TTransportFactory> outputTransportFactory) {
+ outputTransportFactory_ = outputTransportFactory;
+ }
+
+ void setInputProtocolFactory(shared_ptr<TProtocolFactory> inputProtocolFactory) {
+ inputProtocolFactory_ = inputProtocolFactory;
+ }
+
+ void setOutputProtocolFactory(shared_ptr<TProtocolFactory> outputProtocolFactory) {
+ outputProtocolFactory_ = outputProtocolFactory;
+ }
+
};
}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 8d62bf9..1e7e7fb 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -14,8 +14,10 @@
void TSimpleServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
- pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
try {
// Start the server listening
@@ -29,12 +31,14 @@
try {
while (true) {
client = serverTransport_->accept();
- iot = transportFactory_->getIOTransports(client);
- iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
try {
- while (processor_->process(iop.first, iop.second)) {
+ while (processor_->process(inputProtocol, outputProtocol)) {
// Peek ahead, is the remote side closed?
- if (!iot.first->peek()) {
+ if (!inputTransport->peek()) {
break;
}
}
@@ -43,8 +47,8 @@
} catch (TException& tx) {
cerr << "TSimpleServer exception: " << tx.what() << endl;
}
- iot.first->close();
- iot.second->close();
+ inputTransport->close();
+ outputTransport->close();
client->close();
}
} catch (TTransportException& ttx) {
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index 6470519..cf3ed10 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -21,6 +21,16 @@
shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TProtocolFactory> protocolFactory) :
TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+
+ TSimpleServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory):
+ TServer(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory) {}
~TSimpleServer() {}
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 357152b..2f85c8b 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -29,7 +29,7 @@
void run() {
try {
while (processor_->process(input_, output_)) {
- if (!input_->getInputTransport()->peek()) {
+ if (!input_->getTransport()->peek()) {
break;
}
}
@@ -40,8 +40,8 @@
} catch (...) {
cerr << "TThreadPoolServer uncaught exception." << endl;
}
- input_->getInputTransport()->close();
- output_->getOutputTransport()->close();
+ input_->getTransport()->close();
+ output_->getTransport()->close();
}
private:
@@ -55,19 +55,31 @@
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TProtocolFactory> protocolFactory,
-
shared_ptr<ThreadManager> threadManager) :
TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadManager_(threadManager) {
-}
+ threadManager_(threadManager) {}
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadManager_(threadManager) {}
+
TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
- pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
try {
// Start the server listening
@@ -82,11 +94,13 @@
// Fetch client from server
client = serverTransport_->accept();
// Make IO transports
- iot = transportFactory_->getIOTransports(client);
- iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
} catch (TTransportException& ttx) {
break;
}
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index 5c5899e..aabd686 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -24,6 +24,14 @@
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<ThreadManager> threadManager);
+ TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<ThreadManager> threadManager);
+
virtual ~TThreadPoolServer();
virtual void serve();
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
index b2e4d4f..add3107 100644
--- a/lib/cpp/src/transport/TBufferedRouterTransport.h
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.h
@@ -117,9 +117,8 @@
/**
* Wraps the transport into a buffered one.
*/
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
- return std::make_pair(buffered, buffered);
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new TBufferedRouterTransport(trans, rTrans_));
}
private:
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index 5e4ae6b..02dd89c 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -155,8 +155,8 @@
/**
* Default implementation does nothing, just returns the transport given.
*/
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- return std::make_pair(trans, trans);
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return trans;
}
};
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 427cc0e..79a137c 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -122,9 +122,8 @@
/**
* Wraps the transport into a buffered one.
*/
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
- return std::make_pair(buffered, buffered);
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
}
};