blob: 2f85c8bbe7d6cd068d71cc843d3681e74c30fca9 [file] [log] [blame]
Marc Slemko35452342006-08-03 19:01:37 +00001#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00002#include "transport/TTransportException.h"
3#include "concurrency/Thread.h"
4#include "concurrency/ThreadManager.h"
5#include <string>
6#include <iostream>
7
8namespace facebook { namespace thrift { namespace server {
9
10using namespace std;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000011using namespace facebook::thrift;
Marc Slemko35452342006-08-03 19:01:37 +000012using namespace facebook::thrift::concurrency;
13using namespace facebook::thrift::transport;
14
Marc Slemko16698852006-08-04 03:16:10 +000015class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000016
Marc Slemko16698852006-08-04 03:16:10 +000017public:
Marc Slemko35452342006-08-03 19:01:37 +000018
Marc Slemko16698852006-08-04 03:16:10 +000019 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000020 shared_ptr<TProtocol> input,
21 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000022 processor_(processor),
23 input_(input),
24 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000025 }
Marc Slemko16698852006-08-04 03:16:10 +000026
27 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000028
Mark Sleed788b2e2006-09-07 01:26:35 +000029 void run() {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000030 try {
31 while (processor_->process(input_, output_)) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000032 if (!input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000033 break;
34 }
Marc Slemko35452342006-08-03 19:01:37 +000035 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000036 } catch (TTransportException& ttx) {
37 cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
38 } catch (TException& x) {
39 cerr << "TThreadPoolServer exception: " << x.what() << endl;
40 } catch (...) {
41 cerr << "TThreadPoolServer uncaught exception." << endl;
Marc Slemko35452342006-08-03 19:01:37 +000042 }
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000043 input_->getTransport()->close();
44 output_->getTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000045 }
Mark Slee2f6404d2006-10-10 01:37:40 +000046
47 private:
48 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000049 shared_ptr<TProtocol> input_;
50 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000051
Marc Slemko35452342006-08-03 19:01:37 +000052};
Marc Slemko16698852006-08-04 03:16:10 +000053
54TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000055 shared_ptr<TServerTransport> serverTransport,
56 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000057 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000058 shared_ptr<ThreadManager> threadManager) :
59 TServer(processor, serverTransport, transportFactory, protocolFactory),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000060 threadManager_(threadManager) {}
61
62TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
63 shared_ptr<TServerTransport> serverTransport,
64 shared_ptr<TTransportFactory> inputTransportFactory,
65 shared_ptr<TTransportFactory> outputTransportFactory,
66 shared_ptr<TProtocolFactory> inputProtocolFactory,
67 shared_ptr<TProtocolFactory> outputProtocolFactory,
68 shared_ptr<ThreadManager> threadManager) :
69 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
70 inputProtocolFactory, outputProtocolFactory),
71 threadManager_(threadManager) {}
72
Mark Sleed788b2e2006-09-07 01:26:35 +000073
Marc Slemko16698852006-08-04 03:16:10 +000074TThreadPoolServer::~TThreadPoolServer() {}
75
Mark Slee794993d2006-09-20 01:56:10 +000076void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000077
Mark Sleed788b2e2006-09-07 01:26:35 +000078 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000079 shared_ptr<TTransport> inputTransport;
80 shared_ptr<TTransport> outputTransport;
81 shared_ptr<TProtocol> inputProtocol;
82 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +000083
Marc Slemko16698852006-08-04 03:16:10 +000084 try {
85 // Start the server listening
86 serverTransport_->listen();
87 } catch (TTransportException& ttx) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000088 cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
Marc Slemko16698852006-08-04 03:16:10 +000089 return;
90 }
91
Mark Sleed788b2e2006-09-07 01:26:35 +000092 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +000093 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000094 // Fetch client from server
95 client = serverTransport_->accept();
96 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000097 inputTransport = inputTransportFactory_->getTransport(client);
98 outputTransport = outputTransportFactory_->getTransport(client);
99 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
100 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000101
Mark Sleed788b2e2006-09-07 01:26:35 +0000102 // Add to threadmanager pool
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000103 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
Marc Slemko16698852006-08-04 03:16:10 +0000104 } catch (TTransportException& ttx) {
105 break;
106 }
107 }
108}
Marc Slemko35452342006-08-03 19:01:37 +0000109
110}}} // facebook::thrift::server