blob: 26eb373d898244516fa2b6e54005e82217ae0468 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko35452342006-08-03 19:01:37 +00007#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00008#include "transport/TTransportException.h"
9#include "concurrency/Thread.h"
10#include "concurrency/ThreadManager.h"
11#include <string>
12#include <iostream>
13
14namespace facebook { namespace thrift { namespace server {
15
Mark Slee5ea15f92007-03-05 22:55:59 +000016using boost::shared_ptr;
Marc Slemko35452342006-08-03 19:01:37 +000017using namespace std;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000018using namespace facebook::thrift;
Marc Slemko35452342006-08-03 19:01:37 +000019using namespace facebook::thrift::concurrency;
Mark Slee5ea15f92007-03-05 22:55:59 +000020using namespace facebook::thrift::protocol;;
Marc Slemko35452342006-08-03 19:01:37 +000021using namespace facebook::thrift::transport;
22
Marc Slemko16698852006-08-04 03:16:10 +000023class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000024
Marc Slemko16698852006-08-04 03:16:10 +000025public:
Marc Slemko35452342006-08-03 19:01:37 +000026
Marc Slemko16698852006-08-04 03:16:10 +000027 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000028 shared_ptr<TProtocol> input,
29 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000030 processor_(processor),
31 input_(input),
32 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000033 }
Marc Slemko16698852006-08-04 03:16:10 +000034
35 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000036
Mark Sleeeb0d0242007-01-25 07:58:55 +000037 void run() {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000038 try {
39 while (processor_->process(input_, output_)) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000040 if (!input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000041 break;
42 }
Marc Slemko35452342006-08-03 19:01:37 +000043 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000044 } catch (TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000045 // This is reasonably expected, client didn't send a full request so just
46 // ignore him
Martin Kraemerac62f942007-03-30 18:41:48 +000047 //cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000048 } catch (TException& x) {
49 cerr << "TThreadPoolServer exception: " << x.what() << endl;
50 } catch (...) {
51 cerr << "TThreadPoolServer uncaught exception." << endl;
Marc Slemko35452342006-08-03 19:01:37 +000052 }
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000053 input_->getTransport()->close();
54 output_->getTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000055 }
Mark Slee2f6404d2006-10-10 01:37:40 +000056
57 private:
58 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000059 shared_ptr<TProtocol> input_;
60 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000061
Marc Slemko35452342006-08-03 19:01:37 +000062};
Marc Slemko16698852006-08-04 03:16:10 +000063
64TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000065 shared_ptr<TServerTransport> serverTransport,
66 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000067 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000068 shared_ptr<ThreadManager> threadManager) :
69 TServer(processor, serverTransport, transportFactory, protocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +000070 threadManager_(threadManager),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000071 stop_(false), timeout_(0) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000072
73TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
74 shared_ptr<TServerTransport> serverTransport,
75 shared_ptr<TTransportFactory> inputTransportFactory,
76 shared_ptr<TTransportFactory> outputTransportFactory,
77 shared_ptr<TProtocolFactory> inputProtocolFactory,
78 shared_ptr<TProtocolFactory> outputProtocolFactory,
79 shared_ptr<ThreadManager> threadManager) :
80 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
81 inputProtocolFactory, outputProtocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +000082 threadManager_(threadManager),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000083 stop_(false), timeout_(0) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000084
Mark Sleed788b2e2006-09-07 01:26:35 +000085
Marc Slemko16698852006-08-04 03:16:10 +000086TThreadPoolServer::~TThreadPoolServer() {}
87
Mark Slee794993d2006-09-20 01:56:10 +000088void TThreadPoolServer::serve() {
Mark Sleed788b2e2006-09-07 01:26:35 +000089 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000090 shared_ptr<TTransport> inputTransport;
91 shared_ptr<TTransport> outputTransport;
92 shared_ptr<TProtocol> inputProtocol;
93 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +000094
Marc Slemko16698852006-08-04 03:16:10 +000095 try {
96 // Start the server listening
97 serverTransport_->listen();
98 } catch (TTransportException& ttx) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000099 cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
Marc Slemko16698852006-08-04 03:16:10 +0000100 return;
101 }
102
Mark Slee6e3f6372007-03-01 22:05:46 +0000103 while (!stop_) {
Marc Slemko16698852006-08-04 03:16:10 +0000104 try {
Mark Slee3303f362007-03-05 20:09:37 +0000105 client.reset();
106 inputTransport.reset();
107 outputTransport.reset();
108 inputProtocol.reset();
109 outputProtocol.reset();
110
Mark Sleed788b2e2006-09-07 01:26:35 +0000111 // Fetch client from server
112 client = serverTransport_->accept();
Mark Sleea5a783f2007-03-02 19:41:08 +0000113
Mark Sleed788b2e2006-09-07 01:26:35 +0000114 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000115 inputTransport = inputTransportFactory_->getTransport(client);
116 outputTransport = outputTransportFactory_->getTransport(client);
117 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
118 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000119
Mark Sleed788b2e2006-09-07 01:26:35 +0000120 // Add to threadmanager pool
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000121 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)), timeout_);
Mark Sleea5a783f2007-03-02 19:41:08 +0000122
Marc Slemko16698852006-08-04 03:16:10 +0000123 } catch (TTransportException& ttx) {
Mark Slee3303f362007-03-05 20:09:37 +0000124 if (inputTransport != NULL) { inputTransport->close(); }
125 if (outputTransport != NULL) { outputTransport->close(); }
126 if (client != NULL) { client->close(); }
Mark Sleea5a783f2007-03-02 19:41:08 +0000127 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
128 cerr << "TThreadPoolServer: TServerTransport died on accept: " << ttx.what() << endl;
129 }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000130 continue;
131 } catch (TException& tx) {
Mark Slee3303f362007-03-05 20:09:37 +0000132 if (inputTransport != NULL) { inputTransport->close(); }
133 if (outputTransport != NULL) { outputTransport->close(); }
134 if (client != NULL) { client->close(); }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000135 cerr << "TThreadPoolServer: Caught TException: " << tx.what() << endl;
136 continue;
137 } catch (string s) {
Mark Slee3303f362007-03-05 20:09:37 +0000138 if (inputTransport != NULL) { inputTransport->close(); }
139 if (outputTransport != NULL) { outputTransport->close(); }
140 if (client != NULL) { client->close(); }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000141 cerr << "TThreadPoolServer: Unknown exception: " << s << endl;
Marc Slemko16698852006-08-04 03:16:10 +0000142 break;
143 }
144 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000145
146 // If stopped manually, join the existing threads
147 if (stop_) {
148 try {
149 serverTransport_->close();
150 threadManager_->join();
151 } catch (TException &tx) {
152 cerr << "TThreadPoolServer: Exception shutting down: " << tx.what() << endl;
153 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000154 stop_ = false;
Mark Slee6e3f6372007-03-01 22:05:46 +0000155 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000156
Marc Slemko16698852006-08-04 03:16:10 +0000157}
Marc Slemko35452342006-08-03 19:01:37 +0000158
Mark Slee9b82d272007-05-23 05:16:07 +0000159int64_t TThreadPoolServer::getTimeout() const {
160 return timeout_;
161}
162
163void TThreadPoolServer::setTimeout(int64_t value) {
164 timeout_ = value;
165}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000166
Marc Slemko35452342006-08-03 19:01:37 +0000167}}} // facebook::thrift::server