blob: 8514d1877780ac18183d47b4de18cb20d68a3332 [file] [log] [blame]
Marc Slemko35452342006-08-03 19:01:37 +00001#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00002#include "transport/TTransportException.h"
3#include "concurrency/Thread.h"
4#include "concurrency/ThreadManager.h"
5#include <string>
6#include <iostream>
7
8namespace facebook { namespace thrift { namespace server {
9
10using namespace std;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000011using namespace facebook::thrift;
Marc Slemko35452342006-08-03 19:01:37 +000012using namespace facebook::thrift::concurrency;
13using namespace facebook::thrift::transport;
14
Marc Slemko16698852006-08-04 03:16:10 +000015class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000016
Marc Slemko16698852006-08-04 03:16:10 +000017public:
Marc Slemko35452342006-08-03 19:01:37 +000018
Marc Slemko16698852006-08-04 03:16:10 +000019 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000020 shared_ptr<TProtocol> input,
21 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000022 processor_(processor),
23 input_(input),
24 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000025 }
Marc Slemko16698852006-08-04 03:16:10 +000026
27 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000028
Mark Sleeeb0d0242007-01-25 07:58:55 +000029 void run() {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000030 try {
31 while (processor_->process(input_, output_)) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000032 if (!input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000033 break;
34 }
Marc Slemko35452342006-08-03 19:01:37 +000035 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000036 } catch (TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000037 // This is reasonably expected, client didn't send a full request so just
38 // ignore him
39 //cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000040 } catch (TException& x) {
41 cerr << "TThreadPoolServer exception: " << x.what() << endl;
42 } catch (...) {
43 cerr << "TThreadPoolServer uncaught exception." << endl;
Marc Slemko35452342006-08-03 19:01:37 +000044 }
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000045 input_->getTransport()->close();
46 output_->getTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000047 }
Mark Slee2f6404d2006-10-10 01:37:40 +000048
49 private:
50 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000051 shared_ptr<TProtocol> input_;
52 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000053
Marc Slemko35452342006-08-03 19:01:37 +000054};
Marc Slemko16698852006-08-04 03:16:10 +000055
56TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000057 shared_ptr<TServerTransport> serverTransport,
58 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000059 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000060 shared_ptr<ThreadManager> threadManager) :
61 TServer(processor, serverTransport, transportFactory, protocolFactory),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000062 threadManager_(threadManager) {}
63
64TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
65 shared_ptr<TServerTransport> serverTransport,
66 shared_ptr<TTransportFactory> inputTransportFactory,
67 shared_ptr<TTransportFactory> outputTransportFactory,
68 shared_ptr<TProtocolFactory> inputProtocolFactory,
69 shared_ptr<TProtocolFactory> outputProtocolFactory,
70 shared_ptr<ThreadManager> threadManager) :
71 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
72 inputProtocolFactory, outputProtocolFactory),
73 threadManager_(threadManager) {}
74
Mark Sleed788b2e2006-09-07 01:26:35 +000075
Marc Slemko16698852006-08-04 03:16:10 +000076TThreadPoolServer::~TThreadPoolServer() {}
77
Mark Slee794993d2006-09-20 01:56:10 +000078void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000079
Mark Sleed788b2e2006-09-07 01:26:35 +000080 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000081 shared_ptr<TTransport> inputTransport;
82 shared_ptr<TTransport> outputTransport;
83 shared_ptr<TProtocol> inputProtocol;
84 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +000085
Marc Slemko16698852006-08-04 03:16:10 +000086 try {
87 // Start the server listening
88 serverTransport_->listen();
89 } catch (TTransportException& ttx) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000090 cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
Marc Slemko16698852006-08-04 03:16:10 +000091 return;
92 }
93
Mark Sleed788b2e2006-09-07 01:26:35 +000094 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +000095 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000096 // Fetch client from server
97 client = serverTransport_->accept();
98 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000099 inputTransport = inputTransportFactory_->getTransport(client);
100 outputTransport = outputTransportFactory_->getTransport(client);
101 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
102 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000103
Mark Sleed788b2e2006-09-07 01:26:35 +0000104 // Add to threadmanager pool
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000105 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
Marc Slemko16698852006-08-04 03:16:10 +0000106 } catch (TTransportException& ttx) {
107 break;
108 }
109 }
110}
Marc Slemko35452342006-08-03 19:01:37 +0000111
112}}} // facebook::thrift::server