blob: 6b816a4f6b3149c6625974eda2056efcdc02af0b [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Sleeb3cb6292007-02-01 22:55:00 +000020#include "server/TThreadedServer.h"
21#include "transport/TTransportException.h"
Roger Meier3faaedf2011-10-02 10:51:45 +000022#include <concurrency/PlatformThreadFactory.h>
Mark Sleeb3cb6292007-02-01 22:55:00 +000023
24#include <string>
25#include <iostream>
Roger Meier3faaedf2011-10-02 10:51:45 +000026
27#ifdef HAVE_UNISTD_H
Mark Sleeb3cb6292007-02-01 22:55:00 +000028#include <unistd.h>
Roger Meier3faaedf2011-10-02 10:51:45 +000029#endif
Mark Sleeb3cb6292007-02-01 22:55:00 +000030
T Jake Lucianib5e62212009-01-31 22:36:20 +000031namespace apache { namespace thrift { namespace server {
Mark Sleeb3cb6292007-02-01 22:55:00 +000032
Mark Slee5ea15f92007-03-05 22:55:59 +000033using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000034using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000035using namespace apache::thrift;
36using namespace apache::thrift::protocol;
37using namespace apache::thrift::transport;
38using namespace apache::thrift::concurrency;
Mark Sleeb3cb6292007-02-01 22:55:00 +000039
40class TThreadedServer::Task: public Runnable {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000041
Mark Sleeb3cb6292007-02-01 22:55:00 +000042public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000043
44 Task(TThreadedServer& server,
Mark Slee1d4ce802007-03-07 05:16:16 +000045 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000046 shared_ptr<TProtocol> input,
David Reiss23248712010-10-06 17:10:08 +000047 shared_ptr<TProtocol> output,
48 shared_ptr<TTransport> transport) :
Mark Slee1d4ce802007-03-07 05:16:16 +000049 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000050 processor_(processor),
51 input_(input),
David Reiss23248712010-10-06 17:10:08 +000052 output_(output),
53 transport_(transport) {
Mark Sleeb3cb6292007-02-01 22:55:00 +000054 }
55
56 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000057
Mark Sleeb3cb6292007-02-01 22:55:00 +000058 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000059 boost::shared_ptr<TServerEventHandler> eventHandler =
60 server_.getEventHandler();
David Reiss23248712010-10-06 17:10:08 +000061 void* connectionContext = NULL;
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000062 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000063 connectionContext = eventHandler->createContext(input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000064 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000065 try {
David Reiss23248712010-10-06 17:10:08 +000066 for (;;) {
67 if (eventHandler != NULL) {
68 eventHandler->processContext(connectionContext, transport_);
69 }
70 if (!processor_->process(input_, output_, connectionContext) ||
71 !input_->getTransport()->peek()) {
Mark Sleeb3cb6292007-02-01 22:55:00 +000072 break;
73 }
74 }
Bryan Duxbury1e987582011-08-25 17:33:03 +000075 } catch (const TTransportException& ttx) {
Bryan Duxbury756d73e2011-08-25 17:30:21 +000076 if (ttx.getType() != TTransportException::END_OF_FILE) {
77 string errStr = string("TThreadedServer client died: ") + ttx.what();
78 GlobalOutput(errStr.c_str());
79 }
Bryan Duxbury1e987582011-08-25 17:33:03 +000080 } catch (const std::exception &x) {
81 GlobalOutput.printf("TThreadedServer exception: %s: %s",
82 typeid(x).name(), x.what());
Mark Sleeb3cb6292007-02-01 22:55:00 +000083 } catch (...) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000084 GlobalOutput("TThreadedServer uncaught exception.");
Mark Sleeb3cb6292007-02-01 22:55:00 +000085 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000086 if (eventHandler != NULL) {
David Reiss23248712010-10-06 17:10:08 +000087 eventHandler->deleteContext(connectionContext, input_, output_);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000088 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000089
90 try {
91 input_->getTransport()->close();
92 } catch (TTransportException& ttx) {
93 string errStr = string("TThreadedServer input close failed: ") + ttx.what();
94 GlobalOutput(errStr.c_str());
95 }
96 try {
97 output_->getTransport()->close();
98 } catch (TTransportException& ttx) {
99 string errStr = string("TThreadedServer output close failed: ") + ttx.what();
100 GlobalOutput(errStr.c_str());
101 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000102
Mark Slee1d4ce802007-03-07 05:16:16 +0000103 // Remove this task from parent bookkeeping
104 {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000105 Synchronized s(server_.tasksMonitor_);
106 server_.tasks_.erase(this);
107 if (server_.tasks_.empty()) {
108 server_.tasksMonitor_.notify();
Mark Slee1d4ce802007-03-07 05:16:16 +0000109 }
110 }
111
Mark Sleeb3cb6292007-02-01 22:55:00 +0000112 }
113
114 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000115 TThreadedServer& server_;
Mark Slee1d4ce802007-03-07 05:16:16 +0000116 friend class TThreadedServer;
117
Mark Sleeb3cb6292007-02-01 22:55:00 +0000118 shared_ptr<TProcessor> processor_;
119 shared_ptr<TProtocol> input_;
120 shared_ptr<TProtocol> output_;
David Reiss23248712010-10-06 17:10:08 +0000121 shared_ptr<TTransport> transport_;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000122};
123
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000124void TThreadedServer::init() {
125 stop_ = false;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000126
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000127 if (!threadFactory_) {
Roger Meier3faaedf2011-10-02 10:51:45 +0000128 threadFactory_.reset(new PlatformThreadFactory);
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000129 }
David Reiss45d56962009-03-14 23:35:16 +0000130}
131
Mark Sleeb3cb6292007-02-01 22:55:00 +0000132TThreadedServer::~TThreadedServer() {}
133
134void TThreadedServer::serve() {
135
136 shared_ptr<TTransport> client;
137 shared_ptr<TTransport> inputTransport;
138 shared_ptr<TTransport> outputTransport;
139 shared_ptr<TProtocol> inputProtocol;
140 shared_ptr<TProtocol> outputProtocol;
141
142 try {
143 // Start the server listening
144 serverTransport_->listen();
145 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000146 string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
147 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000148 return;
149 }
150
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000151 // Run the preServe event
152 if (eventHandler_ != NULL) {
153 eventHandler_->preServe();
154 }
155
156 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000157 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000158 client.reset();
159 inputTransport.reset();
160 outputTransport.reset();
161 inputProtocol.reset();
162 outputProtocol.reset();
163
Mark Sleeb3cb6292007-02-01 22:55:00 +0000164 // Fetch client from server
165 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000166
Mark Sleeb3cb6292007-02-01 22:55:00 +0000167 // Make IO transports
168 inputTransport = inputTransportFactory_->getTransport(client);
169 outputTransport = outputTransportFactory_->getTransport(client);
170 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
171 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
172
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000173 shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
174 outputProtocol, client);
175
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000176 TThreadedServer::Task* task = new TThreadedServer::Task(*this,
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000177 processor,
Mark Slee1d4ce802007-03-07 05:16:16 +0000178 inputProtocol,
David Reiss23248712010-10-06 17:10:08 +0000179 outputProtocol,
180 client);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000181
Mark Slee1d4ce802007-03-07 05:16:16 +0000182 // Create a task
183 shared_ptr<Runnable> runnable =
184 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000185
186 // Create a thread for this task
187 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000188 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000189
Mark Slee1d4ce802007-03-07 05:16:16 +0000190 // Insert thread into the set of threads
191 {
192 Synchronized s(tasksMonitor_);
193 tasks_.insert(task);
194 }
195
Mark Sleeb3cb6292007-02-01 22:55:00 +0000196 // Start the thread!
197 thread->start();
198
199 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000200 if (inputTransport != NULL) { inputTransport->close(); }
201 if (outputTransport != NULL) { outputTransport->close(); }
202 if (client != NULL) { client->close(); }
203 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000204 string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
205 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000206 }
Mark Slee907e3d62007-02-08 22:29:24 +0000207 continue;
208 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000209 if (inputTransport != NULL) { inputTransport->close(); }
210 if (outputTransport != NULL) { outputTransport->close(); }
211 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000212 string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
213 GlobalOutput(errStr.c_str());
Mark Slee907e3d62007-02-08 22:29:24 +0000214 continue;
215 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000216 if (inputTransport != NULL) { inputTransport->close(); }
217 if (outputTransport != NULL) { outputTransport->close(); }
218 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000219 string errStr = "TThreadedServer: Unknown exception: " + s;
220 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000221 break;
222 }
223 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000224
225 // If stopped manually, make sure to close server transport
226 if (stop_) {
227 try {
228 serverTransport_->close();
229 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000230 string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
231 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000232 }
233 try {
234 Synchronized s(tasksMonitor_);
235 while (!tasks_.empty()) {
236 tasksMonitor_.wait();
237 }
238 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000239 string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
240 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000241 }
242 stop_ = false;
243 }
244
Mark Sleeb3cb6292007-02-01 22:55:00 +0000245}
246
T Jake Lucianib5e62212009-01-31 22:36:20 +0000247}}} // apache::thrift::server