blob: 18319be76c8fecc16a6b2f32d71eb8b49109a87b [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko35452342006-08-03 19:01:37 +000020#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +000021#include "transport/TTransportException.h"
22#include "concurrency/Thread.h"
23#include "concurrency/ThreadManager.h"
24#include <string>
25#include <iostream>
26
T Jake Lucianib5e62212009-01-31 22:36:20 +000027namespace apache { namespace thrift { namespace server {
Marc Slemko35452342006-08-03 19:01:37 +000028
Mark Slee5ea15f92007-03-05 22:55:59 +000029using boost::shared_ptr;
Marc Slemko35452342006-08-03 19:01:37 +000030using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000031using namespace apache::thrift;
32using namespace apache::thrift::concurrency;
33using namespace apache::thrift::protocol;;
34using namespace apache::thrift::transport;
Marc Slemko35452342006-08-03 19:01:37 +000035
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000036class TThreadPoolServer::Task : public Runnable {
37
Marc Slemko16698852006-08-04 03:16:10 +000038public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000039
40 Task(TThreadPoolServer &server,
41 shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000042 shared_ptr<TProtocol> input,
David Reiss23248712010-10-06 17:10:08 +000043 shared_ptr<TProtocol> output,
44 shared_ptr<TTransport> transport) :
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000045 server_(server),
Mark Slee2f6404d2006-10-10 01:37:40 +000046 processor_(processor),
47 input_(input),
David Reiss23248712010-10-06 17:10:08 +000048 output_(output),
49 transport_(transport) {
Marc Slemko35452342006-08-03 19:01:37 +000050 }
Marc Slemko16698852006-08-04 03:16:10 +000051
52 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000053
Mark Sleeeb0d0242007-01-25 07:58:55 +000054 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000055 boost::shared_ptr<TServerEventHandler> eventHandler =
56 server_.getEventHandler();
David Reiss23248712010-10-06 17:10:08 +000057 void* connectionContext = NULL;
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000058 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000059 connectionContext = eventHandler->createContext(input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000060 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000061 try {
David Reiss23248712010-10-06 17:10:08 +000062 for (;;) {
63 if (eventHandler != NULL) {
64 eventHandler->processContext(connectionContext, transport_);
65 }
66 if (!processor_->process(input_, output_, connectionContext) ||
67 !input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000068 break;
69 }
Marc Slemko35452342006-08-03 19:01:37 +000070 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000071 } catch (TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000072 // This is reasonably expected, client didn't send a full request so just
73 // ignore him
Mark Slee2e8a8d42008-01-16 00:38:20 +000074 // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
75 // GlobalOutput(errStr.c_str());
Mark Sleeb9ff32a2006-11-16 01:00:24 +000076 } catch (TException& x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000077 string errStr = string("TThreadPoolServer exception: ") + x.what();
78 GlobalOutput(errStr.c_str());
pfung78ee85c2007-12-13 22:30:47 +000079 } catch (std::exception &x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000080 string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
81 GlobalOutput(errStr.c_str());
David Reiss58e4d2c2010-03-09 05:19:43 +000082 } catch (...) {
83 GlobalOutput("TThreadPoolServer, unexpected exception in "
84 "TThreadPoolServer::Task::run()");
Marc Slemko35452342006-08-03 19:01:37 +000085 }
pfunge8abada2008-01-05 23:23:53 +000086
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000087 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000088 eventHandler->deleteContext(connectionContext, input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000089 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000090
91 try {
92 input_->getTransport()->close();
93 } catch (TTransportException& ttx) {
94 string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
95 GlobalOutput(errStr.c_str());
96 }
97 try {
98 output_->getTransport()->close();
99 } catch (TTransportException& ttx) {
100 string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
101 GlobalOutput(errStr.c_str());
102 }
103
Marc Slemko35452342006-08-03 19:01:37 +0000104 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000105
106 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000107 TServer& server_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000109 shared_ptr<TProtocol> input_;
110 shared_ptr<TProtocol> output_;
David Reiss23248712010-10-06 17:10:08 +0000111 shared_ptr<TTransport> transport_;
Marc Slemko35452342006-08-03 19:01:37 +0000112};
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000113
Marc Slemko16698852006-08-04 03:16:10 +0000114TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +0000115 shared_ptr<TServerTransport> serverTransport,
116 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +0000117 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +0000118 shared_ptr<ThreadManager> threadManager) :
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000119 TServer(processor, serverTransport, transportFactory, protocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +0000120 threadManager_(threadManager),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000121 stop_(false), timeout_(0) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000122
123TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
124 shared_ptr<TServerTransport> serverTransport,
125 shared_ptr<TTransportFactory> inputTransportFactory,
126 shared_ptr<TTransportFactory> outputTransportFactory,
127 shared_ptr<TProtocolFactory> inputProtocolFactory,
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000128 shared_ptr<TProtocolFactory> outputProtocolFactory,
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000129 shared_ptr<ThreadManager> threadManager) :
130 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
131 inputProtocolFactory, outputProtocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +0000132 threadManager_(threadManager),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000133 stop_(false), timeout_(0) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000134
Mark Sleed788b2e2006-09-07 01:26:35 +0000135
Marc Slemko16698852006-08-04 03:16:10 +0000136TThreadPoolServer::~TThreadPoolServer() {}
137
Mark Slee794993d2006-09-20 01:56:10 +0000138void TThreadPoolServer::serve() {
Mark Sleed788b2e2006-09-07 01:26:35 +0000139 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000140 shared_ptr<TTransport> inputTransport;
141 shared_ptr<TTransport> outputTransport;
142 shared_ptr<TProtocol> inputProtocol;
143 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +0000144
Marc Slemko16698852006-08-04 03:16:10 +0000145 try {
146 // Start the server listening
147 serverTransport_->listen();
148 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000149 string errStr = string("TThreadPoolServer::run() listen(): ") + ttx.what();
150 GlobalOutput(errStr.c_str());
Marc Slemko16698852006-08-04 03:16:10 +0000151 return;
152 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000153
154 // Run the preServe event
155 if (eventHandler_ != NULL) {
156 eventHandler_->preServe();
157 }
158
Mark Slee6e3f6372007-03-01 22:05:46 +0000159 while (!stop_) {
Marc Slemko16698852006-08-04 03:16:10 +0000160 try {
Mark Slee3303f362007-03-05 20:09:37 +0000161 client.reset();
162 inputTransport.reset();
163 outputTransport.reset();
164 inputProtocol.reset();
165 outputProtocol.reset();
166
Mark Sleed788b2e2006-09-07 01:26:35 +0000167 // Fetch client from server
168 client = serverTransport_->accept();
Mark Sleea5a783f2007-03-02 19:41:08 +0000169
Mark Sleed788b2e2006-09-07 01:26:35 +0000170 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000171 inputTransport = inputTransportFactory_->getTransport(client);
172 outputTransport = outputTransportFactory_->getTransport(client);
173 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
174 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000175
Mark Sleed788b2e2006-09-07 01:26:35 +0000176 // Add to threadmanager pool
David Reiss23248712010-10-06 17:10:08 +0000177 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
Mark Sleea5a783f2007-03-02 19:41:08 +0000178
Marc Slemko16698852006-08-04 03:16:10 +0000179 } catch (TTransportException& ttx) {
Mark Slee3303f362007-03-05 20:09:37 +0000180 if (inputTransport != NULL) { inputTransport->close(); }
181 if (outputTransport != NULL) { outputTransport->close(); }
182 if (client != NULL) { client->close(); }
Mark Sleea5a783f2007-03-02 19:41:08 +0000183 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000184 string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
185 GlobalOutput(errStr.c_str());
Mark Sleea5a783f2007-03-02 19:41:08 +0000186 }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000187 continue;
188 } catch (TException& tx) {
Mark Slee3303f362007-03-05 20:09:37 +0000189 if (inputTransport != NULL) { inputTransport->close(); }
190 if (outputTransport != NULL) { outputTransport->close(); }
191 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000192 string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
193 GlobalOutput(errStr.c_str());
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000194 continue;
195 } catch (string s) {
Mark Slee3303f362007-03-05 20:09:37 +0000196 if (inputTransport != NULL) { inputTransport->close(); }
197 if (outputTransport != NULL) { outputTransport->close(); }
198 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000199 string errStr = "TThreadPoolServer: Unknown exception: " + s;
200 GlobalOutput(errStr.c_str());
Marc Slemko16698852006-08-04 03:16:10 +0000201 break;
202 }
203 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000204
205 // If stopped manually, join the existing threads
206 if (stop_) {
207 try {
208 serverTransport_->close();
209 threadManager_->join();
210 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000211 string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
212 GlobalOutput(errStr.c_str());
Mark Slee6e3f6372007-03-01 22:05:46 +0000213 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000214 stop_ = false;
Mark Slee6e3f6372007-03-01 22:05:46 +0000215 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000216
Marc Slemko16698852006-08-04 03:16:10 +0000217}
Marc Slemko35452342006-08-03 19:01:37 +0000218
Mark Slee9b82d272007-05-23 05:16:07 +0000219int64_t TThreadPoolServer::getTimeout() const {
220 return timeout_;
221}
222
223void TThreadPoolServer::setTimeout(int64_t value) {
224 timeout_ = value;
225}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000226
T Jake Lucianib5e62212009-01-31 22:36:20 +0000227}}} // apache::thrift::server