blob: fe58ff3e92414ea8c39aaa83fb97771bc402293e [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier12d70532011-12-14 23:35:28 +000020#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
Marc Slemko35452342006-08-03 19:01:37 +000024#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +000025#include "transport/TTransportException.h"
26#include "concurrency/Thread.h"
27#include "concurrency/ThreadManager.h"
28#include <string>
29#include <iostream>
30
T Jake Lucianib5e62212009-01-31 22:36:20 +000031namespace apache { namespace thrift { namespace server {
Marc Slemko35452342006-08-03 19:01:37 +000032
Mark Slee5ea15f92007-03-05 22:55:59 +000033using boost::shared_ptr;
Marc Slemko35452342006-08-03 19:01:37 +000034using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000035using namespace apache::thrift;
36using namespace apache::thrift::concurrency;
Roger Meier0069cc42010-10-13 18:10:18 +000037using namespace apache::thrift::protocol;
T Jake Lucianib5e62212009-01-31 22:36:20 +000038using namespace apache::thrift::transport;
Marc Slemko35452342006-08-03 19:01:37 +000039
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000040class TThreadPoolServer::Task : public Runnable {
41
Marc Slemko16698852006-08-04 03:16:10 +000042public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000043
44 Task(TThreadPoolServer &server,
45 shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000046 shared_ptr<TProtocol> input,
David Reiss23248712010-10-06 17:10:08 +000047 shared_ptr<TProtocol> output,
48 shared_ptr<TTransport> transport) :
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000049 server_(server),
Mark Slee2f6404d2006-10-10 01:37:40 +000050 processor_(processor),
51 input_(input),
David Reiss23248712010-10-06 17:10:08 +000052 output_(output),
53 transport_(transport) {
Marc Slemko35452342006-08-03 19:01:37 +000054 }
Marc Slemko16698852006-08-04 03:16:10 +000055
56 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000057
Mark Sleeeb0d0242007-01-25 07:58:55 +000058 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000059 boost::shared_ptr<TServerEventHandler> eventHandler =
60 server_.getEventHandler();
David Reiss23248712010-10-06 17:10:08 +000061 void* connectionContext = NULL;
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000062 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000063 connectionContext = eventHandler->createContext(input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000064 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000065 try {
David Reiss23248712010-10-06 17:10:08 +000066 for (;;) {
67 if (eventHandler != NULL) {
68 eventHandler->processContext(connectionContext, transport_);
69 }
70 if (!processor_->process(input_, output_, connectionContext) ||
71 !input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000072 break;
73 }
Marc Slemko35452342006-08-03 19:01:37 +000074 }
Bryan Duxbury1e987582011-08-25 17:33:03 +000075 } catch (const TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000076 // This is reasonably expected, client didn't send a full request so just
77 // ignore him
Mark Slee2e8a8d42008-01-16 00:38:20 +000078 // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
79 // GlobalOutput(errStr.c_str());
Bryan Duxbury1e987582011-08-25 17:33:03 +000080 } catch (const std::exception& x) {
81 GlobalOutput.printf("TThreadPoolServer exception %s: %s",
82 typeid(x).name(), x.what());
David Reiss58e4d2c2010-03-09 05:19:43 +000083 } catch (...) {
84 GlobalOutput("TThreadPoolServer, unexpected exception in "
85 "TThreadPoolServer::Task::run()");
Marc Slemko35452342006-08-03 19:01:37 +000086 }
pfunge8abada2008-01-05 23:23:53 +000087
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000088 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000089 eventHandler->deleteContext(connectionContext, input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000090 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000091
92 try {
93 input_->getTransport()->close();
94 } catch (TTransportException& ttx) {
95 string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
96 GlobalOutput(errStr.c_str());
97 }
98 try {
99 output_->getTransport()->close();
100 } catch (TTransportException& ttx) {
101 string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
102 GlobalOutput(errStr.c_str());
103 }
104
Marc Slemko35452342006-08-03 19:01:37 +0000105 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000106
107 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000108 TServer& server_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000110 shared_ptr<TProtocol> input_;
111 shared_ptr<TProtocol> output_;
David Reiss23248712010-10-06 17:10:08 +0000112 shared_ptr<TTransport> transport_;
Marc Slemko35452342006-08-03 19:01:37 +0000113};
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000114
Marc Slemko16698852006-08-04 03:16:10 +0000115TThreadPoolServer::~TThreadPoolServer() {}
116
Mark Slee794993d2006-09-20 01:56:10 +0000117void TThreadPoolServer::serve() {
Mark Sleed788b2e2006-09-07 01:26:35 +0000118 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000119 shared_ptr<TTransport> inputTransport;
120 shared_ptr<TTransport> outputTransport;
121 shared_ptr<TProtocol> inputProtocol;
122 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +0000123
Roger Meierd8f50f32012-04-11 21:48:56 +0000124 // Start the server listening
125 serverTransport_->listen();
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000126
127 // Run the preServe event
128 if (eventHandler_ != NULL) {
129 eventHandler_->preServe();
130 }
131
Mark Slee6e3f6372007-03-01 22:05:46 +0000132 while (!stop_) {
Marc Slemko16698852006-08-04 03:16:10 +0000133 try {
Mark Slee3303f362007-03-05 20:09:37 +0000134 client.reset();
135 inputTransport.reset();
136 outputTransport.reset();
137 inputProtocol.reset();
138 outputProtocol.reset();
139
Mark Sleed788b2e2006-09-07 01:26:35 +0000140 // Fetch client from server
141 client = serverTransport_->accept();
Mark Sleea5a783f2007-03-02 19:41:08 +0000142
Mark Sleed788b2e2006-09-07 01:26:35 +0000143 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000144 inputTransport = inputTransportFactory_->getTransport(client);
145 outputTransport = outputTransportFactory_->getTransport(client);
146 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
147 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000148
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000149 shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
150 outputProtocol, client);
151
Mark Sleed788b2e2006-09-07 01:26:35 +0000152 // Add to threadmanager pool
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000153 shared_ptr<TThreadPoolServer::Task> task(new TThreadPoolServer::Task(
154 *this, processor, inputProtocol, outputProtocol, client));
155 threadManager_->add(task, timeout_);
Mark Sleea5a783f2007-03-02 19:41:08 +0000156
Marc Slemko16698852006-08-04 03:16:10 +0000157 } catch (TTransportException& ttx) {
Mark Slee3303f362007-03-05 20:09:37 +0000158 if (inputTransport != NULL) { inputTransport->close(); }
159 if (outputTransport != NULL) { outputTransport->close(); }
160 if (client != NULL) { client->close(); }
Mark Sleea5a783f2007-03-02 19:41:08 +0000161 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000162 string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
163 GlobalOutput(errStr.c_str());
Mark Sleea5a783f2007-03-02 19:41:08 +0000164 }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000165 continue;
166 } catch (TException& tx) {
Mark Slee3303f362007-03-05 20:09:37 +0000167 if (inputTransport != NULL) { inputTransport->close(); }
168 if (outputTransport != NULL) { outputTransport->close(); }
169 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000170 string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
171 GlobalOutput(errStr.c_str());
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000172 continue;
173 } catch (string s) {
Mark Slee3303f362007-03-05 20:09:37 +0000174 if (inputTransport != NULL) { inputTransport->close(); }
175 if (outputTransport != NULL) { outputTransport->close(); }
176 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000177 string errStr = "TThreadPoolServer: Unknown exception: " + s;
178 GlobalOutput(errStr.c_str());
Marc Slemko16698852006-08-04 03:16:10 +0000179 break;
180 }
181 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000182
183 // If stopped manually, join the existing threads
184 if (stop_) {
185 try {
186 serverTransport_->close();
187 threadManager_->join();
188 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000189 string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
190 GlobalOutput(errStr.c_str());
Mark Slee6e3f6372007-03-01 22:05:46 +0000191 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000192 stop_ = false;
Mark Slee6e3f6372007-03-01 22:05:46 +0000193 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000194
Marc Slemko16698852006-08-04 03:16:10 +0000195}
Marc Slemko35452342006-08-03 19:01:37 +0000196
Mark Slee9b82d272007-05-23 05:16:07 +0000197int64_t TThreadPoolServer::getTimeout() const {
198 return timeout_;
199}
200
201void TThreadPoolServer::setTimeout(int64_t value) {
202 timeout_ = value;
203}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000204
T Jake Lucianib5e62212009-01-31 22:36:20 +0000205}}} // apache::thrift::server