blob: 7885f0f5abfcdb57bfe41186740b06eed94a17d1 [file] [log] [blame]
Marc Slemko35452342006-08-03 19:01:37 +00001#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00002#include "transport/TTransportException.h"
3#include "concurrency/Thread.h"
4#include "concurrency/ThreadManager.h"
5#include <string>
6#include <iostream>
7
8namespace facebook { namespace thrift { namespace server {
9
10using namespace std;
11using namespace facebook::thrift::concurrency;
12using namespace facebook::thrift::transport;
13
Marc Slemko16698852006-08-04 03:16:10 +000014class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000015
Marc Slemko16698852006-08-04 03:16:10 +000016public:
Marc Slemko35452342006-08-03 19:01:37 +000017
Marc Slemko16698852006-08-04 03:16:10 +000018 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000019 shared_ptr<TProtocol> input,
20 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000021 processor_(processor),
22 input_(input),
23 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000024 }
Marc Slemko16698852006-08-04 03:16:10 +000025
26 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000027
Mark Sleed788b2e2006-09-07 01:26:35 +000028 void run() {
Marc Slemko16698852006-08-04 03:16:10 +000029 while(true) {
Marc Slemko35452342006-08-03 19:01:37 +000030 try {
Mark Slee2f6404d2006-10-10 01:37:40 +000031 processor_->process(input_, output_);
Marc Slemko35452342006-08-03 19:01:37 +000032 } catch (TTransportException& ttx) {
Mark Sleed788b2e2006-09-07 01:26:35 +000033 break;
Marc Slemko16698852006-08-04 03:16:10 +000034 } catch(...) {
Mark Sleed788b2e2006-09-07 01:26:35 +000035 break;
Marc Slemko35452342006-08-03 19:01:37 +000036 }
37 }
Mark Slee4af6ed72006-10-25 19:02:49 +000038 input_->getInputTransport()->close();
39 output_->getOutputTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000040 }
Mark Slee2f6404d2006-10-10 01:37:40 +000041
42 private:
43 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000044 shared_ptr<TProtocol> input_;
45 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000046
Marc Slemko35452342006-08-03 19:01:37 +000047};
Marc Slemko16698852006-08-04 03:16:10 +000048
49TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000050 shared_ptr<TServerTransport> serverTransport,
51 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000052 shared_ptr<TProtocolFactory> protocolFactory,
53
54 shared_ptr<ThreadManager> threadManager) :
55 TServer(processor, serverTransport, transportFactory, protocolFactory),
Marc Slemko16698852006-08-04 03:16:10 +000056 threadManager_(threadManager) {
57}
Mark Sleed788b2e2006-09-07 01:26:35 +000058
Marc Slemko16698852006-08-04 03:16:10 +000059TThreadPoolServer::~TThreadPoolServer() {}
60
Mark Slee794993d2006-09-20 01:56:10 +000061void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000062
Mark Sleed788b2e2006-09-07 01:26:35 +000063 shared_ptr<TTransport> client;
Mark Slee4af6ed72006-10-25 19:02:49 +000064 pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
65 pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
Mark Sleed788b2e2006-09-07 01:26:35 +000066
Marc Slemko16698852006-08-04 03:16:10 +000067 try {
68 // Start the server listening
69 serverTransport_->listen();
70 } catch (TTransportException& ttx) {
71 cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
72 return;
73 }
74
Mark Sleed788b2e2006-09-07 01:26:35 +000075 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +000076 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000077 // Fetch client from server
78 client = serverTransport_->accept();
79 // Make IO transports
Mark Slee4af6ed72006-10-25 19:02:49 +000080 iot = transportFactory_->getIOTransports(client);
81 iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
82
Mark Sleed788b2e2006-09-07 01:26:35 +000083 // Add to threadmanager pool
Mark Slee4af6ed72006-10-25 19:02:49 +000084 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
Marc Slemko16698852006-08-04 03:16:10 +000085 } catch (TTransportException& ttx) {
86 break;
87 }
88 }
89}
Marc Slemko35452342006-08-03 19:01:37 +000090
91}}} // facebook::thrift::server