blob: 85c72dc1b1ee215dc8a62f7680a4b02cdfc213e5 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Sleeb3cb6292007-02-01 22:55:00 +00007#include "server/TThreadedServer.h"
8#include "transport/TTransportException.h"
9#include "concurrency/PosixThreadFactory.h"
10
11#include <string>
12#include <iostream>
13#include <pthread.h>
14#include <unistd.h>
15
16namespace facebook { namespace thrift { namespace server {
17
18using namespace std;
19using namespace facebook::thrift;
20using namespace facebook::thrift::transport;
21using namespace facebook::thrift::concurrency;
22
23class TThreadedServer::Task: public Runnable {
24
25public:
26
27 Task(shared_ptr<TProcessor> processor,
28 shared_ptr<TProtocol> input,
29 shared_ptr<TProtocol> output) :
30 processor_(processor),
31 input_(input),
32 output_(output) {
33 }
34
35 ~Task() {}
36
37 void run() {
38 try {
39 while (processor_->process(input_, output_)) {
40 if (!input_->getTransport()->peek()) {
41 break;
42 }
43 }
44 } catch (TTransportException& ttx) {
45 cerr << "TThreadedServer client died: " << ttx.what() << endl;
46 } catch (TException& x) {
47 cerr << "TThreadedServer exception: " << x.what() << endl;
48 } catch (...) {
49 cerr << "TThreadedServer uncaught exception." << endl;
50 }
51 input_->getTransport()->close();
52 output_->getTransport()->close();
53 }
54
55 private:
56 shared_ptr<TProcessor> processor_;
57 shared_ptr<TProtocol> input_;
58 shared_ptr<TProtocol> output_;
59
60};
61
62
63TThreadedServer::TThreadedServer(shared_ptr<TProcessor> processor,
64 shared_ptr<TServerTransport> serverTransport,
65 shared_ptr<TTransportFactory> transportFactory,
66 shared_ptr<TProtocolFactory> protocolFactory):
67 TServer(processor, serverTransport, transportFactory, protocolFactory) {
68 threadFactory_ = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
69}
70
71TThreadedServer::~TThreadedServer() {}
72
73void TThreadedServer::serve() {
74
75 shared_ptr<TTransport> client;
76 shared_ptr<TTransport> inputTransport;
77 shared_ptr<TTransport> outputTransport;
78 shared_ptr<TProtocol> inputProtocol;
79 shared_ptr<TProtocol> outputProtocol;
80
81 try {
82 // Start the server listening
83 serverTransport_->listen();
84 } catch (TTransportException& ttx) {
85 cerr << "TThreadedServer::run() listen(): " << ttx.what() << endl;
86 return;
87 }
88
89 while (true) {
90 try {
91 // Fetch client from server
92 client = serverTransport_->accept();
93 // Make IO transports
94 inputTransport = inputTransportFactory_->getTransport(client);
95 outputTransport = outputTransportFactory_->getTransport(client);
96 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
97 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
98
99 TThreadedServer::Task* t = new TThreadedServer::Task(processor_,
100 inputProtocol,
101 outputProtocol);
102
103 // Create a thread for this task
104 shared_ptr<Thread> thread =
105 shared_ptr<Thread>(threadFactory_->newThread(shared_ptr<Runnable>(t)));
106
107 // Start the thread!
108 thread->start();
109
110 } catch (TTransportException& ttx) {
Mark Slee907e3d62007-02-08 22:29:24 +0000111 inputTransport->close();
112 outputTransport->close();
113 client->close();
114 cerr << "TThreadedServer: TServerTransport died on accept: " << ttx.what() << endl;
115 continue;
116 } catch (TException& tx) {
117 inputTransport->close();
118 outputTransport->close();
119 client->close();
120 cerr << "TThreadedServer: Caught TException: " << tx.what() << endl;
121 continue;
122 } catch (string s) {
123 inputTransport->close();
124 outputTransport->close();
125 client->close();
126 cerr << "TThreadedServer: Unknown exception: " << s << endl;
Mark Sleeb3cb6292007-02-01 22:55:00 +0000127 break;
128 }
129 }
130}
131
132}}} // facebook::thrift::server