blob: 3620d973ba272b100f3d5dea489390954968c9c8 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Sleeb3cb6292007-02-01 22:55:00 +00007#include "server/TThreadedServer.h"
8#include "transport/TTransportException.h"
9#include "concurrency/PosixThreadFactory.h"
10
11#include <string>
12#include <iostream>
13#include <pthread.h>
14#include <unistd.h>
15
T Jake Lucianib5e62212009-01-31 22:36:20 +000016namespace apache { namespace thrift { namespace server {
Mark Sleeb3cb6292007-02-01 22:55:00 +000017
Mark Slee5ea15f92007-03-05 22:55:59 +000018using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000019using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000020using namespace apache::thrift;
21using namespace apache::thrift::protocol;
22using namespace apache::thrift::transport;
23using namespace apache::thrift::concurrency;
Mark Sleeb3cb6292007-02-01 22:55:00 +000024
25class TThreadedServer::Task: public Runnable {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000026
Mark Sleeb3cb6292007-02-01 22:55:00 +000027public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000028
29 Task(TThreadedServer& server,
Mark Slee1d4ce802007-03-07 05:16:16 +000030 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000031 shared_ptr<TProtocol> input,
32 shared_ptr<TProtocol> output) :
Mark Slee1d4ce802007-03-07 05:16:16 +000033 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000034 processor_(processor),
35 input_(input),
36 output_(output) {
37 }
38
39 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000040
Mark Sleeb3cb6292007-02-01 22:55:00 +000041 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000042 boost::shared_ptr<TServerEventHandler> eventHandler =
43 server_.getEventHandler();
44 if (eventHandler != NULL) {
45 eventHandler->clientBegin(input_, output_);
46 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000047 try {
48 while (processor_->process(input_, output_)) {
49 if (!input_->getTransport()->peek()) {
50 break;
51 }
52 }
53 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000054 string errStr = string("TThreadedServer client died: ") + ttx.what();
55 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000056 } catch (TException& x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000057 string errStr = string("TThreadedServer exception: ") + x.what();
58 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000059 } catch (...) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000060 GlobalOutput("TThreadedServer uncaught exception.");
Mark Sleeb3cb6292007-02-01 22:55:00 +000061 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000062 if (eventHandler != NULL) {
63 eventHandler->clientEnd(input_, output_);
64 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000065
66 try {
67 input_->getTransport()->close();
68 } catch (TTransportException& ttx) {
69 string errStr = string("TThreadedServer input close failed: ") + ttx.what();
70 GlobalOutput(errStr.c_str());
71 }
72 try {
73 output_->getTransport()->close();
74 } catch (TTransportException& ttx) {
75 string errStr = string("TThreadedServer output close failed: ") + ttx.what();
76 GlobalOutput(errStr.c_str());
77 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000078
Mark Slee1d4ce802007-03-07 05:16:16 +000079 // Remove this task from parent bookkeeping
80 {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000081 Synchronized s(server_.tasksMonitor_);
82 server_.tasks_.erase(this);
83 if (server_.tasks_.empty()) {
84 server_.tasksMonitor_.notify();
Mark Slee1d4ce802007-03-07 05:16:16 +000085 }
86 }
87
Mark Sleeb3cb6292007-02-01 22:55:00 +000088 }
89
90 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000091 TThreadedServer& server_;
Mark Slee1d4ce802007-03-07 05:16:16 +000092 friend class TThreadedServer;
93
Mark Sleeb3cb6292007-02-01 22:55:00 +000094 shared_ptr<TProcessor> processor_;
95 shared_ptr<TProtocol> input_;
96 shared_ptr<TProtocol> output_;
Mark Sleeb3cb6292007-02-01 22:55:00 +000097};
98
99
100TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
101 shared_ptr<TServerTransport> serverTransport,
102 shared_ptr<TTransportFactory> transportFactory,
103 shared_ptr<TProtocolFactory> protocolFactory):
Mark Slee1d4ce802007-03-07 05:16:16 +0000104 TServer(processor, serverTransport, transportFactory, protocolFactory),
105 stop_(false) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000106 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
107}
108
David Reiss45d56962009-03-14 23:35:16 +0000109TThreadedServer::TThreadedServer(boost::shared_ptr<TProcessor> processor,
110 boost::shared_ptr<TServerTransport> serverTransport,
111 boost::shared_ptr<TTransportFactory> transportFactory,
112 boost::shared_ptr<TProtocolFactory> protocolFactory,
113 boost::shared_ptr<ThreadFactory> threadFactory):
114 TServer(processor, serverTransport, transportFactory, protocolFactory),
115 threadFactory_(threadFactory),
116 stop_(false) {
117}
118
Mark Sleeb3cb6292007-02-01 22:55:00 +0000119TThreadedServer::~TThreadedServer() {}
120
121void TThreadedServer::serve() {
122
123 shared_ptr<TTransport> client;
124 shared_ptr<TTransport> inputTransport;
125 shared_ptr<TTransport> outputTransport;
126 shared_ptr<TProtocol> inputProtocol;
127 shared_ptr<TProtocol> outputProtocol;
128
129 try {
130 // Start the server listening
131 serverTransport_->listen();
132 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000133 string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
134 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000135 return;
136 }
137
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000138 // Run the preServe event
139 if (eventHandler_ != NULL) {
140 eventHandler_->preServe();
141 }
142
143 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000144 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000145 client.reset();
146 inputTransport.reset();
147 outputTransport.reset();
148 inputProtocol.reset();
149 outputProtocol.reset();
150
Mark Sleeb3cb6292007-02-01 22:55:00 +0000151 // Fetch client from server
152 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000153
Mark Sleeb3cb6292007-02-01 22:55:00 +0000154 // Make IO transports
155 inputTransport = inputTransportFactory_->getTransport(client);
156 outputTransport = outputTransportFactory_->getTransport(client);
157 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
158 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
159
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000160 TThreadedServer::Task* task = new TThreadedServer::Task(*this,
161 processor_,
Mark Slee1d4ce802007-03-07 05:16:16 +0000162 inputProtocol,
163 outputProtocol);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000164
Mark Slee1d4ce802007-03-07 05:16:16 +0000165 // Create a task
166 shared_ptr<Runnable> runnable =
167 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000168
169 // Create a thread for this task
170 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000171 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000172
Mark Slee1d4ce802007-03-07 05:16:16 +0000173 // Insert thread into the set of threads
174 {
175 Synchronized s(tasksMonitor_);
176 tasks_.insert(task);
177 }
178
Mark Sleeb3cb6292007-02-01 22:55:00 +0000179 // Start the thread!
180 thread->start();
181
182 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000183 if (inputTransport != NULL) { inputTransport->close(); }
184 if (outputTransport != NULL) { outputTransport->close(); }
185 if (client != NULL) { client->close(); }
186 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000187 string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
188 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000189 }
Mark Slee907e3d62007-02-08 22:29:24 +0000190 continue;
191 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000192 if (inputTransport != NULL) { inputTransport->close(); }
193 if (outputTransport != NULL) { outputTransport->close(); }
194 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000195 string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
196 GlobalOutput(errStr.c_str());
Mark Slee907e3d62007-02-08 22:29:24 +0000197 continue;
198 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000199 if (inputTransport != NULL) { inputTransport->close(); }
200 if (outputTransport != NULL) { outputTransport->close(); }
201 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000202 string errStr = "TThreadedServer: Unknown exception: " + s;
203 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000204 break;
205 }
206 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000207
208 // If stopped manually, make sure to close server transport
209 if (stop_) {
210 try {
211 serverTransport_->close();
212 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000213 string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
214 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000215 }
216 try {
217 Synchronized s(tasksMonitor_);
218 while (!tasks_.empty()) {
219 tasksMonitor_.wait();
220 }
221 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000222 string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
223 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000224 }
225 stop_ = false;
226 }
227
Mark Sleeb3cb6292007-02-01 22:55:00 +0000228}
229
T Jake Lucianib5e62212009-01-31 22:36:20 +0000230}}} // apache::thrift::server