blob: 43f74635319a5bbef2a0dfbcfb3f84b0dde95ba8 [file] [log] [blame]
Marc Slemko35452342006-08-03 19:01:37 +00001#include "server/TThreadPoolServer.h"
Marc Slemko35452342006-08-03 19:01:37 +00002#include "transport/TTransportException.h"
3#include "concurrency/Thread.h"
4#include "concurrency/ThreadManager.h"
5#include <string>
6#include <iostream>
7
8namespace facebook { namespace thrift { namespace server {
9
10using namespace std;
11using namespace facebook::thrift::concurrency;
12using namespace facebook::thrift::transport;
13
Marc Slemko16698852006-08-04 03:16:10 +000014class TThreadPoolServer::Task: public Runnable {
Mark Slee2f6404d2006-10-10 01:37:40 +000015
Marc Slemko16698852006-08-04 03:16:10 +000016public:
Marc Slemko35452342006-08-03 19:01:37 +000017
Marc Slemko16698852006-08-04 03:16:10 +000018 Task(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000019 shared_ptr<TTransport> input,
20 shared_ptr<TTransport> output) :
Mark Slee2f6404d2006-10-10 01:37:40 +000021 processor_(processor),
22 input_(input),
23 output_(output) {
Marc Slemko35452342006-08-03 19:01:37 +000024 }
Marc Slemko16698852006-08-04 03:16:10 +000025
26 ~Task() {}
Marc Slemko35452342006-08-03 19:01:37 +000027
Mark Sleed788b2e2006-09-07 01:26:35 +000028 void run() {
Marc Slemko16698852006-08-04 03:16:10 +000029 while(true) {
Marc Slemko35452342006-08-03 19:01:37 +000030 try {
Mark Slee2f6404d2006-10-10 01:37:40 +000031 processor_->process(input_, output_);
Marc Slemko35452342006-08-03 19:01:37 +000032 } catch (TTransportException& ttx) {
Mark Sleed788b2e2006-09-07 01:26:35 +000033 break;
Marc Slemko16698852006-08-04 03:16:10 +000034 } catch(...) {
Mark Sleed788b2e2006-09-07 01:26:35 +000035 break;
Marc Slemko35452342006-08-03 19:01:37 +000036 }
37 }
Mark Slee2f6404d2006-10-10 01:37:40 +000038 input_->close();
39 output_->close();
Marc Slemko35452342006-08-03 19:01:37 +000040 }
Mark Slee2f6404d2006-10-10 01:37:40 +000041
42 private:
43 shared_ptr<TProcessor> processor_;
44 shared_ptr<TTransport> input_;
45 shared_ptr<TTransport> output_;
46
Marc Slemko35452342006-08-03 19:01:37 +000047};
Marc Slemko16698852006-08-04 03:16:10 +000048
49TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000050 shared_ptr<TServerTransport> serverTransport,
51 shared_ptr<TTransportFactory> transportFactory,
52 shared_ptr<ThreadManager> threadManager,
53 shared_ptr<TServerOptions> options) :
54 TServer(processor, serverTransport, transportFactory, options),
Marc Slemko16698852006-08-04 03:16:10 +000055 threadManager_(threadManager) {
56}
Mark Sleed788b2e2006-09-07 01:26:35 +000057
Marc Slemko16698852006-08-04 03:16:10 +000058TThreadPoolServer::~TThreadPoolServer() {}
59
Mark Slee794993d2006-09-20 01:56:10 +000060void TThreadPoolServer::serve() {
Marc Slemko16698852006-08-04 03:16:10 +000061
Mark Sleed788b2e2006-09-07 01:26:35 +000062 shared_ptr<TTransport> client;
63 pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
64
Marc Slemko16698852006-08-04 03:16:10 +000065 try {
66 // Start the server listening
67 serverTransport_->listen();
68 } catch (TTransportException& ttx) {
69 cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
70 return;
71 }
72
Mark Sleed788b2e2006-09-07 01:26:35 +000073 while (true) {
Marc Slemko16698852006-08-04 03:16:10 +000074 try {
Mark Sleed788b2e2006-09-07 01:26:35 +000075 // Fetch client from server
76 client = serverTransport_->accept();
77 // Make IO transports
78 io = transportFactory_->getIOTransports(client);
79 // Add to threadmanager pool
80 threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
Marc Slemko16698852006-08-04 03:16:10 +000081 } catch (TTransportException& ttx) {
82 break;
83 }
84 }
85}
Marc Slemko35452342006-08-03 19:01:37 +000086
87}}} // facebook::thrift::server