| /* | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements. See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership. The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License. You may obtain a copy of the License at | 
 |  * | 
 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, | 
 |  * software distributed under the License is distributed on an | 
 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 |  * KIND, either express or implied. See the License for the | 
 |  * specific language governing permissions and limitations | 
 |  * under the License. | 
 |  */ | 
 |  | 
 | #include "TQTcpServer.h" | 
 | #include "TQIODeviceTransport.h" | 
 |  | 
 | #include <QTcpSocket> | 
 |  | 
 | #include <tr1/functional> | 
 |  | 
 | #include <protocol/TProtocol.h> | 
 | #include <async/TAsyncProcessor.h> | 
 |  | 
 | using boost::shared_ptr; | 
 | using apache::thrift::protocol::TProtocol; | 
 | using apache::thrift::protocol::TProtocolFactory; | 
 | using apache::thrift::transport::TTransport; | 
 | using apache::thrift::transport::TTransportException; | 
 | using apache::thrift::transport::TQIODeviceTransport; | 
 | using std::tr1::function; | 
 | using std::tr1::bind; | 
 |  | 
 | QT_USE_NAMESPACE | 
 |  | 
 | namespace apache { namespace thrift { namespace async { | 
 |  | 
 | struct TQTcpServer::ConnectionContext { | 
 |   shared_ptr<QTcpSocket> connection_; | 
 |   shared_ptr<TTransport> transport_; | 
 |   shared_ptr<TProtocol> iprot_; | 
 |   shared_ptr<TProtocol> oprot_; | 
 |  | 
 |   explicit ConnectionContext(shared_ptr<QTcpSocket> connection, | 
 |                              shared_ptr<TTransport> transport, | 
 |                              shared_ptr<TProtocol> iprot, | 
 |                              shared_ptr<TProtocol> oprot) | 
 |     : connection_(connection) | 
 |     , transport_(transport) | 
 |     , iprot_(iprot) | 
 |     , oprot_(oprot) | 
 |   {} | 
 | }; | 
 |  | 
 | TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server, | 
 |                          shared_ptr<TAsyncProcessor> processor, | 
 |                          shared_ptr<TProtocolFactory> pfact, | 
 |                          QObject* parent) | 
 |   : QObject(parent) | 
 |   , server_(server) | 
 |   , processor_(processor) | 
 |   , pfact_(pfact) | 
 | { | 
 |   connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming())); | 
 | } | 
 |  | 
 | TQTcpServer::~TQTcpServer() | 
 | { | 
 | } | 
 |  | 
 | void TQTcpServer::processIncoming() | 
 | { | 
 |   while (server_->hasPendingConnections()) { | 
 |     // take ownership of the QTcpSocket; technically it could be deleted | 
 |     // when the QTcpServer is destroyed, but any real app should delete this | 
 |     // class before deleting the QTcpServer that we are using | 
 |     shared_ptr<QTcpSocket> connection(server_->nextPendingConnection()); | 
 |      | 
 |     shared_ptr<TTransport> transport; | 
 |     shared_ptr<TProtocol> iprot; | 
 |     shared_ptr<TProtocol> oprot; | 
 |      | 
 |     try { | 
 |       transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection)); | 
 |       iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); | 
 |       oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); | 
 |     } catch(...) { | 
 |       qWarning("[TQTcpServer] Failed to initialize transports/protocols"); | 
 |       continue; | 
 |     } | 
 |      | 
 |     ctxMap_[connection.get()] = | 
 |       shared_ptr<ConnectionContext>( | 
 |          new ConnectionContext(connection, transport, iprot, oprot)); | 
 |      | 
 |     connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode())); | 
 |      | 
 |     // need to use QueuedConnection since we will be deleting the socket in the slot | 
 |     connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()), | 
 |             Qt::QueuedConnection); | 
 |   } | 
 | } | 
 |  | 
 | void TQTcpServer::beginDecode() | 
 | { | 
 |   QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender())); | 
 |   Q_ASSERT(connection); | 
 |  | 
 |   if (ctxMap_.find(connection) == ctxMap_.end()) { | 
 |     qWarning("[TQTcpServer] Got data on an unknown QTcpSocket"); | 
 |     return; | 
 |   } | 
 |    | 
 |   shared_ptr<ConnectionContext> ctx = ctxMap_[connection]; | 
 |    | 
 |   try { | 
 |     processor_->process( | 
 |       bind(&TQTcpServer::finish, this, | 
 |            ctx, std::tr1::placeholders::_1), | 
 |       ctx->iprot_, ctx->oprot_); | 
 |   } catch(const TTransportException& ex) { | 
 |     qWarning("[TQTcpServer] TTransportException during processing: '%s'", | 
 |              ex.what()); | 
 |     ctxMap_.erase(connection); | 
 |   } catch(...) { | 
 |     qWarning("[TQTcpServer] Unknown processor exception"); | 
 |     ctxMap_.erase(connection); | 
 |   } | 
 | } | 
 |  | 
 | void TQTcpServer::socketClosed() | 
 | { | 
 |   QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender())); | 
 |   Q_ASSERT(connection); | 
 |  | 
 |   if (ctxMap_.find(connection) == ctxMap_.end()) { | 
 |     qWarning("[TQTcpServer] Unknown QTcpSocket closed"); | 
 |     return; | 
 |   } | 
 |    | 
 |   ctxMap_.erase(connection); | 
 | } | 
 |  | 
 | void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) | 
 | { | 
 |   if (!healthy) { | 
 |     qWarning("[TQTcpServer] Processor failed to process data successfully"); | 
 |     ctxMap_.erase(ctx->connection_.get()); | 
 |   } | 
 | } | 
 |  | 
 | }}} // apache::thrift::async |