Thrift TTransportFactory model for servers
Summary: Servers need to create bufferedtransports etc. around the transports they get in a user-definable way. So use a factory pattern to allow the user to supply an object to the server that defines this behavior.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664792 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc
index d53d174..1eab53d 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cc
+++ b/lib/cpp/src/server/TThreadPoolServer.cc
@@ -1,5 +1,4 @@
#include "server/TThreadPoolServer.h"
-#include "transport/TBufferedTransport.h"
#include "transport/TTransportException.h"
#include "concurrency/Thread.h"
#include "concurrency/ThreadManager.h"
@@ -15,54 +14,52 @@
class TThreadPoolServer::Task: public Runnable {
shared_ptr<TProcessor> _processor;
- shared_ptr<TTransport> _transport;
- shared_ptr<TBufferedTransport> _bufferedTransport;
+ shared_ptr<TTransport> _input;
+ shared_ptr<TTransport> _output;
public:
Task(shared_ptr<TProcessor> processor,
- shared_ptr<TTransport> transport) :
+ shared_ptr<TTransport> input,
+ shared_ptr<TTransport> output) :
_processor(processor),
- _transport(transport),
- _bufferedTransport(new TBufferedTransport(transport)) {
+ _input(input),
+ _output(output) {
}
~Task() {}
- void run() {
-
+ void run() {
while(true) {
-
try {
- _processor->process(_bufferedTransport);
-
+ _processor->process(_input, _output);
} catch (TTransportException& ttx) {
-
- break;
-
+ break;
} catch(...) {
-
- break;
+ break;
}
}
-
- _bufferedTransport->close();
+ _input->close();
+ _output->close();
}
};
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerOptions> options,
- shared_ptr<TServerTransport> serverTransport,
- shared_ptr<ThreadManager> threadManager) :
- TServer(processor, options),
- serverTransport_(serverTransport),
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> transportFactory,
+ shared_ptr<ThreadManager> threadManager,
+ shared_ptr<TServerOptions> options) :
+ TServer(processor, serverTransport, transportFactory, options),
threadManager_(threadManager) {
}
-
+
TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::run() {
+ shared_ptr<TTransport> client;
+ pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+
try {
// Start the server listening
serverTransport_->listen();
@@ -71,15 +68,14 @@
return;
}
- // Fetch client from server
-
- while (true) {
-
+ while (true) {
try {
-
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_,
- shared_ptr<TTransport>(serverTransport_->accept()))));
-
+ // Fetch client from server
+ client = serverTransport_->accept();
+ // Make IO transports
+ io = transportFactory_->getIOTransports(client);
+ // Add to threadmanager pool
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
} catch (TTransportException& ttx) {
break;
}