// blob: bd39893e31e7792046943cddcd35573c53e0e939 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <functional>
#include <memory>
#include <thrift/qt/TQTcpServer.h>
#include <thrift/qt/TQIODeviceTransport.h>
#include <QMetaType>
#include <QTcpSocket>
#include <thrift/protocol/TProtocol.h>
#include <thrift/async/TAsyncProcessor.h>
using apache::thrift::protocol::TProtocol;
using apache::thrift::protocol::TProtocolFactory;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportException;
using apache::thrift::transport::TQIODeviceTransport;
using std::bind;
using std::function;
using std::placeholders::_1;
using std::shared_ptr;
QT_USE_NAMESPACE
namespace apache {
namespace thrift {
namespace async {
/**
 * Per-connection bundle stored in ctxMap_: the accepted socket together with
 * the transport and the input/output protocols created for it.
 */
struct TQTcpServer::ConnectionContext {
  /// The accepted client socket.
  shared_ptr<QTcpSocket> connection_;
  /// Transport created for this connection.
  shared_ptr<TTransport> transport_;
  /// Protocol handed to the processor as its input side.
  shared_ptr<TProtocol> iprot_;
  /// Protocol handed to the processor as its output side.
  shared_ptr<TProtocol> oprot_;

  /// Stores a copy of each shared pointer; no other work is done.
  explicit ConnectionContext(shared_ptr<QTcpSocket> connection,
                             shared_ptr<TTransport> transport,
                             shared_ptr<TProtocol> iprot,
                             shared_ptr<TProtocol> oprot)
    : connection_{connection},
      transport_{transport},
      iprot_{iprot},
      oprot_{oprot} {
  }
};
/**
 * Builds a server that accepts connections from an existing QTcpServer.
 *
 * @param server    QTcpServer whose newConnection() signal drives processIncoming()
 * @param processor async processor invoked for incoming data
 * @param pfact     factory used to create the per-connection protocols
 * @param parent    QObject parent forwarded to the QObject base
 */
TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server,
                         shared_ptr<TAsyncProcessor> processor,
                         shared_ptr<TProtocolFactory> pfact,
                         QObject* parent)
  : QObject(parent),
    server_(server),
    processor_(processor),
    pfact_(pfact) {
  // QTcpSocket* is passed through Q_ARG in a queued invokeMethod() call by
  // scheduleDeleteConnectionContext(), so the type is registered by name here.
  qRegisterMetaType<QTcpSocket*>("QTcpSocket*");

  // Accept pending connections whenever the wrapped server announces one.
  connect(server_.get(), SIGNAL(newConnection()), this, SLOT(processIncoming()));
}
// Defaulted destructor: no explicit teardown is performed here; the members
// are released by their own destructors.
TQTcpServer::~TQTcpServer() = default;
void TQTcpServer::processIncoming() {
while (server_->hasPendingConnections()) {
// take ownership of the QTcpSocket; technically it could be deleted
// when the QTcpServer is destroyed, but any real app should delete this
// class before deleting the QTcpServer that we are using
shared_ptr<QTcpSocket> connection(server_->nextPendingConnection());
shared_ptr<TTransport> transport;
shared_ptr<TProtocol> iprot;
shared_ptr<TProtocol> oprot;
try {
transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection));
iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
} catch (...) {
qWarning("[TQTcpServer] Failed to initialize transports/protocols");
continue;
}
ctxMap_[connection.get()]
= shared_ptr<ConnectionContext>(new ConnectionContext(connection, transport, iprot, oprot));
connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode()));
connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()));
}
}
void TQTcpServer::beginDecode() {
auto* connection(qobject_cast<QTcpSocket*>(sender()));
Q_ASSERT(connection);
if (ctxMap_.find(connection) == ctxMap_.end()) {
qWarning("[TQTcpServer] Got data on an unknown QTcpSocket");
return;
}
shared_ptr<ConnectionContext> ctx = ctxMap_[connection];
try {
processor_
->process(bind(&TQTcpServer::finish, this, ctx, _1),
ctx->iprot_,
ctx->oprot_);
} catch (const TTransportException& ex) {
qWarning("[TQTcpServer] TTransportException during processing: '%s'", ex.what());
scheduleDeleteConnectionContext(connection);
} catch (...) {
qWarning("[TQTcpServer] Unknown processor exception");
scheduleDeleteConnectionContext(connection);
}
}
void TQTcpServer::socketClosed() {
auto* connection(qobject_cast<QTcpSocket*>(sender()));
Q_ASSERT(connection);
scheduleDeleteConnectionContext(connection);
}
/**
 * Removes the context registered for `connection` from ctxMap_, dropping the
 * map's shared reference to it. Logs a warning when no entry exists for the
 * given socket.
 *
 * @param connection raw socket pointer used as the map key
 */
void TQTcpServer::deleteConnectionContext(QTcpSocket* connection) {
  if (ctxMap_.erase(connection) > 0) {
    return;
  }
  qWarning("[TQTcpServer] Unknown QTcpSocket");
}
/**
 * Queues a call to deleteConnectionContext(connection) on this object via
 * Qt::QueuedConnection, so the removal does not run inline in the caller.
 *
 * @param connection raw socket pointer identifying the context to remove
 */
void TQTcpServer::scheduleDeleteConnectionContext(QTcpSocket* connection) {
  QMetaObject::invokeMethod(this,
                            "deleteConnectionContext",
                            Qt::QueuedConnection,
                            Q_ARG(QTcpSocket*, connection));
}
/**
 * Completion callback bound in beginDecode() and handed to the async
 * processor.
 *
 * @param ctx     context of the connection whose data was processed
 * @param healthy false when the processor reports that processing failed
 *
 * On failure the connection context is scheduled for removal instead of
 * being erased inline, matching the other error paths (processor exceptions
 * in beginDecode() and socketClosed()). The processor may invoke this
 * callback synchronously from process(), i.e. while the socket's readyRead()
 * slot is still running; keeping the map entry until the queued call runs
 * means the context (and the socket it owns) is not released in the middle
 * of that signal delivery. The queued call also performs the map
 * modification through this object's event handling rather than in
 * whichever context the processor used to invoke the callback.
 */
void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) {
  if (!healthy) {
    qWarning("[TQTcpServer] Processor failed to process data successfully");
    scheduleDeleteConnectionContext(ctx->connection_.get());
  }
}
}
}
} // apache::thrift::async