blob: 4840b7f2f2c47c36e648ce8858f473ccc53c69b5 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Sleeb3cb6292007-02-01 22:55:00 +00007#include "server/TThreadedServer.h"
8#include "transport/TTransportException.h"
9#include "concurrency/PosixThreadFactory.h"
10
11#include <string>
12#include <iostream>
13#include <pthread.h>
14#include <unistd.h>
15
T Jake Lucianib5e62212009-01-31 22:36:20 +000016namespace apache { namespace thrift { namespace server {
Mark Sleeb3cb6292007-02-01 22:55:00 +000017
Mark Slee5ea15f92007-03-05 22:55:59 +000018using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000019using namespace std;
T Jake Lucianib5e62212009-01-31 22:36:20 +000020using namespace apache::thrift;
21using namespace apache::thrift::protocol;
22using namespace apache::thrift::transport;
23using namespace apache::thrift::concurrency;
Mark Sleeb3cb6292007-02-01 22:55:00 +000024
25class TThreadedServer::Task: public Runnable {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000026
Mark Sleeb3cb6292007-02-01 22:55:00 +000027public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000028
29 Task(TThreadedServer& server,
Mark Slee1d4ce802007-03-07 05:16:16 +000030 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000031 shared_ptr<TProtocol> input,
32 shared_ptr<TProtocol> output) :
Mark Slee1d4ce802007-03-07 05:16:16 +000033 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000034 processor_(processor),
35 input_(input),
36 output_(output) {
37 }
38
39 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000040
Mark Sleeb3cb6292007-02-01 22:55:00 +000041 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000042 boost::shared_ptr<TServerEventHandler> eventHandler =
43 server_.getEventHandler();
44 if (eventHandler != NULL) {
45 eventHandler->clientBegin(input_, output_);
46 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000047 try {
48 while (processor_->process(input_, output_)) {
49 if (!input_->getTransport()->peek()) {
50 break;
51 }
52 }
53 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000054 string errStr = string("TThreadedServer client died: ") + ttx.what();
55 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000056 } catch (TException& x) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000057 string errStr = string("TThreadedServer exception: ") + x.what();
58 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +000059 } catch (...) {
Mark Slee2e8a8d42008-01-16 00:38:20 +000060 GlobalOutput("TThreadedServer uncaught exception.");
Mark Sleeb3cb6292007-02-01 22:55:00 +000061 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000062 if (eventHandler != NULL) {
63 eventHandler->clientEnd(input_, output_);
64 }
Mark Slee2e8a8d42008-01-16 00:38:20 +000065
66 try {
67 input_->getTransport()->close();
68 } catch (TTransportException& ttx) {
69 string errStr = string("TThreadedServer input close failed: ") + ttx.what();
70 GlobalOutput(errStr.c_str());
71 }
72 try {
73 output_->getTransport()->close();
74 } catch (TTransportException& ttx) {
75 string errStr = string("TThreadedServer output close failed: ") + ttx.what();
76 GlobalOutput(errStr.c_str());
77 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000078
Mark Slee1d4ce802007-03-07 05:16:16 +000079 // Remove this task from parent bookkeeping
80 {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000081 Synchronized s(server_.tasksMonitor_);
82 server_.tasks_.erase(this);
83 if (server_.tasks_.empty()) {
84 server_.tasksMonitor_.notify();
Mark Slee1d4ce802007-03-07 05:16:16 +000085 }
86 }
87
Mark Sleeb3cb6292007-02-01 22:55:00 +000088 }
89
90 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000091 TThreadedServer& server_;
Mark Slee1d4ce802007-03-07 05:16:16 +000092 friend class TThreadedServer;
93
Mark Sleeb3cb6292007-02-01 22:55:00 +000094 shared_ptr<TProcessor> processor_;
95 shared_ptr<TProtocol> input_;
96 shared_ptr<TProtocol> output_;
Mark Sleeb3cb6292007-02-01 22:55:00 +000097};
98
99
100TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
101 shared_ptr<TServerTransport> serverTransport,
102 shared_ptr<TTransportFactory> transportFactory,
103 shared_ptr<TProtocolFactory> protocolFactory):
Mark Slee1d4ce802007-03-07 05:16:16 +0000104 TServer(processor, serverTransport, transportFactory, protocolFactory),
105 stop_(false) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000106 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
107}
108
109TThreadedServer::~TThreadedServer() {}
110
111void TThreadedServer::serve() {
112
113 shared_ptr<TTransport> client;
114 shared_ptr<TTransport> inputTransport;
115 shared_ptr<TTransport> outputTransport;
116 shared_ptr<TProtocol> inputProtocol;
117 shared_ptr<TProtocol> outputProtocol;
118
119 try {
120 // Start the server listening
121 serverTransport_->listen();
122 } catch (TTransportException& ttx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000123 string errStr = string("TThreadedServer::run() listen(): ") +ttx.what();
124 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000125 return;
126 }
127
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000128 // Run the preServe event
129 if (eventHandler_ != NULL) {
130 eventHandler_->preServe();
131 }
132
133 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000134 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000135 client.reset();
136 inputTransport.reset();
137 outputTransport.reset();
138 inputProtocol.reset();
139 outputProtocol.reset();
140
Mark Sleeb3cb6292007-02-01 22:55:00 +0000141 // Fetch client from server
142 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000143
Mark Sleeb3cb6292007-02-01 22:55:00 +0000144 // Make IO transports
145 inputTransport = inputTransportFactory_->getTransport(client);
146 outputTransport = outputTransportFactory_->getTransport(client);
147 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
148 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
149
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000150 TThreadedServer::Task* task = new TThreadedServer::Task(*this,
151 processor_,
Mark Slee1d4ce802007-03-07 05:16:16 +0000152 inputProtocol,
153 outputProtocol);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000154
Mark Slee1d4ce802007-03-07 05:16:16 +0000155 // Create a task
156 shared_ptr<Runnable> runnable =
157 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000158
159 // Create a thread for this task
160 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000161 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000162
Mark Slee1d4ce802007-03-07 05:16:16 +0000163 // Insert thread into the set of threads
164 {
165 Synchronized s(tasksMonitor_);
166 tasks_.insert(task);
167 }
168
Mark Sleeb3cb6292007-02-01 22:55:00 +0000169 // Start the thread!
170 thread->start();
171
172 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000173 if (inputTransport != NULL) { inputTransport->close(); }
174 if (outputTransport != NULL) { outputTransport->close(); }
175 if (client != NULL) { client->close(); }
176 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000177 string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
178 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000179 }
Mark Slee907e3d62007-02-08 22:29:24 +0000180 continue;
181 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000182 if (inputTransport != NULL) { inputTransport->close(); }
183 if (outputTransport != NULL) { outputTransport->close(); }
184 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000185 string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
186 GlobalOutput(errStr.c_str());
Mark Slee907e3d62007-02-08 22:29:24 +0000187 continue;
188 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000189 if (inputTransport != NULL) { inputTransport->close(); }
190 if (outputTransport != NULL) { outputTransport->close(); }
191 if (client != NULL) { client->close(); }
Mark Slee2e8a8d42008-01-16 00:38:20 +0000192 string errStr = "TThreadedServer: Unknown exception: " + s;
193 GlobalOutput(errStr.c_str());
Mark Sleeb3cb6292007-02-01 22:55:00 +0000194 break;
195 }
196 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000197
198 // If stopped manually, make sure to close server transport
199 if (stop_) {
200 try {
201 serverTransport_->close();
202 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000203 string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
204 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000205 }
206 try {
207 Synchronized s(tasksMonitor_);
208 while (!tasks_.empty()) {
209 tasksMonitor_.wait();
210 }
211 } catch (TException &tx) {
Mark Slee2e8a8d42008-01-16 00:38:20 +0000212 string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
213 GlobalOutput(errStr.c_str());
Mark Slee1d4ce802007-03-07 05:16:16 +0000214 }
215 stop_ = false;
216 }
217
Mark Sleeb3cb6292007-02-01 22:55:00 +0000218}
219
T Jake Lucianib5e62212009-01-31 22:36:20 +0000220}}} // apache::thrift::server