blob: 357152bce127b416ce38d346f5ebe98e7d10b2e3 [file] [log] [blame]
Marc Slemko35452342006-08-03 19:01:37 +00001#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00002#include "transport/TTransportException.h"
3#include "concurrency/Thread.h"
4#include "concurrency/ThreadManager.h"
5#include <string>
6#include <iostream>
7
8namespace facebook { namespace thrift { namespace server {
9
10using namespace std;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000011using namespace facebook::thrift;
Marc Slemko35452342006-08-03 19:01:37 +000012using namespace facebook::thrift::concurrency;
13using namespace facebook::thrift::transport;
14
Marc Slemko16698852006-08-04 03:16:10 +000015class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000016
Marc Slemko16698852006-08-04 03:16:10 +000017public:
Marc Slemko35452342006-08-03 19:01:37 +000018
Marc Slemko16698852006-08-04 03:16:10 +000019 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000020 shared_ptr<TProtocol> input,
21 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000022 processor_(processor),
23 input_(input),
24 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000025 }
Marc Slemko16698852006-08-04 03:16:10 +000026
27 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000028
Mark Sleed788b2e2006-09-07 01:26:35 +000029 void run() {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000030 try {
31 while (processor_->process(input_, output_)) {
32 if (!input_->getInputTransport()->peek()) {
33 break;
34 }
Marc Slemko35452342006-08-03 19:01:37 +000035 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000036 } catch (TTransportException& ttx) {
37 cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
38 } catch (TException& x) {
39 cerr << "TThreadPoolServer exception: " << x.what() << endl;
40 } catch (...) {
41 cerr << "TThreadPoolServer uncaught exception." << endl;
Marc Slemko35452342006-08-03 19:01:37 +000042 }
Mark Slee4af6ed72006-10-25 19:02:49 +000043 input_->getInputTransport()->close();
44 output_->getOutputTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000045 }
Mark Slee2f6404d2006-10-10 01:37:40 +000046
47 private:
48 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000049 shared_ptr<TProtocol> input_;
50 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000051
Marc Slemko35452342006-08-03 19:01:37 +000052};
Marc Slemko16698852006-08-04 03:16:10 +000053
54TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000055 shared_ptr<TServerTransport> serverTransport,
56 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000057 shared_ptr<TProtocolFactory> protocolFactory,
58
59 shared_ptr<ThreadManager> threadManager) :
60 TServer(processor, serverTransport, transportFactory, protocolFactory),
Marc Slemko16698852006-08-04 03:16:10 +000061 threadManager_(threadManager) {
62}
Mark Sleed788b2e2006-09-07 01:26:35 +000063
Marc Slemko16698852006-08-04 03:16:10 +000064TThreadPoolServer::~TThreadPoolServer() {}
65
Mark Slee794993d2006-09-20 01:56:10 +000066void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000067
Mark Sleed788b2e2006-09-07 01:26:35 +000068 shared_ptr<TTransport> client;
Mark Slee4af6ed72006-10-25 19:02:49 +000069 pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
70 pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
Mark Sleed788b2e2006-09-07 01:26:35 +000071
Marc Slemko16698852006-08-04 03:16:10 +000072 try {
73 // Start the server listening
74 serverTransport_->listen();
75 } catch (TTransportException& ttx) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000076 cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
Marc Slemko16698852006-08-04 03:16:10 +000077 return;
78 }
79
Mark Sleed788b2e2006-09-07 01:26:35 +000080 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +000081 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000082 // Fetch client from server
83 client = serverTransport_->accept();
84 // Make IO transports
Mark Slee4af6ed72006-10-25 19:02:49 +000085 iot = transportFactory_->getIOTransports(client);
86 iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
87
Mark Sleed788b2e2006-09-07 01:26:35 +000088 // Add to threadmanager pool
Mark Slee4af6ed72006-10-25 19:02:49 +000089 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
Marc Slemko16698852006-08-04 03:16:10 +000090 } catch (TTransportException& ttx) {
91 break;
92 }
93 }
94}
Marc Slemko35452342006-08-03 19:01:37 +000095
96}}} // facebook::thrift::server