blob: cc30f8ff763368652732408d5efd52e563339ac1 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Sleeb3cb6292007-02-01 22:55:00 +000020#include "server/TThreadedServer.h"
21#include "transport/TTransportException.h"
22#include "concurrency/PosixThreadFactory.h"
23
24#include <string>
25#include <iostream>
26#include <pthread.h>
27#include <unistd.h>
28
T Jake Lucianib5e62212009-01-31 22:36:20 +000029namespace apache { namespace thrift { namespace server {
Mark Sleeb3cb6292007-02-01 22:55:00 +000030
Mark Slee5ea15f92007-03-05 22:55:59 +000031using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000032using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000033using namespace apache::thrift;
34using namespace apache::thrift::protocol;
35using namespace apache::thrift::transport;
36using namespace apache::thrift::concurrency;
Mark Sleeb3cb6292007-02-01 22:55:00 +000037
38class TThreadedServer::Task: public Runnable {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000039
Mark Sleeb3cb6292007-02-01 22:55:00 +000040public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000041
42 Task(TThreadedServer& server,
Mark Slee1d4ce802007-03-07 05:16:16 +000043 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000044 shared_ptr<TProtocol> input,
45 shared_ptr<TProtocol> output) :
Mark Slee1d4ce802007-03-07 05:16:16 +000046 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000047 processor_(processor),
48 input_(input),
49 output_(output) {
50 }
51
52 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000053
Mark Sleeb3cb6292007-02-01 22:55:00 +000054 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000055 boost::shared_ptr<TServerEventHandler> eventHandler =
56 server_.getEventHandler();
57 if (eventHandler != NULL) {
58 eventHandler->clientBegin(input_, output_);
59 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000060 try {
61 while (processor_->process(input_, output_)) {
62 if (!input_->getTransport()->peek()) {
63 break;
64 }
65 }
66 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000067 string errStr = string("TThreadedServer client died: ") + ttx.what();
68 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000069 } catch (TException& x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000070 string errStr = string("TThreadedServer exception: ") + x.what();
71 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000072 } catch (...) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000073 GlobalOutput("TThreadedServer uncaught exception.");
Mark Sleeb3cb6292007-02-01 22:55:00 +000074 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000075 if (eventHandler != NULL) {
76 eventHandler->clientEnd(input_, output_);
77 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000078
79 try {
80 input_->getTransport()->close();
81 } catch (TTransportException& ttx) {
82 string errStr = string("TThreadedServer input close failed: ") + ttx.what();
83 GlobalOutput(errStr.c_str());
84 }
85 try {
86 output_->getTransport()->close();
87 } catch (TTransportException& ttx) {
88 string errStr = string("TThreadedServer output close failed: ") + ttx.what();
89 GlobalOutput(errStr.c_str());
90 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000091
Mark Slee1d4ce802007-03-07 05:16:16 +000092 // Remove this task from parent bookkeeping
93 {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000094 Synchronized s(server_.tasksMonitor_);
95 server_.tasks_.erase(this);
96 if (server_.tasks_.empty()) {
97 server_.tasksMonitor_.notify();
Mark Slee1d4ce802007-03-07 05:16:16 +000098 }
99 }
100
Mark Sleeb3cb6292007-02-01 22:55:00 +0000101 }
102
103 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000104 TThreadedServer& server_;
Mark Slee1d4ce802007-03-07 05:16:16 +0000105 friend class TThreadedServer;
106
Mark Sleeb3cb6292007-02-01 22:55:00 +0000107 shared_ptr<TProcessor> processor_;
108 shared_ptr<TProtocol> input_;
109 shared_ptr<TProtocol> output_;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000110};
111
112
113TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
114 shared_ptr<TServerTransport> serverTransport,
115 shared_ptr<TTransportFactory> transportFactory,
116 shared_ptr<TProtocolFactory> protocolFactory):
Mark Slee1d4ce802007-03-07 05:16:16 +0000117 TServer(processor, serverTransport, transportFactory, protocolFactory),
118 stop_(false) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000119 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
120}
121
David Reiss45d56962009-03-14 23:35:16 +0000122TThreadedServer::TThreadedServer(boost::shared_ptr<TProcessor> processor,
123 boost::shared_ptr<TServerTransport> serverTransport,
124 boost::shared_ptr<TTransportFactory> transportFactory,
125 boost::shared_ptr<TProtocolFactory> protocolFactory,
126 boost::shared_ptr<ThreadFactory> threadFactory):
127 TServer(processor, serverTransport, transportFactory, protocolFactory),
128 threadFactory_(threadFactory),
129 stop_(false) {
130}
131
Mark Sleeb3cb6292007-02-01 22:55:00 +0000132TThreadedServer::~TThreadedServer() {}
133
134void TThreadedServer::serve() {
135
136 shared_ptr<TTransport> client;
137 shared_ptr<TTransport> inputTransport;
138 shared_ptr<TTransport> outputTransport;
139 shared_ptr<TProtocol> inputProtocol;
140 shared_ptr<TProtocol> outputProtocol;
141
142 try {
143 // Start the server listening
144 serverTransport_->listen();
145 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000146 string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
147 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000148 return;
149 }
150
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000151 // Run the preServe event
152 if (eventHandler_ != NULL) {
153 eventHandler_->preServe();
154 }
155
156 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000157 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000158 client.reset();
159 inputTransport.reset();
160 outputTransport.reset();
161 inputProtocol.reset();
162 outputProtocol.reset();
163
Mark Sleeb3cb6292007-02-01 22:55:00 +0000164 // Fetch client from server
165 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000166
Mark Sleeb3cb6292007-02-01 22:55:00 +0000167 // Make IO transports
168 inputTransport = inputTransportFactory_->getTransport(client);
169 outputTransport = outputTransportFactory_->getTransport(client);
170 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
171 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
172
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000173 TThreadedServer::Task* task = new TThreadedServer::Task(*this,
174 processor_,
Mark Slee1d4ce802007-03-07 05:16:16 +0000175 inputProtocol,
176 outputProtocol);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000177
Mark Slee1d4ce802007-03-07 05:16:16 +0000178 // Create a task
179 shared_ptr<Runnable> runnable =
180 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000181
182 // Create a thread for this task
183 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000184 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000185
Mark Slee1d4ce802007-03-07 05:16:16 +0000186 // Insert thread into the set of threads
187 {
188 Synchronized s(tasksMonitor_);
189 tasks_.insert(task);
190 }
191
Mark Sleeb3cb6292007-02-01 22:55:00 +0000192 // Start the thread!
193 thread->start();
194
195 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000196 if (inputTransport != NULL) { inputTransport->close(); }
197 if (outputTransport != NULL) { outputTransport->close(); }
198 if (client != NULL) { client->close(); }
199 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000200 string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
201 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000202 }
Mark Slee907e3d62007-02-08 22:29:24 +0000203 continue;
204 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000205 if (inputTransport != NULL) { inputTransport->close(); }
206 if (outputTransport != NULL) { outputTransport->close(); }
207 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000208 string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
209 GlobalOutput(errStr.c_str());
Mark Slee907e3d62007-02-08 22:29:24 +0000210 continue;
211 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000212 if (inputTransport != NULL) { inputTransport->close(); }
213 if (outputTransport != NULL) { outputTransport->close(); }
214 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000215 string errStr = "TThreadedServer: Unknown exception: " + s;
216 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000217 break;
218 }
219 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000220
221 // If stopped manually, make sure to close server transport
222 if (stop_) {
223 try {
224 serverTransport_->close();
225 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000226 string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
227 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000228 }
229 try {
230 Synchronized s(tasksMonitor_);
231 while (!tasks_.empty()) {
232 tasksMonitor_.wait();
233 }
234 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000235 string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
236 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000237 }
238 stop_ = false;
239 }
240
Mark Sleeb3cb6292007-02-01 22:55:00 +0000241}
242
T Jake Lucianib5e62212009-01-31 22:36:20 +0000243}}} // apache::thrift::server