blob: f08ec3d82edd4a5c2112c253d1597bb342d7cdd2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Roger Meier86e89862012-02-10 19:53:20 +000019
Roger Meier19a99152012-02-11 19:09:30 +000020#include "TQTcpServer.h"
21#include "TQIODeviceTransport.h"
Roger Meier86e89862012-02-10 19:53:20 +000022
23#include <QTcpSocket>
24
Roger Meier19a99152012-02-11 19:09:30 +000025#include <tr1/functional>
26
27#include <protocol/TProtocol.h>
28#include <async/TAsyncProcessor.h>
Roger Meier86e89862012-02-10 19:53:20 +000029
30using boost::shared_ptr;
31using apache::thrift::protocol::TProtocol;
32using apache::thrift::protocol::TProtocolFactory;
33using apache::thrift::transport::TTransport;
34using apache::thrift::transport::TTransportException;
35using apache::thrift::transport::TQIODeviceTransport;
36using std::tr1::function;
37using std::tr1::bind;
38
39QT_USE_NAMESPACE
40
41namespace apache { namespace thrift { namespace async {
42
43struct TQTcpServer::ConnectionContext {
44 shared_ptr<QTcpSocket> connection_;
45 shared_ptr<TTransport> transport_;
46 shared_ptr<TProtocol> iprot_;
47 shared_ptr<TProtocol> oprot_;
48
49 explicit ConnectionContext(shared_ptr<QTcpSocket> connection,
50 shared_ptr<TTransport> transport,
51 shared_ptr<TProtocol> iprot,
52 shared_ptr<TProtocol> oprot)
53 : connection_(connection)
54 , transport_(transport)
55 , iprot_(iprot)
56 , oprot_(oprot)
57 {}
58};
59
60TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server,
61 shared_ptr<TAsyncProcessor> processor,
62 shared_ptr<TProtocolFactory> pfact,
63 QObject* parent)
64 : QObject(parent)
65 , server_(server)
66 , processor_(processor)
67 , pfact_(pfact)
68{
69 connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming()));
70}
71
72TQTcpServer::~TQTcpServer()
73{
74}
75
76void TQTcpServer::processIncoming()
77{
78 while (server_->hasPendingConnections()) {
79 // take ownership of the QTcpSocket; technically it could be deleted
80 // when the QTcpServer is destroyed, but any real app should delete this
81 // class before deleting the QTcpServer that we are using
82 shared_ptr<QTcpSocket> connection(server_->nextPendingConnection());
83
84 shared_ptr<TTransport> transport;
85 shared_ptr<TProtocol> iprot;
86 shared_ptr<TProtocol> oprot;
87
88 try {
89 transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection));
90 iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
91 oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
92 } catch(...) {
93 qWarning("[TQTcpServer] Failed to initialize transports/protocols");
94 continue;
95 }
96
97 ctxMap_[connection.get()] =
98 shared_ptr<ConnectionContext>(
99 new ConnectionContext(connection, transport, iprot, oprot));
100
101 connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode()));
102
103 // need to use QueuedConnection since we will be deleting the socket in the slot
104 connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()),
105 Qt::QueuedConnection);
106 }
107}
108
109void TQTcpServer::beginDecode()
110{
111 QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
112 Q_ASSERT(connection);
113
Roger Meier19a99152012-02-11 19:09:30 +0000114 if (ctxMap_.find(connection) == ctxMap_.end()) {
Roger Meier86e89862012-02-10 19:53:20 +0000115 qWarning("[TQTcpServer] Got data on an unknown QTcpSocket");
116 return;
117 }
118
119 shared_ptr<ConnectionContext> ctx = ctxMap_[connection];
120
121 try {
122 processor_->process(
123 bind(&TQTcpServer::finish, this,
124 ctx, std::tr1::placeholders::_1),
125 ctx->iprot_, ctx->oprot_);
126 } catch(const TTransportException& ex) {
127 qWarning("[TQTcpServer] TTransportException during processing: '%s'",
128 ex.what());
129 ctxMap_.erase(connection);
130 } catch(...) {
131 qWarning("[TQTcpServer] Unknown processor exception");
132 ctxMap_.erase(connection);
133 }
134}
135
136void TQTcpServer::socketClosed()
137{
138 QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
139 Q_ASSERT(connection);
140
Roger Meier19a99152012-02-11 19:09:30 +0000141 if (ctxMap_.find(connection) == ctxMap_.end()) {
Roger Meier86e89862012-02-10 19:53:20 +0000142 qWarning("[TQTcpServer] Unknown QTcpSocket closed");
143 return;
144 }
145
146 ctxMap_.erase(connection);
147}
148
149void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy)
150{
Roger Meier19a99152012-02-11 19:09:30 +0000151 if (!healthy) {
Roger Meier86e89862012-02-10 19:53:20 +0000152 qWarning("[TQTcpServer] Processor failed to process data successfully");
153 ctxMap_.erase(ctx->connection_.get());
154 }
155}
156
157}}} // apache::thrift::async