blob: 4285b053ef42221729343ca5b99567a98599e6d3 [file] [log] [blame]
Marc Slemko35452342006-08-03 19:01:37 +00001#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00002#include "transport/TTransportException.h"
3#include "concurrency/Thread.h"
4#include "concurrency/ThreadManager.h"
5#include <string>
6#include <iostream>
7
8namespace facebook { namespace thrift { namespace server {
9
10using namespace std;
11using namespace facebook::thrift::concurrency;
12using namespace facebook::thrift::transport;
13
Marc Slemko16698852006-08-04 03:16:10 +000014class TThreadPoolServer::Task: public Runnable {
Marc Slemko35452342006-08-03 19:01:37 +000015
Marc Slemko16698852006-08-04 03:16:10 +000016 shared_ptr<TProcessor> _processor;
Mark Sleed788b2e2006-09-07 01:26:35 +000017 shared_ptr<TTransport> _input;
18 shared_ptr<TTransport> _output;
Marc Slemko35452342006-08-03 19:01:37 +000019
Marc Slemko16698852006-08-04 03:16:10 +000020public:
Marc Slemko35452342006-08-03 19:01:37 +000021
Marc Slemko16698852006-08-04 03:16:10 +000022 Task(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000023 shared_ptr<TTransport> input,
24 shared_ptr<TTransport> output) :
Marc Slemko16698852006-08-04 03:16:10 +000025 _processor(processor),
Mark Sleed788b2e2006-09-07 01:26:35 +000026 _input(input),
27 _output(output) {
Marc Slemko35452342006-08-03 19:01:37 +000028 }
Marc Slemko16698852006-08-04 03:16:10 +000029
30 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000031
Mark Sleed788b2e2006-09-07 01:26:35 +000032 void run() {
Marc Slemko16698852006-08-04 03:16:10 +000033 while(true) {
Marc Slemko35452342006-08-03 19:01:37 +000034 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000035 _processor->process(_input, _output);
Marc Slemko35452342006-08-03 19:01:37 +000036 } catch (TTransportException& ttx) {
Mark Sleed788b2e2006-09-07 01:26:35 +000037 break;
Marc Slemko16698852006-08-04 03:16:10 +000038 } catch(...) {
Mark Sleed788b2e2006-09-07 01:26:35 +000039 break;
Marc Slemko35452342006-08-03 19:01:37 +000040 }
41 }
Mark Sleed788b2e2006-09-07 01:26:35 +000042 _input->close();
43 _output->close();
Marc Slemko35452342006-08-03 19:01:37 +000044 }
45};
Marc Slemko16698852006-08-04 03:16:10 +000046
47TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000048 shared_ptr<TServerTransport> serverTransport,
49 shared_ptr<TTransportFactory> transportFactory,
50 shared_ptr<ThreadManager> threadManager,
51 shared_ptr<TServerOptions> options) :
52 TServer(processor, serverTransport, transportFactory, options),
Marc Slemko16698852006-08-04 03:16:10 +000053 threadManager_(threadManager) {
54}
Mark Sleed788b2e2006-09-07 01:26:35 +000055
Marc Slemko16698852006-08-04 03:16:10 +000056TThreadPoolServer::~TThreadPoolServer() {}
57
Mark Slee794993d2006-09-20 01:56:10 +000058void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000059
Mark Sleed788b2e2006-09-07 01:26:35 +000060 shared_ptr<TTransport> client;
61 pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
62
Marc Slemko16698852006-08-04 03:16:10 +000063 try {
64 // Start the server listening
65 serverTransport_->listen();
66 } catch (TTransportException& ttx) {
67 cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
68 return;
69 }
70
Mark Sleed788b2e2006-09-07 01:26:35 +000071 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +000072 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000073 // Fetch client from server
74 client = serverTransport_->accept();
75 // Make IO transports
76 io = transportFactory_->getIOTransports(client);
77 // Add to threadmanager pool
78 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
Marc Slemko16698852006-08-04 03:16:10 +000079 } catch (TTransportException& ttx) {
80 break;
81 }
82 }
83}
Marc Slemko35452342006-08-03 19:01:37 +000084
85}}} // facebook::thrift::server