blob: dcbf2f2fdadb53c78ea2be13e3e6798bad774639 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Marc Slemko35452342006-08-03 19:01:37 +00007#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00008#include "transport/TTransportException.h"
9#include "concurrency/Thread.h"
10#include "concurrency/ThreadManager.h"
11#include <string>
12#include <iostream>
13
14namespace facebook { namespace thrift { namespace server {
15
16using namespace std;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000017using namespace facebook::thrift;
Marc Slemko35452342006-08-03 19:01:37 +000018using namespace facebook::thrift::concurrency;
19using namespace facebook::thrift::transport;
20
Marc Slemko16698852006-08-04 03:16:10 +000021class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000022
Marc Slemko16698852006-08-04 03:16:10 +000023public:
Marc Slemko35452342006-08-03 19:01:37 +000024
Marc Slemko16698852006-08-04 03:16:10 +000025 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000026 shared_ptr<TProtocol> input,
27 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000028 processor_(processor),
29 input_(input),
30 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000031 }
Marc Slemko16698852006-08-04 03:16:10 +000032
33 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000034
Mark Sleeeb0d0242007-01-25 07:58:55 +000035 void run() {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000036 try {
37 while (processor_->process(input_, output_)) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000038 if (!input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000039 break;
40 }
Marc Slemko35452342006-08-03 19:01:37 +000041 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000042 } catch (TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000043 // This is reasonably expected, client didn't send a full request so just
44 // ignore him
45 //cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000046 } catch (TException& x) {
47 cerr << "TThreadPoolServer exception: " << x.what() << endl;
48 } catch (...) {
49 cerr << "TThreadPoolServer uncaught exception." << endl;
Marc Slemko35452342006-08-03 19:01:37 +000050 }
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000051 input_->getTransport()->close();
52 output_->getTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000053 }
Mark Slee2f6404d2006-10-10 01:37:40 +000054
55 private:
56 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000057 shared_ptr<TProtocol> input_;
58 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000059
Marc Slemko35452342006-08-03 19:01:37 +000060};
Marc Slemko16698852006-08-04 03:16:10 +000061
62TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000063 shared_ptr<TServerTransport> serverTransport,
64 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000065 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000066 shared_ptr<ThreadManager> threadManager) :
67 TServer(processor, serverTransport, transportFactory, protocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +000068 threadManager_(threadManager),
69 stop_(false) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000070
71TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
72 shared_ptr<TServerTransport> serverTransport,
73 shared_ptr<TTransportFactory> inputTransportFactory,
74 shared_ptr<TTransportFactory> outputTransportFactory,
75 shared_ptr<TProtocolFactory> inputProtocolFactory,
76 shared_ptr<TProtocolFactory> outputProtocolFactory,
77 shared_ptr<ThreadManager> threadManager) :
78 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
79 inputProtocolFactory, outputProtocolFactory),
Mark Slee6e3f6372007-03-01 22:05:46 +000080 threadManager_(threadManager),
81 stop_(false) {}
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000082
Mark Sleed788b2e2006-09-07 01:26:35 +000083
Marc Slemko16698852006-08-04 03:16:10 +000084TThreadPoolServer::~TThreadPoolServer() {}
85
Mark Slee794993d2006-09-20 01:56:10 +000086void TThreadPoolServer::serve() {
Mark Sleed788b2e2006-09-07 01:26:35 +000087 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000088 shared_ptr<TTransport> inputTransport;
89 shared_ptr<TTransport> outputTransport;
90 shared_ptr<TProtocol> inputProtocol;
91 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +000092
Marc Slemko16698852006-08-04 03:16:10 +000093 try {
94 // Start the server listening
95 serverTransport_->listen();
96 } catch (TTransportException& ttx) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000097 cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
Marc Slemko16698852006-08-04 03:16:10 +000098 return;
99 }
100
Mark Slee6e3f6372007-03-01 22:05:46 +0000101 while (!stop_) {
Marc Slemko16698852006-08-04 03:16:10 +0000102 try {
Mark Sleed788b2e2006-09-07 01:26:35 +0000103 // Fetch client from server
104 client = serverTransport_->accept();
Mark Sleea5a783f2007-03-02 19:41:08 +0000105
Mark Sleed788b2e2006-09-07 01:26:35 +0000106 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000107 inputTransport = inputTransportFactory_->getTransport(client);
108 outputTransport = outputTransportFactory_->getTransport(client);
109 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
110 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000111
Mark Sleed788b2e2006-09-07 01:26:35 +0000112 // Add to threadmanager pool
Mark Sleea5a783f2007-03-02 19:41:08 +0000113 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
114
Marc Slemko16698852006-08-04 03:16:10 +0000115 } catch (TTransportException& ttx) {
Mark Sleea5a783f2007-03-02 19:41:08 +0000116 if (inputTransport.get() != NULL) { inputTransport->close(); }
117 if (outputTransport.get() != NULL) { outputTransport->close(); }
118 if (client.get() != NULL) { client->close(); }
119 if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
120 cerr << "TThreadPoolServer: TServerTransport died on accept: " << ttx.what() << endl;
121 }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000122 continue;
123 } catch (TException& tx) {
Mark Sleea5a783f2007-03-02 19:41:08 +0000124 if (inputTransport.get() != NULL) { inputTransport->close(); }
125 if (outputTransport.get() != NULL) { outputTransport->close(); }
126 if (client.get() != NULL) { client->close(); }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000127 cerr << "TThreadPoolServer: Caught TException: " << tx.what() << endl;
128 continue;
129 } catch (string s) {
Mark Sleea5a783f2007-03-02 19:41:08 +0000130 if (inputTransport.get() != NULL) { inputTransport->close(); }
131 if (outputTransport.get() != NULL) { outputTransport->close(); }
132 if (client.get() != NULL) { client->close(); }
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000133 cerr << "TThreadPoolServer: Unknown exception: " << s << endl;
Marc Slemko16698852006-08-04 03:16:10 +0000134 break;
135 }
136 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000137
138 // If stopped manually, join the existing threads
139 if (stop_) {
140 try {
141 serverTransport_->close();
142 threadManager_->join();
143 } catch (TException &tx) {
144 cerr << "TThreadPoolServer: Exception shutting down: " << tx.what() << endl;
145 }
Mark Sleea5a783f2007-03-02 19:41:08 +0000146 stop_ = false;
Mark Slee6e3f6372007-03-01 22:05:46 +0000147 }
Mark Slee6e3f6372007-03-01 22:05:46 +0000148
Marc Slemko16698852006-08-04 03:16:10 +0000149}
Marc Slemko35452342006-08-03 19:01:37 +0000150
151}}} // facebook::thrift::server