blob: 0894cfa5f8293b6ae14eede3529e7330a77885ab [file] [log] [blame]
Gavin McDonald0b75e1a2010-10-28 02:12:01 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include "server/TThreadPoolServer.h"
21#include "transport/TTransportException.h"
22#include "concurrency/Thread.h"
23#include "concurrency/ThreadManager.h"
24#include <string>
25#include <iostream>
26
27namespace apache { namespace thrift { namespace server {
28
29using boost::shared_ptr;
30using namespace std;
31using namespace apache::thrift;
32using namespace apache::thrift::concurrency;
33using namespace apache::thrift::protocol;;
34using namespace apache::thrift::transport;
35
36class TThreadPoolServer::Task : public Runnable {
37
38public:
39
40 Task(TThreadPoolServer &server,
41 shared_ptr<TProcessor> processor,
42 shared_ptr<TProtocol> input,
43 shared_ptr<TProtocol> output) :
44 server_(server),
45 processor_(processor),
46 input_(input),
47 output_(output) {
48 }
49
50 ~Task() {}
51
52 void run() {
53 boost::shared_ptr<TServerEventHandler> eventHandler =
54 server_.getEventHandler();
55 if (eventHandler != NULL) {
56 eventHandler->clientBegin(input_, output_);
57 }
58 try {
59 while (processor_->process(input_, output_)) {
60 if (!input_->getTransport()->peek()) {
61 break;
62 }
63 }
64 } catch (TTransportException& ttx) {
65 // This is reasonably expected, client didn't send a full request so just
66 // ignore him
67 // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
68 // GlobalOutput(errStr.c_str());
69 } catch (TException& x) {
70 string errStr = string("TThreadPoolServer exception: ") + x.what();
71 GlobalOutput(errStr.c_str());
72 } catch (std::exception &x) {
73 string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
74 GlobalOutput(errStr.c_str());
75 }
76
77 if (eventHandler != NULL) {
78 eventHandler->clientEnd(input_, output_);
79 }
80
81 try {
82 input_->getTransport()->close();
83 } catch (TTransportException& ttx) {
84 string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
85 GlobalOutput(errStr.c_str());
86 }
87 try {
88 output_->getTransport()->close();
89 } catch (TTransportException& ttx) {
90 string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
91 GlobalOutput(errStr.c_str());
92 }
93
94 }
95
96 private:
97 TServer& server_;
98 shared_ptr<TProcessor> processor_;
99 shared_ptr<TProtocol> input_;
100 shared_ptr<TProtocol> output_;
101
102};
103
104TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
105 shared_ptr<TServerTransport> serverTransport,
106 shared_ptr<TTransportFactory> transportFactory,
107 shared_ptr<TProtocolFactory> protocolFactory,
108 shared_ptr<ThreadManager> threadManager) :
109 TServer(processor, serverTransport, transportFactory, protocolFactory),
110 threadManager_(threadManager),
111 stop_(false), timeout_(0) {}
112
113TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
114 shared_ptr<TServerTransport> serverTransport,
115 shared_ptr<TTransportFactory> inputTransportFactory,
116 shared_ptr<TTransportFactory> outputTransportFactory,
117 shared_ptr<TProtocolFactory> inputProtocolFactory,
118 shared_ptr<TProtocolFactory> outputProtocolFactory,
119 shared_ptr<ThreadManager> threadManager) :
120 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
121 inputProtocolFactory, outputProtocolFactory),
122 threadManager_(threadManager),
123 stop_(false), timeout_(0) {}
124
125
126TThreadPoolServer::~TThreadPoolServer() {}
127
128void TThreadPoolServer::serve() {
129 shared_ptr<TTransport> client;
130 shared_ptr<TTransport> inputTransport;
131 shared_ptr<TTransport> outputTransport;
132 shared_ptr<TProtocol> inputProtocol;
133 shared_ptr<TProtocol> outputProtocol;
134
135 try {
136 // Start the server listening
137 serverTransport_->listen();
138 } catch (TTransportException& ttx) {
139 string errStr = string("TThreadPoolServer::run() listen(): ") + ttx.what();
140 GlobalOutput(errStr.c_str());
141 return;
142 }
143
144 // Run the preServe event
145 if (eventHandler_ != NULL) {
146 eventHandler_->preServe();
147 }
148
149 while (!stop_) {
150 try {
151 client.reset();
152 inputTransport.reset();
153 outputTransport.reset();
154 inputProtocol.reset();
155 outputProtocol.reset();
156
157 // Fetch client from server
158 client = serverTransport_->accept();
159
160 // Make IO transports
161 inputTransport = inputTransportFactory_->getTransport(client);
162 outputTransport = outputTransportFactory_->getTransport(client);
163 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
164 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
165
166 // Add to threadmanager pool
167 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
168
169 } catch (TTransportException& ttx) {
170 if (inputTransport != NULL) { inputTransport->close(); }
171 if (outputTransport != NULL) { outputTransport->close(); }
172 if (client != NULL) { client->close(); }
173 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
174 string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
175 GlobalOutput(errStr.c_str());
176 }
177 continue;
178 } catch (TException& tx) {
179 if (inputTransport != NULL) { inputTransport->close(); }
180 if (outputTransport != NULL) { outputTransport->close(); }
181 if (client != NULL) { client->close(); }
182 string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
183 GlobalOutput(errStr.c_str());
184 continue;
185 } catch (string s) {
186 if (inputTransport != NULL) { inputTransport->close(); }
187 if (outputTransport != NULL) { outputTransport->close(); }
188 if (client != NULL) { client->close(); }
189 string errStr = "TThreadPoolServer: Unknown exception: " + s;
190 GlobalOutput(errStr.c_str());
191 break;
192 }
193 }
194
195 // If stopped manually, join the existing threads
196 if (stop_) {
197 try {
198 serverTransport_->close();
199 threadManager_->join();
200 } catch (TException &tx) {
201 string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
202 GlobalOutput(errStr.c_str());
203 }
204 stop_ = false;
205 }
206
207}
208
209int64_t TThreadPoolServer::getTimeout() const {
210 return timeout_;
211}
212
213void TThreadPoolServer::setTimeout(int64_t value) {
214 timeout_ = value;
215}
216
217}}} // apache::thrift::server