blob: 49240447ef46c553d4f254876ab6de5f594b5f4a [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko35452342006-08-03 19:01:37 +00007#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00008#include "transport/TTransportException.h"
9#include "concurrency/Thread.h"
10#include "concurrency/ThreadManager.h"
11#include <string>
12#include <iostream>
13
T Jake Lucianib5e62212009-01-31 22:36:20 +000014namespace apache { namespace thrift { namespace server {
Marc Slemko35452342006-08-03 19:01:37 +000015
Mark Slee5ea15f92007-03-05 22:55:59 +000016using boost::shared_ptr;
Marc Slemko35452342006-08-03 19:01:37 +000017using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000018using namespace apache::thrift;
19using namespace apache::thrift::concurrency;
20using namespace apache::thrift::protocol;;
21using namespace apache::thrift::transport;
Marc Slemko35452342006-08-03 19:01:37 +000022
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000023class TThreadPoolServer::Task : public Runnable {
24
Marc Slemko16698852006-08-04 03:16:10 +000025public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000026
27 Task(TThreadPoolServer &server,
28 shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000029 shared_ptr<TProtocol> input,
30 shared_ptr<TProtocol> output) :
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000031 server_(server),
Mark Slee2f6404d2006-10-10 01:37:40 +000032 processor_(processor),
33 input_(input),
34 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000035 }
Marc Slemko16698852006-08-04 03:16:10 +000036
37 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000038
Mark Sleeeb0d0242007-01-25 07:58:55 +000039 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000040 boost::shared_ptr<TServerEventHandler> eventHandler =
41 server_.getEventHandler();
42 if (eventHandler != NULL) {
43 eventHandler->clientBegin(input_, output_);
44 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000045 try {
46 while (processor_->process(input_, output_)) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000047 if (!input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000048 break;
49 }
Marc Slemko35452342006-08-03 19:01:37 +000050 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000051 } catch (TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000052 // This is reasonably expected, client didn't send a full request so just
53 // ignore him
Mark Slee2e8a8d42008-01-16 00:38:20 +000054 // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
55 // GlobalOutput(errStr.c_str());
Mark Sleeb9ff32a2006-11-16 01:00:24 +000056 } catch (TException& x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000057 string errStr = string("TThreadPoolServer exception: ") + x.what();
58 GlobalOutput(errStr.c_str());
pfung78ee85c2007-12-13 22:30:47 +000059 } catch (std::exception &x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000060 string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
61 GlobalOutput(errStr.c_str());
Marc Slemko35452342006-08-03 19:01:37 +000062 }
pfunge8abada2008-01-05 23:23:53 +000063
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000064 if (eventHandler != NULL) {
65 eventHandler->clientEnd(input_, output_);
66 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000067
68 try {
69 input_->getTransport()->close();
70 } catch (TTransportException& ttx) {
71 string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
72 GlobalOutput(errStr.c_str());
73 }
74 try {
75 output_->getTransport()->close();
76 } catch (TTransportException& ttx) {
77 string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
78 GlobalOutput(errStr.c_str());
79 }
80
Marc Slemko35452342006-08-03 19:01:37 +000081 }
Mark Slee2f6404d2006-10-10 01:37:40 +000082
83 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000084 TServer& server_;
Mark Slee2f6404d2006-10-10 01:37:40 +000085 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000086 shared_ptr<TProtocol> input_;
87 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000088
Marc Slemko35452342006-08-03 19:01:37 +000089};
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000090
Marc Slemko16698852006-08-04 03:16:10 +000091TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000092 shared_ptr<TServerTransport> serverTransport,
93 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000094 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000095 shared_ptr<ThreadManager> threadManager) :
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000096 TServer(processor, serverTransport, transportFactory, protocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +000097 threadManager_(threadManager),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000098 stop_(false), timeout_(0) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000099
100TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
101 shared_ptr<TServerTransport> serverTransport,
102 shared_ptr<TTransportFactory> inputTransportFactory,
103 shared_ptr<TTransportFactory> outputTransportFactory,
104 shared_ptr<TProtocolFactory> inputProtocolFactory,
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000105 shared_ptr<TProtocolFactory> outputProtocolFactory,
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000106 shared_ptr<ThreadManager> threadManager) :
107 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
108 inputProtocolFactory, outputProtocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +0000109 threadManager_(threadManager),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000110 stop_(false), timeout_(0) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000111
Mark Sleed788b2e2006-09-07 01:26:35 +0000112
Marc Slemko16698852006-08-04 03:16:10 +0000113TThreadPoolServer::~TThreadPoolServer() {}
114
Mark Slee794993d2006-09-20 01:56:10 +0000115void TThreadPoolServer::serve() {
Mark Sleed788b2e2006-09-07 01:26:35 +0000116 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000117 shared_ptr<TTransport> inputTransport;
118 shared_ptr<TTransport> outputTransport;
119 shared_ptr<TProtocol> inputProtocol;
120 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +0000121
Marc Slemko16698852006-08-04 03:16:10 +0000122 try {
123 // Start the server listening
124 serverTransport_->listen();
125 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000126 string errStr = string("TThreadPoolServer::run() listen(): ") + ttx.what();
127 GlobalOutput(errStr.c_str());
Marc Slemko16698852006-08-04 03:16:10 +0000128 return;
129 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000130
131 // Run the preServe event
132 if (eventHandler_ != NULL) {
133 eventHandler_->preServe();
134 }
135
Mark Slee6e3f6372007-03-01 22:05:46 +0000136 while (!stop_) {
Marc Slemko16698852006-08-04 03:16:10 +0000137 try {
Mark Slee3303f362007-03-05 20:09:37 +0000138 client.reset();
139 inputTransport.reset();
140 outputTransport.reset();
141 inputProtocol.reset();
142 outputProtocol.reset();
143
Mark Sleed788b2e2006-09-07 01:26:35 +0000144 // Fetch client from server
145 client = serverTransport_->accept();
Mark Sleea5a783f2007-03-02 19:41:08 +0000146
Mark Sleed788b2e2006-09-07 01:26:35 +0000147 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000148 inputTransport = inputTransportFactory_->getTransport(client);
149 outputTransport = outputTransportFactory_->getTransport(client);
150 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
151 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000152
Mark Sleed788b2e2006-09-07 01:26:35 +0000153 // Add to threadmanager pool
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000154 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
Mark Sleea5a783f2007-03-02 19:41:08 +0000155
Marc Slemko16698852006-08-04 03:16:10 +0000156 } catch (TTransportException& ttx) {
Mark Slee3303f362007-03-05 20:09:37 +0000157 if (inputTransport != NULL) { inputTransport->close(); }
158 if (outputTransport != NULL) { outputTransport->close(); }
159 if (client != NULL) { client->close(); }
Mark Sleea5a783f2007-03-02 19:41:08 +0000160 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000161 string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
162 GlobalOutput(errStr.c_str());
Mark Sleea5a783f2007-03-02 19:41:08 +0000163 }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000164 continue;
165 } catch (TException& tx) {
Mark Slee3303f362007-03-05 20:09:37 +0000166 if (inputTransport != NULL) { inputTransport->close(); }
167 if (outputTransport != NULL) { outputTransport->close(); }
168 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000169 string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
170 GlobalOutput(errStr.c_str());
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000171 continue;
172 } catch (string s) {
Mark Slee3303f362007-03-05 20:09:37 +0000173 if (inputTransport != NULL) { inputTransport->close(); }
174 if (outputTransport != NULL) { outputTransport->close(); }
175 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000176 string errStr = "TThreadPoolServer: Unknown exception: " + s;
177 GlobalOutput(errStr.c_str());
Marc Slemko16698852006-08-04 03:16:10 +0000178 break;
179 }
180 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000181
182 // If stopped manually, join the existing threads
183 if (stop_) {
184 try {
185 serverTransport_->close();
186 threadManager_->join();
187 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000188 string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
189 GlobalOutput(errStr.c_str());
Mark Slee6e3f6372007-03-01 22:05:46 +0000190 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000191 stop_ = false;
Mark Slee6e3f6372007-03-01 22:05:46 +0000192 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000193
Marc Slemko16698852006-08-04 03:16:10 +0000194}
Marc Slemko35452342006-08-03 19:01:37 +0000195
Mark Slee9b82d272007-05-23 05:16:07 +0000196int64_t TThreadPoolServer::getTimeout() const {
197 return timeout_;
198}
199
200void TThreadPoolServer::setTimeout(int64_t value) {
201 timeout_ = value;
202}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000203
T Jake Lucianib5e62212009-01-31 22:36:20 +0000204}}} // apache::thrift::server