blob: 1a9898abb9ca45096b8239c33854b41d5f04276c [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko35452342006-08-03 19:01:37 +00007#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00008#include "transport/TTransportException.h"
9#include "concurrency/Thread.h"
10#include "concurrency/ThreadManager.h"
11#include <string>
12#include <iostream>
13
14namespace facebook { namespace thrift { namespace server {
15
16using namespace std;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000017using namespace facebook::thrift;
Marc Slemko35452342006-08-03 19:01:37 +000018using namespace facebook::thrift::concurrency;
19using namespace facebook::thrift::transport;
20
Marc Slemko16698852006-08-04 03:16:10 +000021class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000022
Marc Slemko16698852006-08-04 03:16:10 +000023public:
Marc Slemko35452342006-08-03 19:01:37 +000024
Marc Slemko16698852006-08-04 03:16:10 +000025 Task(shared_ptr<TProcessor> processor,
Mark Slee4af6ed72006-10-25 19:02:49 +000026 shared_ptr<TProtocol> input,
27 shared_ptr<TProtocol> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000028 processor_(processor),
29 input_(input),
30 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000031 }
Marc Slemko16698852006-08-04 03:16:10 +000032
33 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000034
Mark Sleeeb0d0242007-01-25 07:58:55 +000035 void run() {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000036 try {
37 while (processor_->process(input_, output_)) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000038 if (!input_->getTransport()->peek()) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000039 break;
40 }
Marc Slemko35452342006-08-03 19:01:37 +000041 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +000042 } catch (TTransportException& ttx) {
Martin Kraemeree341cb2007-02-05 21:40:38 +000043 // This is reasonably expected, client didn't send a full request so just
44 // ignore him
45 //cerr << "TThreadPoolServer client died: " << ttx.what() << endl;
Mark Sleeb9ff32a2006-11-16 01:00:24 +000046 } catch (TException& x) {
47 cerr << "TThreadPoolServer exception: " << x.what() << endl;
48 } catch (...) {
49 cerr << "TThreadPoolServer uncaught exception." << endl;
Marc Slemko35452342006-08-03 19:01:37 +000050 }
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000051 input_->getTransport()->close();
52 output_->getTransport()->close();
Marc Slemko35452342006-08-03 19:01:37 +000053 }
Mark Slee2f6404d2006-10-10 01:37:40 +000054
55 private:
56 shared_ptr<TProcessor> processor_;
Mark Slee4af6ed72006-10-25 19:02:49 +000057 shared_ptr<TProtocol> input_;
58 shared_ptr<TProtocol> output_;
Mark Slee2f6404d2006-10-10 01:37:40 +000059
Marc Slemko35452342006-08-03 19:01:37 +000060};
Marc Slemko16698852006-08-04 03:16:10 +000061
62TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000063 shared_ptr<TServerTransport> serverTransport,
64 shared_ptr<TTransportFactory> transportFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000065 shared_ptr<TProtocolFactory> protocolFactory,
Mark Slee4af6ed72006-10-25 19:02:49 +000066 shared_ptr<ThreadManager> threadManager) :
67 TServer(processor, serverTransport, transportFactory, protocolFactory),
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000068 threadManager_(threadManager) {}
69
70TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
71 shared_ptr<TServerTransport> serverTransport,
72 shared_ptr<TTransportFactory> inputTransportFactory,
73 shared_ptr<TTransportFactory> outputTransportFactory,
74 shared_ptr<TProtocolFactory> inputProtocolFactory,
75 shared_ptr<TProtocolFactory> outputProtocolFactory,
76 shared_ptr<ThreadManager> threadManager) :
77 TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
78 inputProtocolFactory, outputProtocolFactory),
79 threadManager_(threadManager) {}
80
Mark Sleed788b2e2006-09-07 01:26:35 +000081
Marc Slemko16698852006-08-04 03:16:10 +000082TThreadPoolServer::~TThreadPoolServer() {}
83
Mark Slee794993d2006-09-20 01:56:10 +000084void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000085
Mark Sleed788b2e2006-09-07 01:26:35 +000086 shared_ptr<TTransport> client;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +000087 shared_ptr<TTransport> inputTransport;
88 shared_ptr<TTransport> outputTransport;
89 shared_ptr<TProtocol> inputProtocol;
90 shared_ptr<TProtocol> outputProtocol;
Mark Sleed788b2e2006-09-07 01:26:35 +000091
Marc Slemko16698852006-08-04 03:16:10 +000092 try {
93 // Start the server listening
94 serverTransport_->listen();
95 } catch (TTransportException& ttx) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000096 cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
Marc Slemko16698852006-08-04 03:16:10 +000097 return;
98 }
99
Mark Sleed788b2e2006-09-07 01:26:35 +0000100 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +0000101 try {
Mark Sleed788b2e2006-09-07 01:26:35 +0000102 // Fetch client from server
103 client = serverTransport_->accept();
104 // Make IO transports
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000105 inputTransport = inputTransportFactory_->getTransport(client);
106 outputTransport = outputTransportFactory_->getTransport(client);
107 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
108 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
Mark Slee4af6ed72006-10-25 19:02:49 +0000109
Mark Sleed788b2e2006-09-07 01:26:35 +0000110 // Add to threadmanager pool
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000111 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_,
112 inputProtocol,
113 outputProtocol)));
Marc Slemko16698852006-08-04 03:16:10 +0000114 } catch (TTransportException& ttx) {
Aditya Agarwalfdef47e2007-02-07 03:54:18 +0000115 inputTransport->close();
116 outputTransport->close();
117 client->close();
118 cerr << "TThreadPoolServer: TServerTransport died on accept: " << ttx.what() << endl;
119 continue;
120 } catch (TException& tx) {
121 inputTransport->close();
122 outputTransport->close();
123 client->close();
124 cerr << "TThreadPoolServer: Caught TException: " << tx.what() << endl;
125 continue;
126 } catch (string s) {
127 inputTransport->close();
128 outputTransport->close();
129 client->close();
130 cerr << "TThreadPoolServer: Unknown exception: " << s << endl;
Marc Slemko16698852006-08-04 03:16:10 +0000131 break;
132 }
133 }
134}
Marc Slemko35452342006-08-03 19:01:37 +0000135
136}}} // facebook::thrift::server