blob: b1ef1e025229bd6314beccb0bdc4a64f08059137 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Sleeb3cb6292007-02-01 22:55:00 +000020#include "server/TThreadedServer.h"
21#include "transport/TTransportException.h"
22#include "concurrency/PosixThreadFactory.h"
23
24#include <string>
25#include <iostream>
26#include <pthread.h>
27#include <unistd.h>
28
T Jake Lucianib5e62212009-01-31 22:36:20 +000029namespace apache { namespace thrift { namespace server {
Mark Sleeb3cb6292007-02-01 22:55:00 +000030
Mark Slee5ea15f92007-03-05 22:55:59 +000031using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000032using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000033using namespace apache::thrift;
34using namespace apache::thrift::protocol;
35using namespace apache::thrift::transport;
36using namespace apache::thrift::concurrency;
Mark Sleeb3cb6292007-02-01 22:55:00 +000037
38class TThreadedServer::Task: public Runnable {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000039
Mark Sleeb3cb6292007-02-01 22:55:00 +000040public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000041
42 Task(TThreadedServer& server,
Mark Slee1d4ce802007-03-07 05:16:16 +000043 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000044 shared_ptr<TProtocol> input,
David Reiss23248712010-10-06 17:10:08 +000045 shared_ptr<TProtocol> output,
46 shared_ptr<TTransport> transport) :
Mark Slee1d4ce802007-03-07 05:16:16 +000047 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000048 processor_(processor),
49 input_(input),
David Reiss23248712010-10-06 17:10:08 +000050 output_(output),
51 transport_(transport) {
Mark Sleeb3cb6292007-02-01 22:55:00 +000052 }
53
54 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000055
Mark Sleeb3cb6292007-02-01 22:55:00 +000056 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000057 boost::shared_ptr<TServerEventHandler> eventHandler =
58 server_.getEventHandler();
David Reiss23248712010-10-06 17:10:08 +000059 void* connectionContext = NULL;
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000060 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000061 connectionContext = eventHandler->createContext(input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000062 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000063 try {
David Reiss23248712010-10-06 17:10:08 +000064 for (;;) {
65 if (eventHandler != NULL) {
66 eventHandler->processContext(connectionContext, transport_);
67 }
68 if (!processor_->process(input_, output_, connectionContext) ||
69 !input_->getTransport()->peek()) {
Mark Sleeb3cb6292007-02-01 22:55:00 +000070 break;
71 }
72 }
73 } catch (TTransportException& ttx) {
Bryan Duxbury756d73e2011-08-25 17:30:21 +000074 if (ttx.getType() != TTransportException::END_OF_FILE) {
75 string errStr = string("TThreadedServer client died: ") + ttx.what();
76 GlobalOutput(errStr.c_str());
77 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000078 } catch (TException& x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000079 string errStr = string("TThreadedServer exception: ") + x.what();
80 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000081 } catch (...) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000082 GlobalOutput("TThreadedServer uncaught exception.");
Mark Sleeb3cb6292007-02-01 22:55:00 +000083 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000084 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000085 eventHandler->deleteContext(connectionContext, input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000086 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000087
88 try {
89 input_->getTransport()->close();
90 } catch (TTransportException& ttx) {
91 string errStr = string("TThreadedServer input close failed: ") + ttx.what();
92 GlobalOutput(errStr.c_str());
93 }
94 try {
95 output_->getTransport()->close();
96 } catch (TTransportException& ttx) {
97 string errStr = string("TThreadedServer output close failed: ") + ttx.what();
98 GlobalOutput(errStr.c_str());
99 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000100
Mark Slee1d4ce802007-03-07 05:16:16 +0000101 // Remove this task from parent bookkeeping
102 {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000103 Synchronized s(server_.tasksMonitor_);
104 server_.tasks_.erase(this);
105 if (server_.tasks_.empty()) {
106 server_.tasksMonitor_.notify();
Mark Slee1d4ce802007-03-07 05:16:16 +0000107 }
108 }
109
Mark Sleeb3cb6292007-02-01 22:55:00 +0000110 }
111
112 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000113 TThreadedServer& server_;
Mark Slee1d4ce802007-03-07 05:16:16 +0000114 friend class TThreadedServer;
115
Mark Sleeb3cb6292007-02-01 22:55:00 +0000116 shared_ptr<TProcessor> processor_;
117 shared_ptr<TProtocol> input_;
118 shared_ptr<TProtocol> output_;
David Reiss23248712010-10-06 17:10:08 +0000119 shared_ptr<TTransport> transport_;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000120};
121
122
123TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
124 shared_ptr<TServerTransport> serverTransport,
125 shared_ptr<TTransportFactory> transportFactory,
126 shared_ptr<TProtocolFactory> protocolFactory):
Mark Slee1d4ce802007-03-07 05:16:16 +0000127 TServer(processor, serverTransport, transportFactory, protocolFactory),
128 stop_(false) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000129 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
130}
131
David Reiss45d56962009-03-14 23:35:16 +0000132TThreadedServer::TThreadedServer(boost::shared_ptr<TProcessor> processor,
133 boost::shared_ptr<TServerTransport> serverTransport,
134 boost::shared_ptr<TTransportFactory> transportFactory,
135 boost::shared_ptr<TProtocolFactory> protocolFactory,
136 boost::shared_ptr<ThreadFactory> threadFactory):
137 TServer(processor, serverTransport, transportFactory, protocolFactory),
138 threadFactory_(threadFactory),
139 stop_(false) {
140}
141
Mark Sleeb3cb6292007-02-01 22:55:00 +0000142TThreadedServer::~TThreadedServer() {}
143
144void TThreadedServer::serve() {
145
146 shared_ptr<TTransport> client;
147 shared_ptr<TTransport> inputTransport;
148 shared_ptr<TTransport> outputTransport;
149 shared_ptr<TProtocol> inputProtocol;
150 shared_ptr<TProtocol> outputProtocol;
151
152 try {
153 // Start the server listening
154 serverTransport_->listen();
155 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000156 string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
157 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000158 return;
159 }
160
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000161 // Run the preServe event
162 if (eventHandler_ != NULL) {
163 eventHandler_->preServe();
164 }
165
166 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000167 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000168 client.reset();
169 inputTransport.reset();
170 outputTransport.reset();
171 inputProtocol.reset();
172 outputProtocol.reset();
173
Mark Sleeb3cb6292007-02-01 22:55:00 +0000174 // Fetch client from server
175 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000176
Mark Sleeb3cb6292007-02-01 22:55:00 +0000177 // Make IO transports
178 inputTransport = inputTransportFactory_->getTransport(client);
179 outputTransport = outputTransportFactory_->getTransport(client);
180 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
181 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
182
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000183 TThreadedServer::Task* task = new TThreadedServer::Task(*this,
184 processor_,
Mark Slee1d4ce802007-03-07 05:16:16 +0000185 inputProtocol,
David Reiss23248712010-10-06 17:10:08 +0000186 outputProtocol,
187 client);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000188
Mark Slee1d4ce802007-03-07 05:16:16 +0000189 // Create a task
190 shared_ptr<Runnable> runnable =
191 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000192
193 // Create a thread for this task
194 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000195 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000196
Mark Slee1d4ce802007-03-07 05:16:16 +0000197 // Insert thread into the set of threads
198 {
199 Synchronized s(tasksMonitor_);
200 tasks_.insert(task);
201 }
202
Mark Sleeb3cb6292007-02-01 22:55:00 +0000203 // Start the thread!
204 thread->start();
205
206 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000207 if (inputTransport != NULL) { inputTransport->close(); }
208 if (outputTransport != NULL) { outputTransport->close(); }
209 if (client != NULL) { client->close(); }
210 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000211 string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
212 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000213 }
Mark Slee907e3d62007-02-08 22:29:24 +0000214 continue;
215 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000216 if (inputTransport != NULL) { inputTransport->close(); }
217 if (outputTransport != NULL) { outputTransport->close(); }
218 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000219 string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
220 GlobalOutput(errStr.c_str());
Mark Slee907e3d62007-02-08 22:29:24 +0000221 continue;
222 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000223 if (inputTransport != NULL) { inputTransport->close(); }
224 if (outputTransport != NULL) { outputTransport->close(); }
225 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000226 string errStr = "TThreadedServer: Unknown exception: " + s;
227 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000228 break;
229 }
230 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000231
232 // If stopped manually, make sure to close server transport
233 if (stop_) {
234 try {
235 serverTransport_->close();
236 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000237 string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
238 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000239 }
240 try {
241 Synchronized s(tasksMonitor_);
242 while (!tasks_.empty()) {
243 tasksMonitor_.wait();
244 }
245 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000246 string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
247 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000248 }
249 stop_ = false;
250 }
251
Mark Sleeb3cb6292007-02-01 22:55:00 +0000252}
253
T Jake Lucianib5e62212009-01-31 22:36:20 +0000254}}} // apache::thrift::server