blob: 65ad68dc61fe98bff844f7a73e180d94445d1806 [file] [log] [blame]
Roger Meier86e89862012-02-10 19:53:20 +00001#include "TQTcpServer.h"
2
3#include <protocol/TProtocol.h>
4#include <async/TAsyncProcessor.h>
5
6#include <QTcpSocket>
7
8#include "TQIODeviceTransport.h"
9
10using boost::shared_ptr;
11using apache::thrift::protocol::TProtocol;
12using apache::thrift::protocol::TProtocolFactory;
13using apache::thrift::transport::TTransport;
14using apache::thrift::transport::TTransportException;
15using apache::thrift::transport::TQIODeviceTransport;
16using std::tr1::function;
17using std::tr1::bind;
18
19QT_USE_NAMESPACE
20
21namespace apache { namespace thrift { namespace async {
22
23struct TQTcpServer::ConnectionContext {
24 shared_ptr<QTcpSocket> connection_;
25 shared_ptr<TTransport> transport_;
26 shared_ptr<TProtocol> iprot_;
27 shared_ptr<TProtocol> oprot_;
28
29 explicit ConnectionContext(shared_ptr<QTcpSocket> connection,
30 shared_ptr<TTransport> transport,
31 shared_ptr<TProtocol> iprot,
32 shared_ptr<TProtocol> oprot)
33 : connection_(connection)
34 , transport_(transport)
35 , iprot_(iprot)
36 , oprot_(oprot)
37 {}
38};
39
40TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server,
41 shared_ptr<TAsyncProcessor> processor,
42 shared_ptr<TProtocolFactory> pfact,
43 QObject* parent)
44 : QObject(parent)
45 , server_(server)
46 , processor_(processor)
47 , pfact_(pfact)
48{
49 connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming()));
50}
51
52TQTcpServer::~TQTcpServer()
53{
54}
55
56void TQTcpServer::processIncoming()
57{
58 while (server_->hasPendingConnections()) {
59 // take ownership of the QTcpSocket; technically it could be deleted
60 // when the QTcpServer is destroyed, but any real app should delete this
61 // class before deleting the QTcpServer that we are using
62 shared_ptr<QTcpSocket> connection(server_->nextPendingConnection());
63
64 shared_ptr<TTransport> transport;
65 shared_ptr<TProtocol> iprot;
66 shared_ptr<TProtocol> oprot;
67
68 try {
69 transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection));
70 iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
71 oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
72 } catch(...) {
73 qWarning("[TQTcpServer] Failed to initialize transports/protocols");
74 continue;
75 }
76
77 ctxMap_[connection.get()] =
78 shared_ptr<ConnectionContext>(
79 new ConnectionContext(connection, transport, iprot, oprot));
80
81 connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode()));
82
83 // need to use QueuedConnection since we will be deleting the socket in the slot
84 connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()),
85 Qt::QueuedConnection);
86 }
87}
88
89void TQTcpServer::beginDecode()
90{
91 QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
92 Q_ASSERT(connection);
93
94 if (ctxMap_.find(connection) == ctxMap_.end())
95 {
96 qWarning("[TQTcpServer] Got data on an unknown QTcpSocket");
97 return;
98 }
99
100 shared_ptr<ConnectionContext> ctx = ctxMap_[connection];
101
102 try {
103 processor_->process(
104 bind(&TQTcpServer::finish, this,
105 ctx, std::tr1::placeholders::_1),
106 ctx->iprot_, ctx->oprot_);
107 } catch(const TTransportException& ex) {
108 qWarning("[TQTcpServer] TTransportException during processing: '%s'",
109 ex.what());
110 ctxMap_.erase(connection);
111 } catch(...) {
112 qWarning("[TQTcpServer] Unknown processor exception");
113 ctxMap_.erase(connection);
114 }
115}
116
117void TQTcpServer::socketClosed()
118{
119 QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
120 Q_ASSERT(connection);
121
122 if (ctxMap_.find(connection) == ctxMap_.end())
123 {
124 qWarning("[TQTcpServer] Unknown QTcpSocket closed");
125 return;
126 }
127
128 ctxMap_.erase(connection);
129}
130
131void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy)
132{
133 if (!healthy)
134 {
135 qWarning("[TQTcpServer] Processor failed to process data successfully");
136 ctxMap_.erase(ctx->connection_.get());
137 }
138}
139
140}}} // apache::thrift::async