Thrift: Adding StatsProcessor, PeekProcessor, TPipedFileReaderTransport, and TPipedFileReaderTransportFactory classes
- StatsProcessor can be used to print events, or keep track of event frequency
- PeekProcessor is used to examine data in a thrift event, prior to passing it along to an underlying processor
- TPipedFileReaderTransport and its factory are used to pipe a TFileReaderTransport (which TFileProcessor requires)
Also fixed some bugs in TFileTransport - next flush time was overflowing and not always being reset
Reviewed by: aditya, mcslee
Test Plan: Tested using various thrift clients (scribe, falcon) and gdb in sandbox and on dev008.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665066 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index b205e78..84618f4 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -11,7 +11,8 @@
src/concurrency/PosixThreadFactory.cpp \
src/concurrency/ThreadManager.cpp \
src/concurrency/TimerManager.cpp \
- src/protocol/TBinaryProtocol.cpp \
+ src/processor/PeekProcessor.cpp \
+ src/protocol/TBinaryProtocol.cpp \
src/transport/TFileTransport.cpp \
src/transport/THttpClient.cpp \
src/transport/TSocket.cpp \
@@ -34,14 +35,14 @@
include_thriftdir = $(includedir)/thrift
include_thrift_HEADERS = \
- config.h \
- src/Thrift.h \
- src/TProcessor.h \
- src/TLogging.h
+ config.h \
+ src/Thrift.h \
+ src/TProcessor.h \
+ src/TLogging.h
include_concurrencydir = $(include_thriftdir)/concurrency
include_concurrency_HEADERS = \
- src/concurrency/Exception.h \
+ src/concurrency/Exception.h \
src/concurrency/Mutex.h \
src/concurrency/Monitor.h \
src/concurrency/PosixThreadFactory.h \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index affbe81..ac77da3 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -28,7 +28,7 @@
virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
boost::shared_ptr<protocol::TProtocol> out) = 0;
- bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> io) {
+ virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> io) {
return process(io, io);
}
diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp
new file mode 100644
index 0000000..a7c5571
--- /dev/null
+++ b/lib/cpp/src/processor/PeekProcessor.cpp
@@ -0,0 +1,84 @@
+#include "PeekProcessor.h"
+
+namespace facebook { namespace thrift { namespace processor {
+
+// Allocates the in-memory buffer up front; the other members stay empty
+// until initialize() assigns them.
+PeekProcessor::PeekProcessor()
+  : memoryBuffer_(new facebook::thrift::transport::TMemoryBuffer()) {
+}
+
+// Nothing to release by hand: all members are boost::shared_ptr handles.
+PeekProcessor::~PeekProcessor() {
+}
+
+// Wires this processor up. getPipedTransport() and process() dereference the
+// members assigned here, so initialize() has to run before either of them.
+//
+//   actualProcessor  - processor that receives each message after peeking
+//   protocolFactory  - builds the protocol that reads back from memoryBuffer_
+//   transportFactory - piped-transport factory; memoryBuffer_ becomes its target
+//
+// TPipedTransportFactory::initializeTargetTransport() throws TException when
+// the factory already has a target. Both fallible calls therefore run before
+// any member is touched, so a failure leaves this object exactly as it was
+// instead of half-configured.
+void PeekProcessor::initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
+                               boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
+                               boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory) {
+  boost::shared_ptr<facebook::thrift::protocol::TProtocol> pipedProtocol =
+    protocolFactory->getProtocol(memoryBuffer_);
+  transportFactory->initializeTargetTransport(memoryBuffer_);
+
+  // Nothing below can throw: plain shared_ptr assignments.
+  actualProcessor_ = actualProcessor;
+  pipedProtocol_ = pipedProtocol;
+  transportFactory_ = transportFactory;
+}
+
+// Returns transportFactory_->getTransport(in), i.e. a transport built by the
+// factory passed to initialize() with `in` as its source; initialize() made
+// memoryBuffer_ that factory's target. transportFactory_ is only assigned by
+// initialize(), so calling this earlier dereferences an empty shared_ptr.
+boost::shared_ptr<facebook::thrift::transport::TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in) {
+ return transportFactory_->getTransport(in);
+}
+
+// Reads one message from `in`, offering every piece to the peek* hooks, and
+// then lets the wrapped processor handle the copy held in memoryBuffer_.
+//
+//   in  - protocol to read the incoming call from
+//   out - protocol handed unchanged to the wrapped processor for its reply
+//
+// Returns whatever actualProcessor_->process() returns.
+// Throws TException when the message is not a T_CALL; anything thrown by the
+// hooks or the wrapped processor propagates to the caller.
+bool PeekProcessor::process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
+                            boost::shared_ptr<facebook::thrift::protocol::TProtocol> out) {
+
+  std::string fname;
+  facebook::thrift::protocol::TMessageType mtype;
+  int32_t seqid;
+  in->readMessageBegin(fname, mtype, seqid);
+
+  if (mtype != facebook::thrift::protocol::T_CALL) {
+    throw facebook::thrift::TException("Unexpected message type");
+  }
+
+  // Peek at the name
+  peekName(fname);
+
+  facebook::thrift::protocol::TType ftype;
+  int16_t fid;
+  while (true) {
+    in->readFieldBegin(fname, ftype, fid);
+    if (ftype == facebook::thrift::protocol::T_STOP) {
+      break;
+    }
+
+    // Peek at the variable
+    peek(in, ftype, fid);
+
+    // Close the field that readFieldBegin() opened, as the struct loop in
+    // StatsProcessor does; protocols with real field framing depend on it.
+    in->readFieldEnd();
+  }
+  in->readMessageEnd();
+  in->getTransport()->readEnd();
+
+  //
+  // All the data is now in memoryBuffer_ and ready to be processed
+  //
+
+  bool ret;
+  try {
+    // Done peeking at variables
+    peekEnd();
+
+    // Let's first take a peek at the full data in memory
+    uint8_t* buffer;
+    uint32_t size;
+    memoryBuffer_->getBuffer(&buffer, &size);
+    peekBuffer(buffer, size);
+
+    ret = actualProcessor_->process(pipedProtocol_, out);
+  } catch (...) {
+    // Drop the buffered message on failure as well; otherwise its bytes
+    // would still sit in front of the next message piped into the buffer.
+    memoryBuffer_->resetBuffer();
+    throw;
+  }
+  memoryBuffer_->resetBuffer();
+  return ret;
+}
+
+// Default hook: process() passes the method name it read from the message
+// header. Intentionally empty.
+void PeekProcessor::peekName(const std::string& fname) {
+}
+
+// Default hook: process() passes the pointer and size it obtained from
+// memoryBuffer_->getBuffer(). Intentionally empty.
+void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
+}
+
+// Default hook for one argument field: skips the field's value. process()
+// goes straight on to the next readFieldBegin(), so an override has to
+// consume the value from `in` itself.
+void PeekProcessor::peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
+ facebook::thrift::protocol::TType ftype,
+ int16_t fid) {
+ in->skip(ftype);
+}
+
+// Default hook: process() calls it once after the last field has been read.
+// Intentionally empty.
+void PeekProcessor::peekEnd() {}
+
+}}}
diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h
new file mode 100644
index 0000000..d1d227c
--- /dev/null
+++ b/lib/cpp/src/processor/PeekProcessor.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef PEEKPROCESSOR_H
+#define PEEKPROCESSOR_H
+
+#include <string>
+#include <TProcessor.h>
+#include <transport/TTransport.h>
+#include <transport/TTransportUtils.h>
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace processor {
+
+/*
+ * Processor that sits in front of another processor: it reads each incoming
+ * call, hands the pieces to virtual peek* hooks so a derived class can look
+ * at them, and then passes the buffered message on to the underlying
+ * processor.
+ *
+ * Usage order: construct, call initialize(), wrap the source transport with
+ * getPipedTransport(), then call process() with a protocol on that transport.
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class PeekProcessor : public facebook::thrift::TProcessor {
+
+ public:
+ PeekProcessor();
+ virtual ~PeekProcessor();
+
+ // Input here: actualProcessor - the underlying processor
+ // protocolFactory - the protocol factory used to wrap the memory buffer
+ // transportFactory - this TPipedTransportFactory is used to wrap the source transport
+ // via a call to getPipedTransport
+ void initialize(boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor,
+ boost::shared_ptr<facebook::thrift::protocol::TProtocolFactory> protocolFactory,
+ boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory);
+
+ // Wraps `in` with the transport factory given to initialize()
+ boost::shared_ptr<facebook::thrift::transport::TTransport> getPipedTransport(boost::shared_ptr<facebook::thrift::transport::TTransport> in);
+
+ // Peeks at one call read from `in`, then forwards the buffered copy to the
+ // underlying processor together with `out`
+ virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
+ boost::shared_ptr<facebook::thrift::protocol::TProtocol> out);
+
+ // The following four functions can be overridden by child classes to
+ // achieve desired peeking behavior. process() calls peekName() once with the
+ // method name, peek() once per argument field, peekEnd() after the last
+ // field, and peekBuffer() with the contents of the memory buffer.
+ virtual void peekName(const std::string& fname);
+ virtual void peekBuffer(uint8_t* buffer, uint32_t size);
+ virtual void peek(boost::shared_ptr<facebook::thrift::protocol::TProtocol> in,
+ facebook::thrift::protocol::TType ftype,
+ int16_t fid);
+ virtual void peekEnd();
+
+ private:
+ // Processor that does the real work once peeking is finished
+ boost::shared_ptr<facebook::thrift::TProcessor> actualProcessor_;
+ // Protocol created over memoryBuffer_ by initialize()
+ boost::shared_ptr<facebook::thrift::protocol::TProtocol> pipedProtocol_;
+ // Factory used by getPipedTransport(); its target is memoryBuffer_
+ boost::shared_ptr<facebook::thrift::transport::TPipedTransportFactory> transportFactory_;
+ // Buffer allocated in the constructor and reset at the end of process()
+ boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> memoryBuffer_;
+};
+
+}}} // facebook::thrift::processor
+
+#endif
diff --git a/lib/cpp/src/processor/StatsProcessor.h b/lib/cpp/src/processor/StatsProcessor.h
new file mode 100644
index 0000000..e080432
--- /dev/null
+++ b/lib/cpp/src/processor/StatsProcessor.h
@@ -0,0 +1,252 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef STATSPROCESSOR_H
+#define STATSPROCESSOR_H
+
+#include <boost/shared_ptr.hpp>
+#include <transport/TTransport.h>
+#include <protocol/TProtocol.h>
+#include <TProcessor.h>
+
+namespace facebook { namespace thrift { namespace processor {
+
+/*
+ * Processor that consumes call messages only to report on them: it can print
+ * each call with its arguments via printf and/or count how often each
+ * function name is seen. It never writes to the output protocol.
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class StatsProcessor : public facebook::thrift::TProcessor {
+public:
+  // print     - print every call and its arguments
+  // frequency - count calls per function name (see get_frequency_map())
+  StatsProcessor(bool print, bool frequency)
+    : print_(print),
+      frequency_(frequency)
+  {}
+  virtual ~StatsProcessor() {}
+
+  // Reads one message from piprot and reports on it; poprot is not used.
+  // Returns true once the message has been read.
+  // Throws TException when the message is not a T_CALL.
+  virtual bool process(boost::shared_ptr<facebook::thrift::protocol::TProtocol> piprot, boost::shared_ptr<facebook::thrift::protocol::TProtocol> poprot) {
+
+    // printAndPassToBuffer() reads through this member
+    piprot_ = piprot;
+
+    std::string fname;
+    facebook::thrift::protocol::TMessageType mtype;
+    int32_t seqid;
+
+    piprot_->readMessageBegin(fname, mtype, seqid);
+    if (mtype != facebook::thrift::protocol::T_CALL) {
+      if (print_) {
+        printf("Unknown message type\n");
+      }
+      throw facebook::thrift::TException("Unexpected message type");
+    }
+    if (print_) {
+      printf("%s (", fname.c_str());
+    }
+    if (frequency_) {
+      // operator[] value-initializes a missing counter to 0 before the increment
+      frequency_map_[fname]++;
+    }
+
+    facebook::thrift::protocol::TType ftype;
+    int16_t fid;
+
+    while (true) {
+      piprot_->readFieldBegin(fname, ftype, fid);
+      if (ftype == facebook::thrift::protocol::T_STOP) {
+        break;
+      }
+
+      printAndPassToBuffer(ftype);
+      if (print_) {
+        printf(", ");
+      }
+      // Close the field, matching the nested struct loop below
+      piprot_->readFieldEnd();
+    }
+    // Close the message opened by readMessageBegin()
+    piprot_->readMessageEnd();
+
+    if (print_) {
+      // "\b\b" backs up over the trailing ", " separator
+      printf("\b\b)\n");
+    }
+    return true;
+  }
+
+  // Per-function call counts; only filled when `frequency` was true.
+  const std::map<std::string, long long>& get_frequency_map() {
+    return frequency_map_;
+  }
+
+protected:
+  // Reads one value of type ftype from piprot_ and prints it when print_ is
+  // set. Structs, maps, sets and lists recurse into their members. A type
+  // that is not listed is ignored without reading anything.
+  void printAndPassToBuffer(facebook::thrift::protocol::TType ftype) {
+    switch (ftype) {
+      case facebook::thrift::protocol::T_BOOL:
+        {
+          bool boolv;
+          piprot_->readBool(boolv);
+          if (print_) {
+            printf("%d", boolv);
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_BYTE:
+        {
+          int8_t bytev;
+          piprot_->readByte(bytev);
+          if (print_) {
+            printf("%d", bytev);
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_I16:
+        {
+          int16_t i16;
+          piprot_->readI16(i16);
+          if (print_) {
+            printf("%d", i16);
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_I32:
+        {
+          int32_t i32;
+          piprot_->readI32(i32);
+          if (print_) {
+            printf("%d", i32);
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_I64:
+        {
+          int64_t i64;
+          piprot_->readI64(i64);
+          if (print_) {
+            // int64_t is not `long` on every platform; print through long long
+            printf("%lld", static_cast<long long>(i64));
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_DOUBLE:
+        {
+          double dub;
+          piprot_->readDouble(dub);
+          if (print_) {
+            printf("%f", dub);
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_STRING:
+        {
+          std::string str;
+          piprot_->readString(str);
+          if (print_) {
+            printf("%s", str.c_str());
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_STRUCT:
+        {
+          std::string name;
+          int16_t fid;
+          facebook::thrift::protocol::TType ftype;
+          piprot_->readStructBegin(name);
+          if (print_) {
+            printf("<");
+          }
+          while (true) {
+            piprot_->readFieldBegin(name, ftype, fid);
+            if (ftype == facebook::thrift::protocol::T_STOP) {
+              break;
+            }
+            printAndPassToBuffer(ftype);
+            if (print_) {
+              printf(",");
+            }
+            piprot_->readFieldEnd();
+          }
+          piprot_->readStructEnd();
+          if (print_) {
+            printf("\b>");
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_MAP:
+        {
+          facebook::thrift::protocol::TType keyType;
+          facebook::thrift::protocol::TType valType;
+          uint32_t i, size;
+          piprot_->readMapBegin(keyType, valType, size);
+          if (print_) {
+            printf("{");
+          }
+          for (i = 0; i < size; i++) {
+            printAndPassToBuffer(keyType);
+            if (print_) {
+              printf("=>");
+            }
+            printAndPassToBuffer(valType);
+            if (print_) {
+              printf(",");
+            }
+          }
+          piprot_->readMapEnd();
+          if (print_) {
+            printf("\b}");
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_SET:
+        {
+          facebook::thrift::protocol::TType elemType;
+          uint32_t i, size;
+          piprot_->readSetBegin(elemType, size);
+          if (print_) {
+            printf("{");
+          }
+          for (i = 0; i < size; i++) {
+            printAndPassToBuffer(elemType);
+            if (print_) {
+              printf(",");
+            }
+          }
+          piprot_->readSetEnd();
+          if (print_) {
+            printf("\b}");
+          }
+        }
+        break;
+      case facebook::thrift::protocol::T_LIST:
+        {
+          facebook::thrift::protocol::TType elemType;
+          uint32_t i, size;
+          piprot_->readListBegin(elemType, size);
+          if (print_) {
+            printf("[");
+          }
+          for (i = 0; i < size; i++) {
+            printAndPassToBuffer(elemType);
+            if (print_) {
+              printf(",");
+            }
+          }
+          piprot_->readListEnd();
+          if (print_) {
+            printf("\b]");
+          }
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  // Input protocol of the message currently being processed
+  boost::shared_ptr<facebook::thrift::protocol::TProtocol> piprot_;
+  // Function name -> number of calls seen
+  std::map<std::string, long long> frequency_map_;
+
+  bool print_;
+  bool frequency_;
+};
+
+}}} // facebook::thrift::processor
+
+#endif
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 9a3ca30..001783f 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -204,27 +204,14 @@
pthread_mutex_unlock(&mutex_);
}
-bool TFileTransport::swapEventBuffers(long long deadline) {
- //deadline time struc
- struct timespec ts;
- if (deadline) {
- ts.tv_sec = deadline/(1000*1000);
- ts.tv_nsec = (deadline%(1000*1000))*1000;
- }
-
- // wait for the queue to fill up
+bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
pthread_mutex_lock(&mutex_);
- while (enqueueBuffer_->isEmpty()) {
- // do a timed wait on the condition variable
- if (deadline) {
- int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
- if(e == ETIMEDOUT) {
- break;
- }
- } else {
- // just wait until the buffer gets an item
- pthread_cond_wait(&notEmpty_, &mutex_);
- }
+ if (deadline != NULL) {
+ // if we were handed a deadline time struct, do a timed wait
+ pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+ } else {
+ // just wait until the buffer gets an item
+ pthread_cond_wait(&notEmpty_, &mutex_);
}
bool swapped = false;
@@ -259,7 +246,9 @@
offset_ = lseek(fd_, 0, SEEK_END);
// Figure out the next time by which a flush must take place
- long long nextFlush = getCurrentTime() + flushMaxUs_;
+
+ struct timespec ts_next_flush;
+ getNextFlushTime(&ts_next_flush);
uint32_t unflushed = 0;
while(1) {
@@ -273,7 +262,7 @@
return;
}
- if (swapEventBuffers(nextFlush)) {
+ if (swapEventBuffers(&ts_next_flush)) {
eventInfo* outEvent;
while (NULL != (outEvent = dequeueBuffer_->getNext())) {
if (!outEvent) {
@@ -342,14 +331,23 @@
dequeueBuffer_->reset();
}
+ bool flushTimeElapsed = false;
+ struct timespec current_time;
+ clock_gettime(CLOCK_REALTIME, &current_time);
+
+ if (current_time.tv_sec > ts_next_flush.tv_sec ||
+ (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+ flushTimeElapsed = true;
+ getNextFlushTime(&ts_next_flush);
+ }
+
// couple of cases from which a flush could be triggered
- if ((getCurrentTime() >= nextFlush && unflushed > 0) ||
+ if ((flushTimeElapsed && unflushed > 0) ||
unflushed > flushMaxBytes_ ||
forceFlush_) {
// sync (force flush) file to disk
fsync(fd_);
- nextFlush = getCurrentTime() + flushMaxUs_;
unflushed = 0;
// notify anybody waiting for flush completion
@@ -697,13 +695,14 @@
offset_ = lseek(fd_, 0, SEEK_CUR);
}
-uint32_t TFileTransport::getCurrentTime() {
- long long ret;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- ret = tv.tv_sec;
- ret = ret*1000*1000 + tv.tv_usec;
- return ret;
+// Fills *ts_next_flush with the current CLOCK_REALTIME time plus flushMaxUs_
+// microseconds. The writer thread hands the result to swapEventBuffers() as
+// the deadline of its timed wait, so it has to be a valid timespec.
+void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
+  clock_gettime(CLOCK_REALTIME, ts_next_flush);
+  // Add the sub-second part of the interval, converted from us to ns
+  ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
+  // tv_nsec must stay below one full second, so exactly 1000000000 carries
+  // too (the previous `>` test left that value in place)
+  if (ts_next_flush->tv_nsec >= 1000000000) {
+    ts_next_flush->tv_nsec -= 1000000000;
+    ts_next_flush->tv_sec += 1;
+  }
+  // Add the whole seconds of the interval
+  ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
}
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
@@ -773,7 +772,7 @@
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TFileTransport> inputTransport):
+ shared_ptr<TFileReaderTransport> inputTransport):
processor_(processor),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory),
@@ -786,7 +785,7 @@
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> inputProtocolFactory,
shared_ptr<TProtocolFactory> outputProtocolFactory,
- shared_ptr<TFileTransport> inputTransport):
+ shared_ptr<TFileReaderTransport> inputTransport):
processor_(processor),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory),
@@ -798,7 +797,7 @@
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TFileTransport> inputTransport,
+ shared_ptr<TFileReaderTransport> inputTransport,
shared_ptr<TTransport> outputTransport):
processor_(processor),
inputProtocolFactory_(protocolFactory),
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index f4d25f4..e31e85c 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -122,12 +122,36 @@
};
/**
+ * Abstract interface for transports used to read files
+ */
+class TFileReaderTransport : virtual public TTransport {
+ public:
+ // Every member is pure virtual. TFileTransport implements this interface,
+ // and TPipedFileReaderTransport implements it by forwarding each call to
+ // the reader it wraps. TTransport is inherited virtually because
+ // TFileTransport also derives from TFileWriterTransport.
+ virtual int32_t getReadTimeout() = 0;
+ virtual void setReadTimeout(int32_t readTimeout) = 0;
+
+ // NOTE(review): chunk counts/indices are returned as uint32_t while
+ // seekToChunk() takes an int32_t - confirm whether negative values are
+ // meaningful to implementations.
+ virtual uint32_t getNumChunks() = 0;
+ virtual uint32_t getCurChunk() = 0;
+ virtual void seekToChunk(int32_t chunk) = 0;
+ virtual void seekToEnd() = 0;
+};
+
+/**
+ * Abstract interface for transports used to write files. Both members are
+ * pure virtual; TFileTransport implements them. TTransport is inherited
+ * virtually because TFileTransport also derives from TFileReaderTransport.
+ */
+class TFileWriterTransport : virtual public TTransport {
+ public:
+ virtual uint32_t getChunkSize() = 0;
+ virtual void setChunkSize(uint32_t chunkSize) = 0;
+};
+
+/**
* File implementation of a transport. Reads and writes are done to a
* file on disk.
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
-class TFileTransport : public TTransport {
+class TFileTransport : public TFileReaderTransport,
+ public TFileWriterTransport {
public:
TFileTransport(std::string path);
~TFileTransport();
@@ -240,7 +264,7 @@
private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
- bool swapEventBuffers(long long deadline);
+ bool swapEventBuffers(struct timespec* deadline);
bool initBufferAndWriteThread();
// control for writer thread
@@ -259,7 +283,7 @@
// Utility functions
void openLogFile();
- uint32_t getCurrentTime();
+ void getNextFlushTime(struct timespec* ts_next_flush);
// Class variables
readState readState_;
@@ -358,12 +382,12 @@
*/
TFileProcessor(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
- boost::shared_ptr<TFileTransport> inputTransport);
+ boost::shared_ptr<TFileReaderTransport> inputTransport);
TFileProcessor(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
- boost::shared_ptr<TFileTransport> inputTransport);
+ boost::shared_ptr<TFileReaderTransport> inputTransport);
/**
* Constructor
@@ -375,7 +399,7 @@
*/
TFileProcessor(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
- boost::shared_ptr<TFileTransport> inputTransport,
+ boost::shared_ptr<TFileReaderTransport> inputTransport,
boost::shared_ptr<TTransport> outputTransport);
/**
@@ -396,7 +420,7 @@
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
- boost::shared_ptr<TFileTransport> inputTransport_;
+ boost::shared_ptr<TFileReaderTransport> inputTransport_;
boost::shared_ptr<TTransport> outputTransport_;
};
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index aaefb2c..290b483 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -296,4 +296,87 @@
srcTrans_->flush();
}
+// Builds a piped transport over a file reader. srcTrans is handed to the
+// TPipedTransport base as its source and is also kept in this class's own
+// srcTrans_, typed as TFileReaderTransport, for the file-reader calls.
+TPipedFileReaderTransport::TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans)
+ : TPipedTransport(srcTrans, dstTrans),
+ srcTrans_(srcTrans) {
+}
+
+// Nothing to release here beyond what the members and bases clean up.
+TPipedFileReaderTransport::~TPipedFileReaderTransport() {
+}
+
+// isOpen/peek/open/close/read below each forward unchanged to the
+// TPipedTransport implementation of the same name.
+// NOTE(review): presumably spelled out because TTransport is reached through
+// both TPipedTransport and TFileReaderTransport - confirm.
+bool TPipedFileReaderTransport::isOpen() {
+ return TPipedTransport::isOpen();
+}
+
+bool TPipedFileReaderTransport::peek() {
+ return TPipedTransport::peek();
+}
+
+void TPipedFileReaderTransport::open() {
+ TPipedTransport::open();
+}
+
+void TPipedFileReaderTransport::close() {
+ TPipedTransport::close();
+}
+
+uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) {
+ return TPipedTransport::read(buf, len);
+}
+
+// Keeps calling read() until exactly len bytes have been stored in buf and
+// returns len. Throws TEOFException as soon as a read() delivers no bytes.
+uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
+  uint32_t filled = 0;
+
+  for (uint32_t chunk = 0; filled < len; filled += chunk) {
+    chunk = read(buf + filled, len - filled);
+    if (chunk == 0) {
+      throw TEOFException();
+    }
+  }
+
+  return filled;
+}
+
+// readEnd/write/writeEnd/flush below each forward unchanged to the
+// TPipedTransport implementation of the same name.
+void TPipedFileReaderTransport::readEnd() {
+ TPipedTransport::readEnd();
+}
+
+void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
+ TPipedTransport::write(buf, len);
+}
+
+void TPipedFileReaderTransport::writeEnd() {
+ TPipedTransport::writeEnd();
+}
+
+void TPipedFileReaderTransport::flush() {
+ TPipedTransport::flush();
+}
+
+// TFileReaderTransport interface: every call below is passed straight to the
+// wrapped file reader stored in srcTrans_ by the constructor.
+int32_t TPipedFileReaderTransport::getReadTimeout() {
+ return srcTrans_->getReadTimeout();
+}
+
+void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) {
+ srcTrans_->setReadTimeout(readTimeout);
+}
+
+uint32_t TPipedFileReaderTransport::getNumChunks() {
+ return srcTrans_->getNumChunks();
+}
+
+uint32_t TPipedFileReaderTransport::getCurChunk() {
+ return srcTrans_->getCurChunk();
+}
+
+void TPipedFileReaderTransport::seekToChunk(int32_t chunk) {
+ srcTrans_->seekToChunk(chunk);
+}
+
+void TPipedFileReaderTransport::seekToEnd() {
+ srcTrans_->seekToEnd();
+}
+
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index f95ad76..653939f 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -7,7 +7,9 @@
#ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
#define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
+#include <string>
#include <transport/TTransport.h>
+#include <transport/TFileTransport.h>
namespace facebook { namespace thrift { namespace transport {
@@ -349,7 +351,7 @@
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
-class TPipedTransport : public TTransport {
+class TPipedTransport : virtual public TTransport {
public:
TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
boost::shared_ptr<TTransport> dstTrans) :
@@ -465,8 +467,10 @@
*/
class TPipedTransportFactory : public TTransportFactory {
public:
- TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans): dstTrans_(dstTrans) {}
-
+ // Default construction leaves the target transport unset; it can be
+ // supplied once later through initializeTargetTransport().
+ TPipedTransportFactory() {}
+ // Sets the target transport right away. The call is made from a
+ // constructor, so it runs this class's initializeTargetTransport() even
+ // when a derived class overrides it.
+ TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans) {
+ initializeTargetTransport(dstTrans);
+ }
virtual ~TPipedTransportFactory() {}
/**
@@ -476,12 +480,85 @@
return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
}
- private:
+ // Stores dstTrans as the target handed to every transport this factory
+ // creates. The target can be set only once: a second call throws TException.
+ virtual void initializeTargetTransport(boost::shared_ptr<TTransport> dstTrans) {
+   if (dstTrans_.get() != NULL) {
+     throw TException("Target transport already initialized");
+   }
+   dstTrans_ = dstTrans;
+ }
+
+ protected:
boost::shared_ptr<TTransport> dstTrans_;
};
+/**
+ * TPipedFileReaderTransport. A TPipedTransport whose source is a
+ * TFileReaderTransport and which itself implements TFileReaderTransport by
+ * forwarding the file-reader calls to that source. This lets a piped
+ * transport be passed where a TFileReaderTransport is required, such as the
+ * TFileProcessor constructors.
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class TPipedFileReaderTransport : public TPipedTransport,
+ public TFileReaderTransport {
+ public:
+ TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans);
+
+ ~TPipedFileReaderTransport();
+
+ // TTransport functions
+ bool isOpen();
+ bool peek();
+ void open();
+ void close();
+ uint32_t read(uint8_t* buf, uint32_t len);
+ uint32_t readAll(uint8_t* buf, uint32_t len);
+ void readEnd();
+ void write(const uint8_t* buf, uint32_t len);
+ void writeEnd();
+ void flush();
+
+ // TFileReaderTransport functions
+ int32_t getReadTimeout();
+ void setReadTimeout(int32_t readTimeout);
+ uint32_t getNumChunks();
+ uint32_t getCurChunk();
+ void seekToChunk(int32_t chunk);
+ void seekToEnd();
+
+ protected:
+ // shouldn't be used
+ TPipedFileReaderTransport();
+ // Source transport, typed as a file reader for the calls listed above
+ boost::shared_ptr<TFileReaderTransport> srcTrans_;
+};
+
+/**
+ * Factory that wraps a source TFileReaderTransport in a
+ * TPipedFileReaderTransport piping into this factory's target transport.
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
+ public:
+  TPipedFileReaderTransportFactory() {}
+  TPipedFileReaderTransportFactory(boost::shared_ptr<TTransport> dstTrans)
+    : TPipedTransportFactory(dstTrans)
+  {}
+  virtual ~TPipedFileReaderTransportFactory() {}
+
+  // Wraps srcTrans when it is a TFileReaderTransport; any other kind of
+  // transport yields an empty shared_ptr.
+  boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
+    boost::shared_ptr<TFileReaderTransport> fileSrc =
+      boost::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
+    if (fileSrc.get() == NULL) {
+      return boost::shared_ptr<TTransport>();
+    }
+    return getFileReaderTransport(fileSrc);
+  }
+
+  // Same wrapping, with the result typed as a TFileReaderTransport.
+  boost::shared_ptr<TFileReaderTransport> getFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans) {
+    boost::shared_ptr<TFileReaderTransport> piped(new TPipedFileReaderTransport(srcTrans, dstTrans_));
+    return piped;
+  }
+};
}}} // facebook::thrift::transport
-
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_