blob: a06117efe9c142084a7186f7515961f8812ebcef [file] [log] [blame]
Mark Sleeb3cb6292007-02-01 22:55:00 +00001#include "server/TThreadedServer.h"
2#include "transport/TTransportException.h"
3#include "concurrency/PosixThreadFactory.h"
4
5#include <string>
6#include <iostream>
7#include <pthread.h>
8#include <unistd.h>
9
10namespace facebook { namespace thrift { namespace server {
11
12using namespace std;
13using namespace facebook::thrift;
14using namespace facebook::thrift::transport;
15using namespace facebook::thrift::concurrency;
16
17class TThreadedServer::Task: public Runnable {
18
19public:
20
21 Task(shared_ptr<TProcessor> processor,
22 shared_ptr<TProtocol> input,
23 shared_ptr<TProtocol> output) :
24 processor_(processor),
25 input_(input),
26 output_(output) {
27 }
28
29 ~Task() {}
30
31 void run() {
32 try {
33 while (processor_->process(input_, output_)) {
34 if (!input_->getTransport()->peek()) {
35 break;
36 }
37 }
38 } catch (TTransportException& ttx) {
39 cerr << "TThreadedServer client died: " << ttx.what() << endl;
40 } catch (TException& x) {
41 cerr << "TThreadedServer exception: " << x.what() << endl;
42 } catch (...) {
43 cerr << "TThreadedServer uncaught exception." << endl;
44 }
45 input_->getTransport()->close();
46 output_->getTransport()->close();
47 }
48
49 private:
50 shared_ptr<TProcessor> processor_;
51 shared_ptr<TProtocol> input_;
52 shared_ptr<TProtocol> output_;
53
54};
55
56
57TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
58 shared_ptr<TServerTransport> serverTransport,
59 shared_ptr<TTransportFactory> transportFactory,
60 shared_ptr<TProtocolFactory> protocolFactory):
61 TServer(processor, serverTransport, transportFactory, protocolFactory) {
62 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
63}
64
65TThreadedServer::~TThreadedServer() {}
66
67void TThreadedServer::serve() {
68
69 shared_ptr<TTransport> client;
70 shared_ptr<TTransport> inputTransport;
71 shared_ptr<TTransport> outputTransport;
72 shared_ptr<TProtocol> inputProtocol;
73 shared_ptr<TProtocol> outputProtocol;
74
75 try {
76 // Start the server listening
77 serverTransport_->listen();
78 } catch (TTransportException& ttx) {
79 cerr << "TThreadedServer::run() listen(): " << ttx.what() << endl;
80 return;
81 }
82
83 while (true) {
84 try {
85 // Fetch client from server
86 client = serverTransport_->accept();
87 // Make IO transports
88 inputTransport = inputTransportFactory_->getTransport(client);
89 outputTransport = outputTransportFactory_->getTransport(client);
90 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
91 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
92
93 TThreadedServer::Task* t = new TThreadedServer::Task(processor_,
94 inputProtocol,
95 outputProtocol);
96
97 // Create a thread for this task
98 shared_ptr<Thread> thread =
99 shared_ptr<Thread>(threadFactory_->newThread(shared_ptr<Runnable>(t)));
100
101 // Start the thread!
102 thread->start();
103
104 } catch (TTransportException& ttx) {
105 break;
106 }
107 }
108}
109
110}}} // facebook::thrift::server