blob: d07e9dab80796b944db5f5af99c721abe6276354 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Sleeb3cb6292007-02-01 22:55:00 +00007#include "server/TThreadedServer.h"
8#include "transport/TTransportException.h"
9#include "concurrency/PosixThreadFactory.h"
10
11#include <string>
12#include <iostream>
13#include <pthread.h>
14#include <unistd.h>
15
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000016namespace facebook { namespace thrift { namespace server {
Mark Sleeb3cb6292007-02-01 22:55:00 +000017
Mark Slee5ea15f92007-03-05 22:55:59 +000018using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000019using namespace std;
20using namespace facebook::thrift;
Mark Slee5ea15f92007-03-05 22:55:59 +000021using namespace facebook::thrift::protocol;
Mark Sleeb3cb6292007-02-01 22:55:00 +000022using namespace facebook::thrift::transport;
23using namespace facebook::thrift::concurrency;
24
25class TThreadedServer::Task: public Runnable {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000026
Mark Sleeb3cb6292007-02-01 22:55:00 +000027public:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000028
29 Task(TThreadedServer& server,
Mark Slee1d4ce802007-03-07 05:16:16 +000030 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000031 shared_ptr<TProtocol> input,
32 shared_ptr<TProtocol> output) :
Mark Slee1d4ce802007-03-07 05:16:16 +000033 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000034 processor_(processor),
35 input_(input),
36 output_(output) {
37 }
38
39 ~Task() {}
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000040
Mark Sleeb3cb6292007-02-01 22:55:00 +000041 void run() {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000042 boost::shared_ptr<TServerEventHandler> eventHandler =
43 server_.getEventHandler();
44 if (eventHandler != NULL) {
45 eventHandler->clientBegin(input_, output_);
46 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000047 try {
48 while (processor_->process(input_, output_)) {
49 if (!input_->getTransport()->peek()) {
50 break;
51 }
52 }
53 } catch (TTransportException& ttx) {
54 cerr << "TThreadedServer client died: " << ttx.what() << endl;
55 } catch (TException& x) {
56 cerr << "TThreadedServer exception: " << x.what() << endl;
57 } catch (...) {
58 cerr << "TThreadedServer uncaught exception." << endl;
59 }
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000060 if (eventHandler != NULL) {
61 eventHandler->clientEnd(input_, output_);
62 }
Mark Sleeb3cb6292007-02-01 22:55:00 +000063 input_->getTransport()->close();
64 output_->getTransport()->close();
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000065
Mark Slee1d4ce802007-03-07 05:16:16 +000066 // Remove this task from parent bookkeeping
67 {
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000068 Synchronized s(server_.tasksMonitor_);
69 server_.tasks_.erase(this);
70 if (server_.tasks_.empty()) {
71 server_.tasksMonitor_.notify();
Mark Slee1d4ce802007-03-07 05:16:16 +000072 }
73 }
74
Mark Sleeb3cb6292007-02-01 22:55:00 +000075 }
76
77 private:
Mark Sleeb4d3e7b2007-11-28 01:51:43 +000078 TThreadedServer& server_;
Mark Slee1d4ce802007-03-07 05:16:16 +000079 friend class TThreadedServer;
80
Mark Sleeb3cb6292007-02-01 22:55:00 +000081 shared_ptr<TProcessor> processor_;
82 shared_ptr<TProtocol> input_;
83 shared_ptr<TProtocol> output_;
Mark Sleeb3cb6292007-02-01 22:55:00 +000084};
85
86
87TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
88 shared_ptr<TServerTransport> serverTransport,
89 shared_ptr<TTransportFactory> transportFactory,
90 shared_ptr<TProtocolFactory> protocolFactory):
Mark Slee1d4ce802007-03-07 05:16:16 +000091 TServer(processor, serverTransport, transportFactory, protocolFactory),
92 stop_(false) {
Mark Sleeb3cb6292007-02-01 22:55:00 +000093 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
94}
95
96TThreadedServer::~TThreadedServer() {}
97
98void TThreadedServer::serve() {
99
100 shared_ptr<TTransport> client;
101 shared_ptr<TTransport> inputTransport;
102 shared_ptr<TTransport> outputTransport;
103 shared_ptr<TProtocol> inputProtocol;
104 shared_ptr<TProtocol> outputProtocol;
105
106 try {
107 // Start the server listening
108 serverTransport_->listen();
109 } catch (TTransportException& ttx) {
110 cerr << "TThreadedServer::run() listen(): " << ttx.what() << endl;
111 return;
112 }
113
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000114 // Run the preServe event
115 if (eventHandler_ != NULL) {
116 eventHandler_->preServe();
117 }
118
119 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000120 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000121 client.reset();
122 inputTransport.reset();
123 outputTransport.reset();
124 inputProtocol.reset();
125 outputProtocol.reset();
126
Mark Sleeb3cb6292007-02-01 22:55:00 +0000127 // Fetch client from server
128 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000129
Mark Sleeb3cb6292007-02-01 22:55:00 +0000130 // Make IO transports
131 inputTransport = inputTransportFactory_->getTransport(client);
132 outputTransport = outputTransportFactory_->getTransport(client);
133 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
134 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
135
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000136 TThreadedServer::Task* task = new TThreadedServer::Task(*this,
137 processor_,
Mark Slee1d4ce802007-03-07 05:16:16 +0000138 inputProtocol,
139 outputProtocol);
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000140
Mark Slee1d4ce802007-03-07 05:16:16 +0000141 // Create a task
142 shared_ptr<Runnable> runnable =
143 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000144
145 // Create a thread for this task
146 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000147 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb4d3e7b2007-11-28 01:51:43 +0000148
Mark Slee1d4ce802007-03-07 05:16:16 +0000149 // Insert thread into the set of threads
150 {
151 Synchronized s(tasksMonitor_);
152 tasks_.insert(task);
153 }
154
Mark Sleeb3cb6292007-02-01 22:55:00 +0000155 // Start the thread!
156 thread->start();
157
158 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000159 if (inputTransport != NULL) { inputTransport->close(); }
160 if (outputTransport != NULL) { outputTransport->close(); }
161 if (client != NULL) { client->close(); }
162 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
163 cerr << "TThreadedServer: TServerTransport died on accept: " << ttx.what() << endl;
164 }
Mark Slee907e3d62007-02-08 22:29:24 +0000165 continue;
166 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000167 if (inputTransport != NULL) { inputTransport->close(); }
168 if (outputTransport != NULL) { outputTransport->close(); }
169 if (client != NULL) { client->close(); }
Mark Slee907e3d62007-02-08 22:29:24 +0000170 cerr << "TThreadedServer: Caught TException: " << tx.what() << endl;
171 continue;
172 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000173 if (inputTransport != NULL) { inputTransport->close(); }
174 if (outputTransport != NULL) { outputTransport->close(); }
175 if (client != NULL) { client->close(); }
Mark Slee907e3d62007-02-08 22:29:24 +0000176 cerr << "TThreadedServer: Unknown exception: " << s << endl;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000177 break;
178 }
179 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000180
181 // If stopped manually, make sure to close server transport
182 if (stop_) {
183 try {
184 serverTransport_->close();
185 } catch (TException &tx) {
186 cerr << "TThreadedServer: Exception shutting down: " << tx.what() << endl;
187 }
188 try {
189 Synchronized s(tasksMonitor_);
190 while (!tasks_.empty()) {
191 tasksMonitor_.wait();
192 }
193 } catch (TException &tx) {
194 cerr << "TThreadedServer: Exception joining workers: " << tx.what() << endl;
195 }
196 stop_ = false;
197 }
198
Mark Sleeb3cb6292007-02-01 22:55:00 +0000199}
200
201}}} // facebook::thrift::server