blob: 34584bd4e8b1e3adbe83e596b2e0bca0ea7c3055 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Sleeb3cb6292007-02-01 22:55:00 +00007#include "server/TThreadedServer.h"
8#include "transport/TTransportException.h"
9#include "concurrency/PosixThreadFactory.h"
10
11#include <string>
12#include <iostream>
13#include <pthread.h>
14#include <unistd.h>
15
16namespace facebook { namespace thrift { namespace server {
17
Mark Slee5ea15f92007-03-05 22:55:59 +000018using boost::shared_ptr;
Mark Sleeb3cb6292007-02-01 22:55:00 +000019using namespace std;
20using namespace facebook::thrift;
Mark Slee5ea15f92007-03-05 22:55:59 +000021using namespace facebook::thrift::protocol;
Mark Sleeb3cb6292007-02-01 22:55:00 +000022using namespace facebook::thrift::transport;
23using namespace facebook::thrift::concurrency;
24
25class TThreadedServer::Task: public Runnable {
26
27public:
28
Mark Slee1d4ce802007-03-07 05:16:16 +000029 Task(TThreadedServer* server,
30 shared_ptr<TProcessor> processor,
Mark Sleeb3cb6292007-02-01 22:55:00 +000031 shared_ptr<TProtocol> input,
32 shared_ptr<TProtocol> output) :
Mark Slee1d4ce802007-03-07 05:16:16 +000033 server_(server),
Mark Sleeb3cb6292007-02-01 22:55:00 +000034 processor_(processor),
35 input_(input),
36 output_(output) {
37 }
38
39 ~Task() {}
40
41 void run() {
42 try {
43 while (processor_->process(input_, output_)) {
44 if (!input_->getTransport()->peek()) {
45 break;
46 }
47 }
48 } catch (TTransportException& ttx) {
49 cerr << "TThreadedServer client died: " << ttx.what() << endl;
50 } catch (TException& x) {
51 cerr << "TThreadedServer exception: " << x.what() << endl;
52 } catch (...) {
53 cerr << "TThreadedServer uncaught exception." << endl;
54 }
55 input_->getTransport()->close();
56 output_->getTransport()->close();
Mark Slee1d4ce802007-03-07 05:16:16 +000057
58 // Remove this task from parent bookkeeping
59 {
60 Synchronized s(server_->tasksMonitor_);
61 server_->tasks_.erase(this);
62 if (server_->tasks_.empty()) {
63 server_->tasksMonitor_.notify();
64 }
65 }
66
Mark Sleeb3cb6292007-02-01 22:55:00 +000067 }
68
69 private:
Mark Slee1d4ce802007-03-07 05:16:16 +000070 TThreadedServer* server_;
71 friend class TThreadedServer;
72
Mark Sleeb3cb6292007-02-01 22:55:00 +000073 shared_ptr<TProcessor> processor_;
74 shared_ptr<TProtocol> input_;
75 shared_ptr<TProtocol> output_;
Mark Sleeb3cb6292007-02-01 22:55:00 +000076};
77
78
79TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
80 shared_ptr<TServerTransport> serverTransport,
81 shared_ptr<TTransportFactory> transportFactory,
82 shared_ptr<TProtocolFactory> protocolFactory):
Mark Slee1d4ce802007-03-07 05:16:16 +000083 TServer(processor, serverTransport, transportFactory, protocolFactory),
84 stop_(false) {
Mark Sleeb3cb6292007-02-01 22:55:00 +000085 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
86}
87
88TThreadedServer::~TThreadedServer() {}
89
90void TThreadedServer::serve() {
91
92 shared_ptr<TTransport> client;
93 shared_ptr<TTransport> inputTransport;
94 shared_ptr<TTransport> outputTransport;
95 shared_ptr<TProtocol> inputProtocol;
96 shared_ptr<TProtocol> outputProtocol;
97
98 try {
99 // Start the server listening
100 serverTransport_->listen();
101 } catch (TTransportException& ttx) {
102 cerr << "TThreadedServer::run() listen(): " << ttx.what() << endl;
103 return;
104 }
105
Mark Slee1d4ce802007-03-07 05:16:16 +0000106 while (!stop_) {
Mark Sleeb3cb6292007-02-01 22:55:00 +0000107 try {
Mark Slee1d4ce802007-03-07 05:16:16 +0000108 client.reset();
109 inputTransport.reset();
110 outputTransport.reset();
111 inputProtocol.reset();
112 outputProtocol.reset();
113
Mark Sleeb3cb6292007-02-01 22:55:00 +0000114 // Fetch client from server
115 client = serverTransport_->accept();
Mark Slee1d4ce802007-03-07 05:16:16 +0000116
Mark Sleeb3cb6292007-02-01 22:55:00 +0000117 // Make IO transports
118 inputTransport = inputTransportFactory_->getTransport(client);
119 outputTransport = outputTransportFactory_->getTransport(client);
120 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
121 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
122
Mark Slee1d4ce802007-03-07 05:16:16 +0000123 TThreadedServer::Task* task = new TThreadedServer::Task(this,
124 processor_,
125 inputProtocol,
126 outputProtocol);
127
128 // Create a task
129 shared_ptr<Runnable> runnable =
130 shared_ptr<Runnable>(task);
Mark Sleeb3cb6292007-02-01 22:55:00 +0000131
132 // Create a thread for this task
133 shared_ptr<Thread> thread =
Mark Slee1d4ce802007-03-07 05:16:16 +0000134 shared_ptr<Thread>(threadFactory_->newThread(runnable));
Mark Sleeb3cb6292007-02-01 22:55:00 +0000135
Mark Slee1d4ce802007-03-07 05:16:16 +0000136 // Insert thread into the set of threads
137 {
138 Synchronized s(tasksMonitor_);
139 tasks_.insert(task);
140 }
141
Mark Sleeb3cb6292007-02-01 22:55:00 +0000142 // Start the thread!
143 thread->start();
144
145 } catch (TTransportException& ttx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000146 if (inputTransport != NULL) { inputTransport->close(); }
147 if (outputTransport != NULL) { outputTransport->close(); }
148 if (client != NULL) { client->close(); }
149 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
150 cerr << "TThreadedServer: TServerTransport died on accept: " << ttx.what() << endl;
151 }
Mark Slee907e3d62007-02-08 22:29:24 +0000152 continue;
153 } catch (TException& tx) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000154 if (inputTransport != NULL) { inputTransport->close(); }
155 if (outputTransport != NULL) { outputTransport->close(); }
156 if (client != NULL) { client->close(); }
Mark Slee907e3d62007-02-08 22:29:24 +0000157 cerr << "TThreadedServer: Caught TException: " << tx.what() << endl;
158 continue;
159 } catch (string s) {
Mark Slee1d4ce802007-03-07 05:16:16 +0000160 if (inputTransport != NULL) { inputTransport->close(); }
161 if (outputTransport != NULL) { outputTransport->close(); }
162 if (client != NULL) { client->close(); }
Mark Slee907e3d62007-02-08 22:29:24 +0000163 cerr << "TThreadedServer: Unknown exception: " << s << endl;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000164 break;
165 }
166 }
Mark Slee1d4ce802007-03-07 05:16:16 +0000167
168 // If stopped manually, make sure to close server transport
169 if (stop_) {
170 try {
171 serverTransport_->close();
172 } catch (TException &tx) {
173 cerr << "TThreadedServer: Exception shutting down: " << tx.what() << endl;
174 }
175 try {
176 Synchronized s(tasksMonitor_);
177 while (!tasks_.empty()) {
178 tasksMonitor_.wait();
179 }
180 } catch (TException &tx) {
181 cerr << "TThreadedServer: Exception joining workers: " << tx.what() << endl;
182 }
183 stop_ = false;
184 }
185
Mark Sleeb3cb6292007-02-01 22:55:00 +0000186}
187
188}}} // facebook::thrift::server