Thrift: Modifications to PeekProcessor to be able to support nested PeekProcessors
Reviewed by: boz
Test Plan: Tested with Falcon
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665100 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp
index d510a5d..21df9db 100644
--- a/lib/cpp/src/processor/PeekProcessor.cpp
+++ b/lib/cpp/src/processor/PeekProcessor.cpp
@@ -1,45 +1,63 @@
#include "PeekProcessor.h"
+using namespace facebook::thrift::transport;
+using namespace facebook::thrift::protocol;
+using namespace facebook::thrift;
+
namespace facebook { namespace thrift { namespace processor {
PeekProcessor::PeekProcessor() {
- memoryBuffer_.reset(new facebook::thrift::transport::TMemoryBuffer());
+ memoryBuffer_.reset(new TMemoryBuffer());
+ targetTransport_ = memoryBuffer_;
}
PeekProcessor::~PeekProcessor() {}
-void PeekProcessor::initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
- boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
- boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory) {
+void PeekProcessor::initialize(boost::shared_ptr<TProcessor> actualProcessor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TPipedTransportFactory> transportFactory) {
actualProcessor_ = actualProcessor;
- pipedProtocol_ = protocolFactory->getProtocol(memoryBuffer_);
+ pipedProtocol_ = protocolFactory->getProtocol(targetTransport_);
transportFactory_ = transportFactory;
- transportFactory_->initializeTargetTransport(memoryBuffer_);
+ transportFactory_->initializeTargetTransport(targetTransport_);
}
-boost::shared_ptr<facebook::thrift::transport::TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in) {
+boost::shared_ptr<TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<TTransport> in) {
return transportFactory_->getTransport(in);
}
-bool PeekProcessor::process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
- boost::shared_ptr<facebook::thrift::protocol::TProtocol> out) {
+void PeekProcessor::setTargetTransport(boost::shared_ptr<TTransport> targetTransport) {
+ targetTransport_ = targetTransport;
+ if (boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport_)) {
+ memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport);
+ } else if (boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)) {
+ memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)->getTargetTransport());
+ }
+
+ if (!memoryBuffer_) {
+ throw TException("Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer");
+ }
+}
+
+bool PeekProcessor::process(boost::shared_ptr<TProtocol> in,
+ boost::shared_ptr<TProtocol> out) {
std::string fname;
- facebook::thrift::protocol::TMessageType mtype;
+ TMessageType mtype;
int32_t seqid;
in->readMessageBegin(fname, mtype, seqid);
- if (mtype != facebook::thrift::protocol::T_CALL) {
- throw facebook::thrift::TException("Unexpected message type");
+ if (mtype != T_CALL) {
+ throw TException("Unexpected message type");
}
// Peek at the name
peekName(fname);
- facebook::thrift::protocol::TType ftype;
+ TType ftype;
int16_t fid;
while (true) {
in->readFieldBegin(fname, ftype, fid);
- if (ftype == facebook::thrift::protocol::T_STOP) {
+ if (ftype == T_STOP) {
break;
}
@@ -73,9 +91,9 @@
void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
}
-void PeekProcessor::peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
- facebook::thrift::protocol::TType ftype,
- int16_t fid) {
+void PeekProcessor::peek(boost::shared_ptr<TProtocol> in,
+ TType ftype,
+ int16_t fid) {
in->skip(ftype);
}
diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h
index d1d227c..6b8414b 100644
--- a/lib/cpp/src/processor/PeekProcessor.h
+++ b/lib/cpp/src/processor/PeekProcessor.h
@@ -37,6 +37,8 @@
boost::shared_ptr<facebook::thrift::transport::TTransport> getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in);
+ void setTargetTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> targetTransport);
+
virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
boost::shared_ptr<facebook::thrift::protocol::TProtocol> out);
@@ -54,6 +56,7 @@
boost::shared_ptr<facebook::thrift::protocol::TProtocol> pipedProtocol_;
boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory_;
boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> memoryBuffer_;
+ boost::shared_ptr<facebook::thrift::transport::TTransport> targetTransport_;
};
}}} // facebook::thrift::processor
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 653939f..cb9a95f 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -425,6 +425,7 @@
void readEnd() {
if (pipeOnRead_) {
dstTrans_->write(rBuf_, rLen_);
+ dstTrans_->flush();
}
// reset state
@@ -437,11 +438,16 @@
void writeEnd() {
if (pipeOnWrite_) {
dstTrans_->write(wBuf_, wLen_);
+ dstTrans_->flush();
}
}
void flush();
+ boost::shared_ptr<TTransport> getTargetTransport() {
+ return dstTrans_;
+ }
+
protected:
boost::shared_ptr<TTransport> srcTrans_;
boost::shared_ptr<TTransport> dstTrans_;