blob: 0530d8dd8357d5b226572b1c7c79e55ed252fa2f [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Konrad Grochowski9be4e682013-06-22 22:03:31 +020020#include <thrift/thrift-config.h>
Roger Meier12d70532011-12-14 23:35:28 +000021
Roger Meier49ff8b12012-04-13 09:12:31 +000022#include <thrift/server/TThreadPoolServer.h>
23#include <thrift/transport/TTransportException.h>
24#include <thrift/concurrency/Thread.h>
25#include <thrift/concurrency/ThreadManager.h>
Marc Slemko35452342006-08-03 19:01:37 +000026#include <string>
27#include <iostream>
28
Konrad Grochowski16a23a62014-11-13 15:33:38 +010029namespace apache {
30namespace thrift {
31namespace server {
Marc Slemko35452342006-08-03 19:01:37 +000032
Mark Slee5ea15f92007-03-05 22:55:59 +000033using boost::shared_ptr;
Marc Slemko35452342006-08-03 19:01:37 +000034using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000035using namespace apache::thrift;
36using namespace apache::thrift::concurrency;
Roger Meier0069cc42010-10-13 18:10:18 +000037using namespace apache::thrift::protocol;
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using namespace apache::thrift::transport;
Marc Slemko35452342006-08-03 19:01:37 +000039
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000040class TThreadPoolServer::Task : public Runnable {
41
Marc Slemko16698852006-08-04 03:16:10 +000042public:
Konrad Grochowski16a23a62014-11-13 15:33:38 +010043 Task(TThreadPoolServer& server,
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000044 shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000045 shared_ptr<TProtocol> input,
David Reiss23248712010-10-06 17:10:08 +000046 shared_ptr<TProtocol> output,
Konrad Grochowski16a23a62014-11-13 15:33:38 +010047 shared_ptr<TTransport> transport)
48 : server_(server),
49 processor_(processor),
50 input_(input),
51 output_(output),
52 transport_(transport) {}
Marc Slemko16698852006-08-04 03:16:10 +000053
54 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000055
Mark Sleeeb0d0242007-01-25 07:58:55 +000056 void run() {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010057 boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
David Reiss23248712010-10-06 17:10:08 +000058 void* connectionContext = NULL;
Roger Meier72957452013-06-29 00:28:50 +020059 if (eventHandler) {
David Reiss23248712010-10-06 17:10:08 +000060 connectionContext = eventHandler->createContext(input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000061 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000062 try {
David Reiss23248712010-10-06 17:10:08 +000063 for (;;) {
Roger Meier72957452013-06-29 00:28:50 +020064 if (eventHandler) {
David Reiss23248712010-10-06 17:10:08 +000065 eventHandler->processContext(connectionContext, transport_);
66 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +010067 if (!processor_->process(input_, output_, connectionContext)
68 || !input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000069 break;
70 }
Marc Slemko35452342006-08-03 19:01:37 +000071 }
Roger Meierb69d24d2012-10-04 18:02:15 +000072 } catch (const TTransportException&) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000073 // This is reasonably expected, client didn't send a full request so just
74 // ignore him
Mark Slee2e8a8d42008-01-16 00:38:20 +000075 // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
76 // GlobalOutput(errStr.c_str());
Bryan Duxbury1e987582011-08-25 17:33:03 +000077 } catch (const std::exception& x) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010078 GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
David Reiss58e4d2c2010-03-09 05:19:43 +000079 } catch (...) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +010080 GlobalOutput(
81 "TThreadPoolServer, unexpected exception in "
82 "TThreadPoolServer::Task::run()");
Marc Slemko35452342006-08-03 19:01:37 +000083 }
pfunge8abada2008-01-05 23:23:53 +000084
Roger Meier72957452013-06-29 00:28:50 +020085 if (eventHandler) {
David Reiss23248712010-10-06 17:10:08 +000086 eventHandler->deleteContext(connectionContext, input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000087 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000088
89 try {
90 input_->getTransport()->close();
91 } catch (TTransportException& ttx) {
92 string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
93 GlobalOutput(errStr.c_str());
94 }
95 try {
96 output_->getTransport()->close();
97 } catch (TTransportException& ttx) {
98 string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
99 GlobalOutput(errStr.c_str());
100 }
Marc Slemko35452342006-08-03 19:01:37 +0000101 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000102
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100103private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000104 TServer& server_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000106 shared_ptr<TProtocol> input_;
107 shared_ptr<TProtocol> output_;
David Reiss23248712010-10-06 17:10:08 +0000108 shared_ptr<TTransport> transport_;
Marc Slemko35452342006-08-03 19:01:37 +0000109};
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000110
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100111TThreadPoolServer::~TThreadPoolServer() {
112}
Marc Slemko16698852006-08-04 03:16:10 +0000113
Mark Slee794993d2006-09-20 01:56:10 +0000114void TThreadPoolServer::serve() {
Mark Sleed788b2e2006-09-07 01:26:35 +0000115 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000116 shared_ptr<TTransport> inputTransport;
117 shared_ptr<TTransport> outputTransport;
118 shared_ptr<TProtocol> inputProtocol;
119 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +0000120
Roger Meierd8f50f32012-04-11 21:48:56 +0000121 // Start the server listening
122 serverTransport_->listen();
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000123
124 // Run the preServe event
Roger Meier72957452013-06-29 00:28:50 +0200125 if (eventHandler_) {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000126 eventHandler_->preServe();
127 }
128
Mark Slee6e3f6372007-03-01 22:05:46 +0000129 while (!stop_) {
Marc Slemko16698852006-08-04 03:16:10 +0000130 try {
Mark Slee3303f362007-03-05 20:09:37 +0000131 client.reset();
132 inputTransport.reset();
133 outputTransport.reset();
134 inputProtocol.reset();
135 outputProtocol.reset();
136
Mark Sleed788b2e2006-09-07 01:26:35 +0000137 // Fetch client from server
138 client = serverTransport_->accept();
Mark Sleea5a783f2007-03-02 19:41:08 +0000139
Mark Sleed788b2e2006-09-07 01:26:35 +0000140 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000141 inputTransport = inputTransportFactory_->getTransport(client);
142 outputTransport = outputTransportFactory_->getTransport(client);
143 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
144 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000145
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100146 shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000147
Mark Sleed788b2e2006-09-07 01:26:35 +0000148 // Add to threadmanager pool
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100149 shared_ptr<TThreadPoolServer::Task> task(
150 new TThreadPoolServer::Task(*this, processor, inputProtocol, outputProtocol, client));
Roger Meier5ed5e8b2013-05-06 00:21:04 +0200151 threadManager_->add(task, timeout_, taskExpiration_);
Mark Sleea5a783f2007-03-02 19:41:08 +0000152
Marc Slemko16698852006-08-04 03:16:10 +0000153 } catch (TTransportException& ttx) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100154 if (inputTransport) {
155 inputTransport->close();
156 }
157 if (outputTransport) {
158 outputTransport->close();
159 }
160 if (client) {
161 client->close();
162 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000163 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000164 string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
165 GlobalOutput(errStr.c_str());
Mark Sleea5a783f2007-03-02 19:41:08 +0000166 }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000167 continue;
168 } catch (TException& tx) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100169 if (inputTransport) {
170 inputTransport->close();
171 }
172 if (outputTransport) {
173 outputTransport->close();
174 }
175 if (client) {
176 client->close();
177 }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000178 string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
179 GlobalOutput(errStr.c_str());
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000180 continue;
181 } catch (string s) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100182 if (inputTransport) {
183 inputTransport->close();
184 }
185 if (outputTransport) {
186 outputTransport->close();
187 }
188 if (client) {
189 client->close();
190 }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000191 string errStr = "TThreadPoolServer: Unknown exception: " + s;
192 GlobalOutput(errStr.c_str());
Marc Slemko16698852006-08-04 03:16:10 +0000193 break;
194 }
195 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000196
197 // If stopped manually, join the existing threads
198 if (stop_) {
199 try {
200 serverTransport_->close();
201 threadManager_->join();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100202 } catch (TException& tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000203 string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
204 GlobalOutput(errStr.c_str());
Mark Slee6e3f6372007-03-01 22:05:46 +0000205 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000206 stop_ = false;
Mark Slee6e3f6372007-03-01 22:05:46 +0000207 }
Marc Slemko16698852006-08-04 03:16:10 +0000208}
Marc Slemko35452342006-08-03 19:01:37 +0000209
Mark Slee9b82d272007-05-23 05:16:07 +0000210int64_t TThreadPoolServer::getTimeout() const {
211 return timeout_;
212}
213
214void TThreadPoolServer::setTimeout(int64_t value) {
215 timeout_ = value;
216}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000217
Roger Meier5ed5e8b2013-05-06 00:21:04 +0200218int64_t TThreadPoolServer::getTaskExpiration() const {
219 return taskExpiration_;
220}
221
222void TThreadPoolServer::setTaskExpiration(int64_t value) {
223 taskExpiration_ = value;
224}
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100225}
226}
227} // apache::thrift::server