Thrift now a TLP - INFRA-3116
git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferTransports.cpp b/lib/cpp/src/transport/TBufferTransports.cpp
new file mode 100644
index 0000000..7a7e5e9
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferTransports.cpp
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cassert>
+#include <algorithm>
+
+#include <transport/TBufferTransports.h>
+
+using std::string;
+
+namespace apache { namespace thrift { namespace transport {
+
+
+/**
+ * Slow-path read: serves a request that the read buffer alone cannot satisfy.
+ *
+ * Whatever is still buffered is copied out first, the buffer is then refilled
+ * with a single read from the underlying transport, and as much of the fresh
+ * data as is still wanted is handed over.
+ *
+ * @param buf destination for the bytes
+ * @param len number of bytes requested; must exceed what is buffered
+ * @return number of bytes copied into buf (may be fewer than len)
+ */
+uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
+  const uint32_t buffered = rBound_ - rBase_;
+
+  // The fast path handles every request the buffer can satisfy on its own.
+  assert(buffered < len);
+
+  uint8_t* dest = buf;
+  uint32_t remaining = len;
+
+  // Drain the leftover buffered bytes.
+  if (buffered != 0) {
+    memcpy(dest, rBase_, buffered);
+    dest += buffered;
+    remaining -= buffered;
+  }
+
+  // Refill with up to one buffer's worth from the underlying transport.
+  // This pays off when len < rBufSize_ and much less so otherwise.
+  // TODO(dreiss): Fix that case (possibly including some readv hotness).
+  const uint32_t fetched = transport_->read(rBuf_.get(), rBufSize_);
+  setReadBuffer(rBuf_.get(), fetched);
+
+  // Hand over as much of the fresh data as the caller still wants.
+  const uint32_t handed = std::min(remaining, fetched);
+  memcpy(dest, rBase_, handed);
+  rBase_ += handed;
+
+  return buffered + handed;
+}
+
+/**
+ * Slow-path write: called when len bytes do not fit in the free space left
+ * in the write buffer.
+ *
+ * Two strategies are used.  When the buffer is empty, or when the buffered
+ * bytes plus the new bytes amount to at least twice the buffer size, two
+ * writes to the underlying transport are unavoidable anyway, so the buffered
+ * bytes (if any) and then buf are written straight through without copying.
+ * Otherwise the buffer is topped up from buf, written out in one go, and the
+ * remainder of buf is parked at the start of the buffer.  No policy is
+ * perfect without knowing the size of future writes; this one always prefers
+ * copying over an extra syscall below the 2x threshold.
+ *
+ * @param buf bytes to write
+ * @param len number of bytes in buf; must exceed the free buffer space
+ */
+void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
+  uint8_t* const bufStart = wBuf_.get();
+  const uint32_t pending = wBase_ - bufStart;
+  const uint32_t room = wBound_ - wBase_;
+
+  // The fast path handles every write that fits in the free space.
+  assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
+
+  const bool writeThrough = (pending == 0) || (pending + len >= 2*wBufSize_);
+
+  if (!writeThrough) {
+    // Top up the buffer and push the now-full buffer out.
+    memcpy(wBase_, buf, room);
+    transport_->write(bufStart, wBufSize_);
+
+    // Park what is left of the caller's data at the start of the buffer.
+    const uint8_t* tail = buf + room;
+    const uint32_t tailLen = len - room;
+    assert(tailLen < wBufSize_);
+    memcpy(bufStart, tail, tailLen);
+    wBase_ = bufStart + tailLen;
+    return;
+  }
+
+  // Write-through: flush what is buffered, then the caller's data.
+  // TODO(dreiss): writev
+  if (pending > 0) {
+    transport_->write(bufStart, pending);
+  }
+  transport_->write(buf, len);
+  wBase_ = bufStart;
+}
+
+/**
+ * Slow-path borrow: makes at least *len contiguous bytes available in the
+ * read buffer by pulling more data from the underlying transport.
+ *
+ * @param buf unused by this implementation
+ * @param len in: number of bytes the caller wants to borrow;
+ *            out: number of bytes actually available at the returned pointer
+ * @return pointer to the borrowed bytes inside the read buffer, or NULL when
+ *         the request is larger than the read buffer
+ */
+const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
+  // If the request is bigger than our buffer, we are hosed.
+  if (*len > rBufSize_) {
+    return NULL;
+  }
+
+  // The number of bytes of data we have already.
+  uint32_t have = rBound_ - rBase_;
+  // The number of additional bytes we need from the underlying transport.
+  int32_t need = *len - have;
+  // The space from the start of the buffer to the end of our data.
+  uint32_t offset = rBound_ - rBuf_.get();
+  assert(need > 0);
+
+  // If we have less than half our buffer space available, shift the data
+  // we have down to the start.  If the borrow is big compared to our buffer,
+  // this could be kind of a waste, but if the borrow is small, it frees up
+  // space at the end of our buffer to do a bigger single read from the
+  // underlying transport.  Also, if our needs extend past the end of the
+  // buffer, we have to do a copy no matter what.
+  if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
+    memmove(rBuf_.get(), rBase_, have);
+    setReadBuffer(rBuf_.get(), have);
+    // After the shift our data ends exactly `have` bytes into the buffer.
+    offset = have;
+  }
+
+  // First try to fill up the buffer.  The free space is what lies between
+  // rBound_ and the end of the buffer, i.e. rBufSize_ - offset.  Sizing the
+  // read by `have` instead would run past the end of the buffer whenever the
+  // data was not shifted and rBase_ is not at the start of the buffer.
+  uint32_t got = transport_->read(rBound_, rBufSize_ - offset);
+  rBound_ += got;
+  need -= got;
+
+  // If that fails, readAll until we get what we need.
+  if (need > 0) {
+    rBound_ += transport_->readAll(rBound_, need);
+  }
+
+  *len = rBound_ - rBase_;
+  return rBase_;
+}
+
+/**
+ * Writes any buffered output to the underlying transport and then flushes
+ * the underlying transport itself.
+ */
+void TBufferedTransport::flush() {
+  uint8_t* const start = wBuf_.get();
+  const uint32_t pending = wBase_ - start;
+
+  if (pending != 0) {
+    // Mark the buffer empty before writing, so that the object is left in a
+    // sane state should the underlying write throw.
+    wBase_ = start;
+    transport_->write(start, pending);
+  }
+
+  // Push everything down through the underlying transport as well.
+  transport_->flush();
+}
+
+
+/**
+ * Slow-path read: serves a request that the current frame's remaining bytes
+ * cannot satisfy.
+ *
+ * The rest of the current frame is copied out, the next frame is read, and
+ * as much of it as is still wanted is handed over.
+ *
+ * @param buf destination for the bytes
+ * @param len number of bytes requested; must exceed what is buffered
+ * @return number of bytes copied into buf (may be fewer than len)
+ */
+uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
+  const uint32_t buffered = rBound_ - rBase_;
+
+  // The fast path handles every request the buffer can satisfy on its own.
+  assert(buffered < len);
+
+  uint8_t* dest = buf;
+  uint32_t remaining = len;
+
+  // Drain what is left of the current frame.
+  if (buffered != 0) {
+    memcpy(dest, rBase_, buffered);
+    dest += buffered;
+    remaining -= buffered;
+  }
+
+  // Pull in the next frame.
+  readFrame();
+
+  // TODO(dreiss): Should we warn when reads cross frames?
+
+  // Hand over as much of the new frame as the caller still wants.
+  const uint32_t frameBytes = static_cast<uint32_t>(rBound_ - rBase_);
+  const uint32_t handed = std::min(remaining, frameBytes);
+  memcpy(dest, rBase_, handed);
+  rBase_ += handed;
+
+  return buffered + handed;
+}
+
+/**
+ * Reads one complete frame from the underlying transport into the read
+ * buffer: a 4-byte size in network byte order followed by that many payload
+ * bytes.  The read buffer is grown when the frame does not fit.
+ *
+ * Throws TTransportException when the decoded frame size is negative.
+ */
+void TFramedTransport::readFrame() {
+  // TODO(dreiss): Think about using readv here, even though it would
+  // result in (gasp) read-ahead.
+
+  // Fetch the size header and convert it to host byte order.
+  int32_t frameLen;
+  transport_->readAll((uint8_t*)&frameLen, sizeof(frameLen));
+  frameLen = ntohl(frameLen);
+
+  if (frameLen < 0) {
+    throw TTransportException("Frame size has negative value");
+  }
+
+  // Grow the read buffer if this frame is larger than anything seen so far.
+  const bool needsBiggerBuffer = frameLen > static_cast<int32_t>(rBufSize_);
+  if (needsBiggerBuffer) {
+    rBuf_.reset(new uint8_t[frameLen]);
+    rBufSize_ = frameLen;
+  }
+
+  // Read the payload and point the read markers at it.
+  uint8_t* const payload = rBuf_.get();
+  transport_->readAll(payload, frameLen);
+  setReadBuffer(payload, frameLen);
+}
+
+/**
+ * Slow-path write: grows the write buffer until the pending frame plus the
+ * new bytes fit, then appends the new bytes.
+ *
+ * The frame size is sent as a signed 32-bit integer by flush(), so a frame
+ * whose buffered size would exceed 2 GB is rejected here instead of letting
+ * the size computation overflow.
+ *
+ * @param buf bytes to append to the pending frame
+ * @param len number of bytes in buf
+ * @throws TTransportException (BAD_ARGS) when the buffered data would
+ *         exceed 2 GB
+ */
+void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
+  uint32_t have = wBase_ - wBuf_.get();
+  uint32_t needed = have + len;
+
+  // `needed < have` detects 32-bit wraparound of the sum.
+  if (needed < have || needed > 0x7fffffff) {
+    throw TTransportException(TTransportException::BAD_ARGS,
+                              "Attempted to write over 2 GB to TFramedTransport.");
+  }
+
+  // Double buffer size until sufficient.  Work on a local copy so that
+  // wBufSize_ stays in step with the real buffer if the allocation below
+  // throws, and start from 1 so that a zero-sized buffer can still grow.
+  // Since needed <= 0x7fffffff, the doubling cannot overflow 32 bits.
+  uint32_t new_size = wBufSize_;
+  while (new_size < needed) {
+    new_size = (new_size > 0) ? new_size * 2 : 1;
+  }
+
+  // TODO(dreiss): Consider modifying this class to use malloc/free
+  // so we can use realloc here.
+
+  // Allocate new buffer.
+  uint8_t* new_buf = new uint8_t[new_size];
+
+  // Copy the old buffer to the new one.
+  memcpy(new_buf, wBuf_.get(), have);
+
+  // Now point buf to the new one.
+  wBuf_.reset(new_buf);
+  wBufSize_ = new_size;
+  wBase_ = wBuf_.get() + have;
+  wBound_ = wBuf_.get() + wBufSize_;
+
+  // Copy the data into the new buffer.
+  memcpy(wBase_, buf, len);
+  wBase_ += len;
+}
+
+/**
+ * Sends the pending frame: stores the payload size (network byte order) in
+ * the 4 bytes reserved at the start of the write buffer, writes size and
+ * payload in one call when the payload is non-empty, and then flushes the
+ * underlying transport.
+ */
+void TFramedTransport::flush() {
+  int32_t frameLen, frameLenWire;
+  const uint32_t headerLen = sizeof(frameLenWire);
+  assert(wBufSize_ > sizeof(frameLenWire));
+
+  uint8_t* const start = wBuf_.get();
+
+  // The payload is everything written after the reserved size slot.
+  frameLen = wBase_ - (start + headerLen);
+  frameLenWire = (int32_t)htonl((uint32_t)(frameLen));
+  memcpy(start, (uint8_t*)&frameLenWire, headerLen);
+
+  if (frameLen > 0) {
+    // Rewind to just after the size slot before writing, so that the buffer
+    // is already clean should the underlying write throw.
+    wBase_ = start + headerLen;
+
+    // Size and payload go out together.
+    transport_->write(start, headerLen + frameLen);
+  }
+
+  // Push everything down through the underlying transport as well.
+  transport_->flush();
+}
+
+/**
+ * Slow-path borrow.  This implementation never borrows: it always returns
+ * NULL and touches neither buf nor len.
+ */
+const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
+ // Don't try to be clever with shifting buffers.
+ // If the fast path failed let the protocol use its slow path.
+ // Besides, who is going to try to borrow across messages?
+ return NULL;
+}
+
+
+/**
+ * Works out where a read of up to len bytes starts and how many bytes it
+ * yields, and advances the read position past them.
+ *
+ * @param len       maximum number of bytes wanted
+ * @param out_start receives the address of the first readable byte
+ * @param out_give  receives the number of bytes granted (at most len)
+ */
+void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
+  // Bring rBound_ up to date so that later reads can take the fast path.
+  rBound_ = wBase_;
+
+  // Grant the smaller of the request and what is readable.
+  uint8_t* const from = rBase_;
+  const uint32_t readable = available_read();
+  const uint32_t granted = (readable < len) ? readable : len;
+
+  // Advance rBase_ here so the caller doesn't have to.
+  rBase_ = from + granted;
+
+  *out_start = from;
+  *out_give = granted;
+}
+
+/**
+ * Slow-path read: copies up to len readable bytes into buf.
+ *
+ * @param buf destination for the bytes
+ * @param len maximum number of bytes to copy
+ * @return number of bytes copied
+ */
+uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) {
+  uint8_t* src;
+  uint32_t count;
+  computeRead(len, &src, &count);
+
+  // Hand the granted bytes to the caller.
+  memcpy(buf, src, count);
+  return count;
+}
+
+/**
+ * Reads up to len bytes and appends them to str.
+ *
+ * @param str string to append to
+ * @param len maximum number of bytes to read
+ * @return number of bytes appended; 0 when there is no buffer at all
+ */
+uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
+  // With no buffer there is nothing to read; bail out before any pointer math.
+  if (buffer_ != NULL) {
+    uint8_t* src;
+    uint32_t count;
+    computeRead(len, &src, &count);
+
+    // Tack the granted bytes onto the caller's string.
+    str.append((char*)src, count);
+    return count;
+  }
+
+  return 0;
+}
+
+/**
+ * Makes sure at least len bytes can be written at wBase_, growing the
+ * buffer by repeated doubling when this object owns it.
+ *
+ * No member is modified until the reallocation has succeeded, so a failure
+ * leaves the buffer exactly as it was.
+ *
+ * @param len number of bytes the caller is about to write
+ * @throws TTransportException when the buffer is not owned and too small,
+ *         when the required size does not fit in 32 bits, or when the
+ *         allocation fails
+ */
+void TMemoryBuffer::ensureCanWrite(uint32_t len) {
+  // Check available space
+  const uint32_t avail = available_write();
+  if (len <= avail) {
+    return;
+  }
+
+  if (!owner_) {
+    throw TTransportException("Insufficient space in external MemoryBuffer");
+  }
+
+  // bufferSize_ is a uint32_t, so do the size arithmetic in 64 bits and
+  // refuse requests that cannot be represented.
+  const uint64_t max_size = static_cast<uint32_t>(-1);
+  const uint64_t used = static_cast<uint64_t>(wBase_ - buffer_);
+  const uint64_t required = used + len;
+  if (required > max_size) {
+    throw TTransportException("Internal buffer size overflow");
+  }
+
+  // Grow the buffer as necessary.  Starting from 1 lets a zero-sized buffer
+  // grow instead of doubling zero forever.
+  uint64_t new_size = bufferSize_;
+  while (new_size < required) {
+    new_size = (new_size > 0) ? new_size * 2 : 1;
+  }
+  if (new_size > max_size) {
+    new_size = max_size;
+  }
+
+  // Remember the positions as offsets while the old block is still valid.
+  const ptrdiff_t r_base_off = rBase_ - buffer_;
+  const ptrdiff_t r_bound_off = rBound_ - buffer_;
+  const ptrdiff_t w_base_off = wBase_ - buffer_;
+
+  // Allocate into a new pointer so we don't bork ours if it fails.
+  void* new_buffer = std::realloc(buffer_, static_cast<size_t>(new_size));
+  if (new_buffer == NULL) {
+    throw TTransportException("Out of memory.");
+  }
+
+  // Commit: rebase every pointer onto the (possibly moved) block.
+  buffer_ = static_cast<uint8_t*>(new_buffer);
+  bufferSize_ = static_cast<uint32_t>(new_size);
+  rBase_ = buffer_ + r_base_off;
+  rBound_ = buffer_ + r_bound_off;
+  wBase_ = buffer_ + w_base_off;
+  wBound_ = buffer_ + bufferSize_;
+}
+
+/**
+ * Slow-path write: makes room for len bytes and appends them.
+ *
+ * @param buf bytes to append
+ * @param len number of bytes in buf
+ */
+void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) {
+  // May reallocate, so wBase_ must only be read afterwards.
+  ensureCanWrite(len);
+
+  uint8_t* const dest = wBase_;
+  memcpy(dest, buf, len);
+  wBase_ = dest + len;
+}
+
+/**
+ * Records that the client has written len bytes directly into the buffer.
+ *
+ * @param len number of bytes written
+ * @throws TTransportException when len exceeds the writable space
+ */
+void TMemoryBuffer::wroteBytes(uint32_t len) {
+  const uint32_t writable = available_write();
+  if (len <= writable) {
+    wBase_ += len;
+    return;
+  }
+  throw TTransportException("Client wrote more bytes than size of buffer.");
+}
+
+/**
+ * Slow-path borrow: succeeds when at least *len bytes are readable.
+ *
+ * @param buf unused by this implementation
+ * @param len in: number of bytes wanted; out (on success): number of bytes
+ *            readable at the returned pointer
+ * @return pointer to the readable bytes, or NULL when fewer than *len bytes
+ *         are available
+ */
+const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
+  // Bring rBound_ up to date with what has been written.
+  rBound_ = wBase_;
+
+  if (available_read() < *len) {
+    return NULL;
+  }
+
+  *len = available_read();
+  return rBase_;
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h
new file mode 100644
index 0000000..1908205
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferTransports.h
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
+#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
+
+#include <cstring>
+#include "boost/scoped_array.hpp"
+
+#include <transport/TTransport.h>
+
+#ifdef __GNUC__
+#define TDB_LIKELY(val) (__builtin_expect((val), 1))
+#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
+#else
+#define TDB_LIKELY(val) (val)
+#define TDB_UNLIKELY(val) (val)
+#endif
+
+namespace apache { namespace thrift { namespace transport {
+
+
+/**
+ * Base class for all transports that use read/write buffers for performance.
+ *
+ * TBufferBase is designed to implement the fast-path "memcpy" style
+ * operations that work in the common case. It does so with small and
+ * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract
+ * class. Subclasses are expected to define the "slow path" operations
+ * that have to be done when the buffers are full or empty.
+ *
+ */
+class TBufferBase : public TTransport {
+
+ public:
+
+  /**
+   * Fast-path read.
+   *
+   * When we have enough data buffered to fulfill the read, we can satisfy it
+   * with a single memcpy, then adjust our internal pointers.  If the buffer
+   * does not hold enough data, we call out to our slow path, implemented by
+   * a subclass.  This method is meant to eventually be nonvirtual and
+   * inlinable.
+   *
+   * @param buf destination for the bytes
+   * @param len number of bytes requested
+   * @return number of bytes copied into buf
+   */
+  uint32_t read(uint8_t* buf, uint32_t len) {
+    // Compare lengths instead of forming rBase_ + len: a pointer beyond the
+    // end of the buffer is not valid to compute and can wrap around.  Both
+    // sides are widened to a signed 64-bit value so that neither a large len
+    // nor the pointer difference is truncated or changes sign.
+    if (TDB_LIKELY(static_cast<int64_t>(len) <=
+                   static_cast<int64_t>(rBound_ - rBase_))) {
+      std::memcpy(buf, rBase_, len);
+      rBase_ += len;
+      return len;
+    }
+    return readSlow(buf, len);
+  }
+
+  /**
+   * Fast-path write.
+   *
+   * When we have enough empty space in our buffer to accommodate the write,
+   * we can satisfy it with a single memcpy, then adjust our internal
+   * pointers.  If there is not enough space, we call out to our slow path,
+   * implemented by a subclass.  This method is meant to eventually be
+   * nonvirtual and inlinable.
+   *
+   * @param buf bytes to write
+   * @param len number of bytes in buf
+   */
+  void write(const uint8_t* buf, uint32_t len) {
+    // Length comparison for the same reason as in read().
+    if (TDB_LIKELY(static_cast<int64_t>(len) <=
+                   static_cast<int64_t>(wBound_ - wBase_))) {
+      std::memcpy(wBase_, buf, len);
+      wBase_ += len;
+      return;
+    }
+    writeSlow(buf, len);
+  }
+
+  /**
+   * Fast-path borrow.  A lot like the fast-path read.
+   *
+   * @param buf passed through to the slow path
+   * @param len in: number of bytes wanted; out (fast path): number of bytes
+   *            available at the returned pointer
+   * @return pointer to the buffered bytes, or whatever borrowSlow returns
+   */
+  const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
+    if (TDB_LIKELY(static_cast<int64_t>(*len) <=
+                   static_cast<int64_t>(rBound_ - rBase_))) {
+      // With strict aliasing, writing to len shouldn't force us to
+      // refetch rBase_ from memory.  TODO(dreiss): Verify this.
+      *len = rBound_ - rBase_;
+      return rBase_;
+    }
+    return borrowSlow(buf, len);
+  }
+
+  /**
+   * Consume doesn't require a slow path.
+   *
+   * @param len number of buffered bytes to skip
+   * @throws TTransportException (BAD_ARGS) when fewer than len bytes are
+   *         buffered
+   */
+  void consume(uint32_t len) {
+    if (TDB_LIKELY(static_cast<int64_t>(len) <=
+                   static_cast<int64_t>(rBound_ - rBase_))) {
+      rBase_ += len;
+    } else {
+      throw TTransportException(TTransportException::BAD_ARGS,
+                                "consume did not follow a borrow.");
+    }
+  }
+
+
+ protected:
+
+  /// Slow path read.
+  virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
+
+  /// Slow path write.
+  virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
+
+  /**
+   * Slow path borrow.
+   *
+   * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
+   */
+  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
+
+  /**
+   * Trivial constructor.
+   *
+   * Initialize pointers safely.  Constructing is not a very
+   * performance-sensitive operation, so it is okay to just leave it to
+   * the concrete class to set up pointers correctly.
+   */
+  TBufferBase()
+    : rBase_(NULL)
+    , rBound_(NULL)
+    , wBase_(NULL)
+    , wBound_(NULL)
+  {}
+
+  /// Convenience mutator for setting the read buffer.
+  void setReadBuffer(uint8_t* buf, uint32_t len) {
+    rBase_ = buf;
+    rBound_ = buf+len;
+  }
+
+  /// Convenience mutator for setting the write buffer.
+  void setWriteBuffer(uint8_t* buf, uint32_t len) {
+    wBase_ = buf;
+    wBound_ = buf+len;
+  }
+
+  virtual ~TBufferBase() {}
+
+  /// Reads begin here.
+  uint8_t* rBase_;
+  /// Reads may extend to just before here.
+  uint8_t* rBound_;
+
+  /// Writes begin here.
+  uint8_t* wBase_;
+  /// Writes may extend to just before here.
+  uint8_t* wBound_;
+};
+
+
+/**
+ * Base class for all transports that wrap another transport.
+ */
+class TUnderlyingTransport : public TBufferBase {
+ public:
+  /// Size used for both buffers when the caller does not specify one.
+  static const int DEFAULT_BUFFER_SIZE = 512;
+
+  /// True when data is already buffered or the wrapped transport has more.
+  virtual bool peek() {
+    if (rBase_ < rBound_) {
+      return true;
+    }
+    return transport_->peek();
+  }
+
+  /// Opens the wrapped transport.
+  void open() {
+    transport_->open();
+  }
+
+  /// Reports whether the wrapped transport is open.
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+
+  /// Flushes pending output, then closes the wrapped transport.
+  void close() {
+    flush();
+    transport_->close();
+  }
+
+  /// Returns the wrapped transport.
+  boost::shared_ptr<TTransport> getUnderlyingTransport() {
+    return transport_;
+  }
+
+ protected:
+  /// The transport being wrapped.
+  boost::shared_ptr<TTransport> transport_;
+
+  /// Sizes, in bytes, of the read and write buffers below.
+  uint32_t rBufSize_;
+  uint32_t wBufSize_;
+  /// Storage for the read and write buffers.
+  boost::scoped_array<uint8_t> rBuf_;
+  boost::scoped_array<uint8_t> wBuf_;
+
+  /// Wraps transport with default-sized read and write buffers.
+  TUnderlyingTransport(boost::shared_ptr<TTransport> transport)
+    : transport_(transport)
+    , rBufSize_(DEFAULT_BUFFER_SIZE)
+    , wBufSize_(DEFAULT_BUFFER_SIZE)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_]) {}
+
+  /// Wraps transport with read and write buffers of sz bytes each.
+  TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : transport_(transport)
+    , rBufSize_(sz)
+    , wBufSize_(sz)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_]) {}
+
+  /// Wraps transport with a read buffer of rsz and a write buffer of wsz bytes.
+  TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+    : transport_(transport)
+    , rBufSize_(rsz)
+    , wBufSize_(wsz)
+    , rBuf_(new uint8_t[rBufSize_])
+    , wBuf_(new uint8_t[wBufSize_]) {}
+};
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ */
+class TBufferedTransport : public TUnderlyingTransport {
+ public:
+
+  /// Wraps transport using the default buffer sizes.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport)
+    : TUnderlyingTransport(transport)
+  {
+    initPointers();
+  }
+
+  /// Wraps transport using sz bytes for each of the two buffers.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : TUnderlyingTransport(transport, sz)
+  {
+    initPointers();
+  }
+
+  /// Wraps transport using rsz bytes for reading and wsz bytes for writing.
+  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
+    : TUnderlyingTransport(transport, rsz, wsz)
+  {
+    initPointers();
+  }
+
+  /**
+   * Reports whether data is available to read.  When the read buffer is
+   * empty this refills it from the underlying transport first.
+   */
+  virtual bool peek() {
+    /* shigin: see THRIFT-96 discussion */
+    const bool drained = (rBase_ == rBound_);
+    if (drained) {
+      const uint32_t got = transport_->read(rBuf_.get(), rBufSize_);
+      setReadBuffer(rBuf_.get(), got);
+    }
+    return (rBase_ < rBound_);
+  }
+
+  virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+  virtual void writeSlow(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+
+  /**
+   * The following behavior is currently implemented by TBufferedTransport,
+   * but that may change in a future version:
+   * 1/ If len is at most rBufSize_, borrow will never return NULL.
+   *    Depending on the underlying transport, it could throw an exception
+   *    or hang forever.
+   * 2/ Some borrow requests may copy bytes internally.  However,
+   *    if len is at most rBufSize_/2, none of the copied bytes
+   *    will ever have to be copied again.  For optimal performance,
+   *    stay under this limit.
+   */
+  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+ protected:
+  /// Starts with nothing to read and the whole write buffer free.
+  void initPointers() {
+    // Write size never changes.
+    setWriteBuffer(wBuf_.get(), wBufSize_);
+    setReadBuffer(rBuf_.get(), 0);
+  }
+};
+
+
+/**
+ * Wraps a transport into a buffered one.
+ *
+ */
+class TBufferedTransportFactory : public TTransportFactory {
+ public:
+  TBufferedTransportFactory() {}
+
+  virtual ~TBufferedTransportFactory() {}
+
+  /**
+   * Returns a new TBufferedTransport (default buffer sizes) wrapping trans.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
+    return buffered;
+  }
+
+};
+
+
+/**
+ * Framed transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ */
+class TFramedTransport : public TUnderlyingTransport {
+ public:
+
+  /// Wraps transport using the default buffer sizes.
+  TFramedTransport(boost::shared_ptr<TTransport> transport)
+    : TUnderlyingTransport(transport)
+  {
+    initPointers();
+  }
+
+  /// Wraps transport using sz bytes for each of the two buffers.
+  TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
+    : TUnderlyingTransport(transport, sz)
+  {
+    initPointers();
+  }
+
+  virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+  virtual void writeSlow(const uint8_t* buf, uint32_t len);
+
+  virtual void flush();
+
+  const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+ protected:
+  /**
+   * Reads a frame of input from the underlying stream.
+   */
+  void readFrame();
+
+  /**
+   * Starts with nothing to read and a write buffer that already holds a
+   * 4-byte placeholder where the frame size is filled in later.
+   */
+  void initPointers() {
+    setWriteBuffer(wBuf_.get(), wBufSize_);
+    setReadBuffer(NULL, 0);
+
+    // Reserve room at the front of the buffer for the frame size.
+    int32_t placeholder = 0;
+    this->write((uint8_t*)&placeholder, sizeof(placeholder));
+  }
+};
+
+/**
+ * Wraps a transport into a framed one.
+ *
+ */
+class TFramedTransportFactory : public TTransportFactory {
+ public:
+  TFramedTransportFactory() {}
+
+  virtual ~TFramedTransportFactory() {}
+
+  /**
+   * Returns a new TFramedTransport (default buffer sizes) wrapping trans.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    boost::shared_ptr<TTransport> framed(new TFramedTransport(trans));
+    return framed;
+  }
+
+};
+
+
+/**
+ * A memory buffer is a transport that simply reads from and writes to an
+ * in memory buffer. Anytime you call write on it, the data is simply placed
+ * into a buffer, and anytime you call read, data is read from that buffer.
+ *
+ * The buffers are allocated using C constructs malloc,realloc, and the size
+ * doubles as necessary. We've considered using scoped
+ *
+ */
+class TMemoryBuffer : public TBufferBase {
+ private:
+
+ // Common initialization done by all constructors.
+ //
+ // buf   - memory to use, or NULL to have `size` bytes malloc'ed here
+ // size  - size of the buffer in bytes
+ // owner - whether this object frees the buffer in its destructor
+ // wPos  - offset of the initial write position; bytes before it are
+ //         immediately readable
+ void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
+ if (buf == NULL && size != 0) {
+ assert(owner);
+ buf = (uint8_t*)std::malloc(size);
+ if (buf == NULL) {
+ throw TTransportException("Out of memory");
+ }
+ }
+
+ buffer_ = buf;
+ bufferSize_ = size;
+
+ rBase_ = buffer_;
+ rBound_ = buffer_ + wPos;
+ // TODO(dreiss): Investigate NULL-ing this if !owner.
+ wBase_ = buffer_ + wPos;
+ wBound_ = buffer_ + bufferSize_;
+
+ owner_ = owner;
+
+ // rBound_ is really an artifact. In principle, it should always be
+ // equal to wBase_. We update it in a few places (computeRead, etc.).
+ }
+
+ public:
+ // Buffer size, in bytes, used by the default constructor and by
+ // resetBuffer(true).
+ static const uint32_t defaultSize = 1024;
+
+ /**
+ * This enum specifies how a TMemoryBuffer should treat
+ * memory passed to it via constructors or resetBuffer.
+ *
+ * OBSERVE:
+ * TMemoryBuffer will simply store a pointer to the memory.
+ * It is the callers responsibility to ensure that the pointer
+ * remains valid for the lifetime of the TMemoryBuffer,
+ * and that it is properly cleaned up.
+ * Note that no data can be written to observed buffers.
+ *
+ * COPY:
+ * TMemoryBuffer will make an internal copy of the buffer.
+ * The caller has no responsibilities.
+ *
+ * TAKE_OWNERSHIP:
+ * TMemoryBuffer will become the "owner" of the buffer,
+ * and will be responsible for freeing it.
+ * The memory must have been allocated with malloc.
+ */
+ enum MemoryPolicy
+ { OBSERVE = 1
+ , COPY = 2
+ , TAKE_OWNERSHIP = 3
+ };
+
+ /**
+ * Construct a TMemoryBuffer with a default-sized buffer,
+ * owned by the TMemoryBuffer object.
+ */
+ TMemoryBuffer() {
+ initCommon(NULL, defaultSize, true, 0);
+ }
+
+ /**
+ * Construct a TMemoryBuffer with a buffer of a specified size,
+ * owned by the TMemoryBuffer object.
+ *
+ * @param sz The initial size of the buffer.
+ */
+ TMemoryBuffer(uint32_t sz) {
+ initCommon(NULL, sz, true, 0);
+ }
+
+ /**
+ * Construct a TMemoryBuffer with buf as its initial contents.
+ *
+ * @param buf The initial contents of the buffer.
+ * Note that, while buf is a non-const pointer,
+ * TMemoryBuffer will not write to it if policy == OBSERVE,
+ * so it is safe to const_cast<uint8_t*>(whatever).
+ * @param sz The size of @c buf.
+ * @param policy See @link MemoryPolicy @endlink .
+ */
+ TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
+ if (buf == NULL && sz != 0) {
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "TMemoryBuffer given null buffer with non-zero size.");
+ }
+
+ switch (policy) {
+ case OBSERVE:
+ case TAKE_OWNERSHIP:
+ // Use the caller's memory directly; all sz bytes are readable at once.
+ initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
+ break;
+ case COPY:
+ // Allocate our own buffer and copy the caller's bytes into it.
+ initCommon(NULL, sz, true, 0);
+ this->write(buf, sz);
+ break;
+ default:
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "Invalid MemoryPolicy for TMemoryBuffer");
+ }
+ }
+
+ // Frees the buffer only when this object owns it.
+ ~TMemoryBuffer() {
+ if (owner_) {
+ std::free(buffer_);
+ }
+ }
+
+ // A memory buffer always reports itself as open.
+ bool isOpen() {
+ return true;
+ }
+
+ // True when there are written bytes that have not been read yet.
+ bool peek() {
+ return (rBase_ < wBase_);
+ }
+
+ // open() and close() do nothing.
+ void open() {}
+
+ void close() {}
+
+ // Exposes the unread portion of the buffer: *bufPtr receives the current
+ // read position and *sz the number of bytes between it and the write
+ // position.
+ // TODO(dreiss): Make bufPtr const.
+ void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
+ *bufPtr = rBase_;
+ *sz = wBase_ - rBase_;
+ }
+
+ // Returns a copy of the unread bytes as a string (empty when there is no
+ // buffer).  The read position is not advanced.
+ std::string getBufferAsString() {
+ if (buffer_ == NULL) {
+ return "";
+ }
+ uint8_t* buf;
+ uint32_t sz;
+ getBuffer(&buf, &sz);
+ return std::string((char*)buf, (std::string::size_type)sz);
+ }
+
+ // Appends a copy of the unread bytes to str.  The read position is not
+ // advanced.
+ void appendBufferToString(std::string& str) {
+ if (buffer_ == NULL) {
+ return;
+ }
+ uint8_t* buf;
+ uint32_t sz;
+ getBuffer(&buf, &sz);
+ str.append((char*)buf, sz);
+ }
+
+ // Discards the contents by moving the read and write positions back to the
+ // start of the buffer.  With reset_capacity == true the buffer is also
+ // reallocated to defaultSize bytes, which asserts that this object owns it.
+ void resetBuffer(bool reset_capacity = false) {
+ if (reset_capacity)
+ {
+ assert(owner_);
+
+ void* new_buffer = std::realloc(buffer_, defaultSize);
+
+ if (new_buffer == NULL) {
+ throw TTransportException("Out of memory.");
+ }
+
+ buffer_ = (uint8_t*) new_buffer;
+ bufferSize_ = defaultSize;
+
+ wBound_ = buffer_ + bufferSize_;
+ }
+
+ rBase_ = buffer_;
+ rBound_ = buffer_;
+ wBase_ = buffer_;
+ // It isn't safe to write into a buffer we don't own.
+ if (!owner_) {
+ wBound_ = wBase_;
+ bufferSize_ = 0;
+ }
+ }
+
+ /// See constructor documentation.
+ void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
+ // Use a variant of the copy-and-swap trick for assignment operators.
+ // This is sub-optimal in terms of performance for two reasons:
+ // 1/ The constructing and swapping of the (small) values
+ // in the temporary object takes some time, and is not necessary.
+ // 2/ If policy == COPY, we allocate the new buffer before
+ // freeing the old one, precluding the possibility of
+ // reusing that memory.
+ // I doubt that either of these problems could be optimized away,
+ // but the second is probably not a common case, and the first is minor.
+ // I don't expect resetBuffer to be a common operation, so I'm willing to
+ // bite the performance bullet to make the method this simple.
+
+ // Construct the new buffer.
+ TMemoryBuffer new_buffer(buf, sz, policy);
+ // Move it into ourself.
+ this->swap(new_buffer);
+ // Our old self gets destroyed.
+ }
+
+ // Reads up to len bytes and returns them as a new string.
+ std::string readAsString(uint32_t len) {
+ std::string str;
+ (void)readAppendToString(str, len);
+ return str;
+ }
+
+ uint32_t readAppendToString(std::string& str, uint32_t len);
+
+ // Once everything written has been read, rewind to the start of the
+ // buffer.
+ void readEnd() {
+ if (rBase_ == wBase_) {
+ resetBuffer();
+ }
+ }
+
+ // Number of bytes that have been written but not yet read.
+ uint32_t available_read() const {
+ // Remember, wBase_ is the real rBound_.
+ return wBase_ - rBase_;
+ }
+
+ // Number of bytes that can still be written before the buffer is full.
+ uint32_t available_write() const {
+ return wBound_ - wBase_;
+ }
+
+ // Returns a pointer to where the client can write data to append to
+ // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a
+ // write of the provided length. The returned pointer is very convenient for
+ // passing to read(), recv(), or similar. You must call wroteBytes() as soon
+ // as data is written or the buffer will not be aware that data has changed.
+ uint8_t* getWritePtr(uint32_t len) {
+ ensureCanWrite(len);
+ return wBase_;
+ }
+
+ // Informs the buffer that the client has written 'len' bytes into storage
+ // that had been provided by getWritePtr().
+ void wroteBytes(uint32_t len);
+
+ protected:
+ // Exchanges the complete state (buffer, size, positions, ownership) with
+ // another TMemoryBuffer.
+ void swap(TMemoryBuffer& that) {
+ using std::swap;
+ swap(buffer_, that.buffer_);
+ swap(bufferSize_, that.bufferSize_);
+
+ swap(rBase_, that.rBase_);
+ swap(rBound_, that.rBound_);
+ swap(wBase_, that.wBase_);
+ swap(wBound_, that.wBound_);
+
+ swap(owner_, that.owner_);
+ }
+
+ // Make sure there's at least 'len' bytes available for writing.
+ void ensureCanWrite(uint32_t len);
+
+ // Compute the position and available data for reading.
+ void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
+
+ uint32_t readSlow(uint8_t* buf, uint32_t len);
+
+ void writeSlow(const uint8_t* buf, uint32_t len);
+
+ const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
+
+ // Data buffer
+ uint8_t* buffer_;
+
+ // Allocated buffer size
+ uint32_t bufferSize_;
+
+ // Is this object the owner of the buffer?
+ bool owner_;
+
+ // Don't forget to update constructors, initCommon, and swap if
+ // you add new members.
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
diff --git a/lib/cpp/src/transport/TFDTransport.cpp b/lib/cpp/src/transport/TFDTransport.cpp
new file mode 100644
index 0000000..a042f8b
--- /dev/null
+++ b/lib/cpp/src/transport/TFDTransport.cpp
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cerrno>
+#include <exception>
+
+#include <transport/TFDTransport.h>
+
+#include <unistd.h>
+
+using namespace std;
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Closes the file descriptor if it is open and marks the transport closed.
+ *
+ * Throws TTransportException when ::close fails, unless an exception is
+ * already in flight.
+ */
+void TFDTransport::close() {
+  if (isOpen()) {
+    const int rv = ::close(fd_);
+    // Capture errno right away, before anything else can overwrite it.
+    const int errno_copy = errno;
+    fd_ = -1;
+
+    // This runs from the destructor too, so never throw while another
+    // exception is already unwinding the stack.
+    const bool failed = (rv < 0);
+    if (failed && !std::uncaught_exception()) {
+      throw TTransportException(TTransportException::UNKNOWN,
+                                "TFDTransport::close()",
+                                errno_copy);
+    }
+  }
+}
+
+/**
+ * Reads up to len bytes from the file descriptor into buf.
+ *
+ * A read interrupted by a signal (EINTR) transferred no data and is not a
+ * real failure, so it is retried a bounded number of times before being
+ * reported.
+ *
+ * @param buf destination for the bytes
+ * @param len maximum number of bytes to read
+ * @return number of bytes read; 0 when ::read reports end of file
+ * @throws TTransportException (UNKNOWN, carrying errno) when ::read fails
+ */
+uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
+  // Upper bound on consecutive EINTR retries, so a signal storm cannot
+  // keep us here forever.
+  const unsigned int maxRetries = 5;
+  unsigned int retries = 0;
+
+  while (true) {
+    ssize_t rv = ::read(fd_, buf, len);
+    if (rv < 0) {
+      int errno_copy = errno;
+      if (errno_copy == EINTR && retries < maxRetries) {
+        // Interrupted before any data arrived; try again.
+        ++retries;
+        continue;
+      }
+      throw TTransportException(TTransportException::UNKNOWN,
+                                "TFDTransport::read()",
+                                errno_copy);
+    }
+    return rv;
+  }
+}
+
+/**
+ * Writes all len bytes of buf to the file descriptor, looping over partial
+ * writes.
+ *
+ * A write interrupted by a signal before any data was written (EINTR) is
+ * not a real failure, so it is retried a bounded number of times before
+ * being reported.
+ *
+ * @param buf bytes to write
+ * @param len number of bytes in buf
+ * @throws TTransportException (UNKNOWN, carrying errno) when ::write fails,
+ *         or (END_OF_FILE) when ::write reports that nothing was written
+ */
+void TFDTransport::write(const uint8_t* buf, uint32_t len) {
+  // Upper bound on consecutive EINTR retries, so a signal storm cannot
+  // keep us here forever.
+  const unsigned int maxRetries = 5;
+  unsigned int retries = 0;
+
+  while (len > 0) {
+    ssize_t rv = ::write(fd_, buf, len);
+
+    if (rv < 0) {
+      int errno_copy = errno;
+      if (errno_copy == EINTR && retries < maxRetries) {
+        // Interrupted before anything was written; try again.
+        ++retries;
+        continue;
+      }
+      throw TTransportException(TTransportException::UNKNOWN,
+                                "TFDTransport::write()",
+                                errno_copy);
+    } else if (rv == 0) {
+      throw TTransportException(TTransportException::END_OF_FILE,
+                                "TFDTransport::write()");
+    }
+
+    // Progress was made: start counting interruptions afresh.
+    retries = 0;
+    buf += rv;
+    len -= rv;
+  }
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TFDTransport.h b/lib/cpp/src/transport/TFDTransport.h
new file mode 100644
index 0000000..bda5d82
--- /dev/null
+++ b/lib/cpp/src/transport/TFDTransport.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFDTRANSPORT_H_ 1
+
+#include <string>
+#include <sys/time.h>
+
+#include "TTransport.h"
+#include "TServerSocket.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Dead-simple wrapper around a file descriptor.
+ *
+ */
+class TFDTransport : public TTransport {
+ public:
+  /** Whether the destructor closes the wrapped descriptor. */
+  enum ClosePolicy {
+    NO_CLOSE_ON_DESTROY = 0,
+    CLOSE_ON_DESTROY = 1
+  };
+
+  /**
+   * Wraps an existing descriptor.
+   *
+   * @param fd           descriptor to read from and write to
+   * @param close_policy whether the destructor should call close()
+   */
+  TFDTransport(int fd, ClosePolicy close_policy = NO_CLOSE_ON_DESTROY)
+    : fd_(fd), close_policy_(close_policy) {}
+
+  /** Calls close() only when constructed with CLOSE_ON_DESTROY. */
+  ~TFDTransport() {
+    if (close_policy_ != CLOSE_ON_DESTROY) {
+      return;
+    }
+    close();
+  }
+
+  /** A non-negative descriptor counts as open. */
+  bool isOpen() { return !(fd_ < 0); }
+
+  /** No-op: the descriptor is supplied by the caller. */
+  void open() {}
+
+  void close();
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  /** Replaces the wrapped descriptor; the previous one is not closed here. */
+  void setFD(int fd) { fd_ = fd; }
+  int getFD() { return fd_; }
+
+ protected:
+  int fd_;
+  ClosePolicy close_policy_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
new file mode 100644
index 0000000..f67b9e3
--- /dev/null
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -0,0 +1,953 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "TFileTransport.h"
+#include "TTransportUtils.h"
+
+#include <pthread.h>
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#else
+#include <time.h>
+#endif
+#include <fcntl.h>
+#include <errno.h>
+#include <unistd.h>
+#ifdef HAVE_STRINGS_H
+#include <strings.h>
+#endif
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <sys/stat.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift::protocol;
+
+#ifndef HAVE_CLOCK_GETTIME
+
+/**
+ * Fake clock_gettime for systems like darwin
+ *
+ */
+#define CLOCK_REALTIME 0
+static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) {
+  // Emulate the call with gettimeofday(); tp is written only on success.
+  struct timeval tv;
+  const int status = gettimeofday(&tv, NULL);
+
+  if (status == 0) {
+    tp->tv_sec = tv.tv_sec;
+    // microseconds -> nanoseconds
+    tp->tv_nsec = tv.tv_usec * 1000;
+  }
+
+  return status;
+}
+#endif
+
+/**
+ * Creates a file transport for the given path and opens the file
+ * immediately through openLogFile().
+ *
+ * All tunables start at their DEFAULT_* values. No writer thread or event
+ * buffers are created here; they are set up on the first enqueueEvent().
+ *
+ * @param path     file to open
+ * @param readOnly open the file read-only; write() then throws
+ * @throws TTransportException (NOT_OPEN) when the file cannot be opened
+ */
+TFileTransport::TFileTransport(string path, bool readOnly)
+  : readState_()
+  , readBuff_(NULL)
+  , currentEvent_(NULL)
+  , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
+  , readTimeout_(NO_TAIL_READ_TIMEOUT)
+  , chunkSize_(DEFAULT_CHUNK_SIZE)
+  , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
+  , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
+  , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
+  , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
+  , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
+  , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
+  , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+  , writerThreadId_(0)
+  , dequeueBuffer_(NULL)
+  , enqueueBuffer_(NULL)
+  , closing_(false)
+  , forceFlush_(false)
+  , filename_(path)
+  , fd_(0)
+  , bufferAndThreadInitialized_(false)
+  , offset_(0)
+  , lastBadChunk_(0)
+  , numCorruptedEventsInChunk_(0)
+  , readOnly_(readOnly)
+{
+  // initialize all the condition vars/mutexes
+  pthread_mutex_init(&mutex_, NULL);
+  pthread_cond_init(&notFull_, NULL);
+  pthread_cond_init(&notEmpty_, NULL);
+  pthread_cond_init(&flushed_, NULL);
+
+  openLogFile();
+}
+
+/**
+ * Redirects output to another file.
+ *
+ * If a file is still open, pending events are flushed to it, a diagnostic
+ * naming that file is emitted, and it is closed before switching.
+ *
+ * @param fd       descriptor to adopt; 0 means "open `filename` here"
+ * @param filename name recorded for the new output file
+ * @param offset   offset recorded for the new output file
+ * @throws TTransportException when closing the old file fails (UNKNOWN) or
+ *         the new file cannot be opened
+ */
+void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
+  // Keep the old name: the diagnostic below is about the file that is still
+  // open, not the one we are switching to.
+  const string previousFilename = filename_;
+
+  filename_ = filename;
+  offset_ = offset;
+
+  // check if current file is still open
+  if (fd_ > 0) {
+    // flush any events in the queue
+    flush();
+    GlobalOutput.printf("error, current file (%s) not closed", previousFilename.c_str());
+    if (-1 == ::close(fd_)) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
+      throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
+    }
+  }
+
+  if (fd) {
+    fd_ = fd;
+  } else {
+    // open file if the input fd is 0
+    openLogFile();
+  }
+}
+
+
+/**
+ * Shuts the transport down: flushes and joins the writer thread if one is
+ * running, releases the buffers, and closes the file if still open.
+ */
+TFileTransport::~TFileTransport() {
+  const bool writerActive = (writerThreadId_ > 0);
+  if (writerActive) {
+    // Shorten the flush interval so shutdown does not wait long.
+    setFlushMaxUs(300*1000);
+
+    // Push out whatever is buffered, then tell the writer to finish.
+    flush();
+    closing_ = true;
+
+    // TODO: make sure event queue is empty
+    // currently only the write buffer is flushed
+    // we dont actually wait until the queue is empty. This shouldn't be a big
+    // deal in the common case because writing is quick
+
+    pthread_join(writerThreadId_, NULL);
+    writerThreadId_ = 0;
+  }
+
+  // delete / delete[] on a null pointer is a no-op, so no guards are needed.
+  delete dequeueBuffer_;
+  dequeueBuffer_ = NULL;
+
+  delete enqueueBuffer_;
+  enqueueBuffer_ = NULL;
+
+  delete[] readBuff_;
+  readBuff_ = NULL;
+
+  delete currentEvent_;
+  currentEvent_ = NULL;
+
+  // close logfile; a destructor must not throw, so failures are only logged
+  if (fd_ > 0 && -1 == ::close(fd_)) {
+    GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno);
+  }
+}
+
+/**
+ * Starts the writer thread (if none exists) and allocates the two event
+ * buffers.
+ *
+ * @return true on success; false if already initialized or if the writer
+ *         thread could not be created
+ */
+bool TFileTransport::initBufferAndWriteThread() {
+  if (bufferAndThreadInitialized_) {
+    T_ERROR("Trying to double-init TFileTransport");
+    return false;
+  }
+
+  const bool needThread = (writerThreadId_ == 0);
+  if (needThread) {
+    const int rc = pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this);
+    if (rc != 0) {
+      T_ERROR("Could not create writer thread");
+      return false;
+    }
+  }
+
+  // Two buffers: one receives new events while the other is drained to disk.
+  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  bufferAndThreadInitialized_ = true;
+
+  return true;
+}
+
+/**
+ * Queues the buffer as one event without waiting for it to be flushed.
+ *
+ * @throws TTransportException when the transport was opened read-only
+ */
+void TFileTransport::write(const uint8_t* buf, uint32_t len) {
+  if (!readOnly_) {
+    enqueueEvent(buf, len, false);
+    return;
+  }
+
+  throw TTransportException("TFileTransport: attempting to write to file opened readonly");
+}
+
+/**
+ * Copies an event into a new eventInfo and hands it to the writer thread.
+ *
+ * The stored record is a 4-byte length (host byte order, copied straight
+ * from `eventLen`) followed by the payload. The call is silently dropped
+ * when the transport is closing, the event is empty or larger than
+ * maxEventSize_, or the writer thread/buffers cannot be set up.
+ *
+ * @param buf             event payload
+ * @param eventLen        payload length in bytes
+ * @param blockUntilFlush when true, wait on the `flushed_` condition once
+ *                        after enqueueing
+ */
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+  // can't enqueue more events if file is going to close
+  if (closing_) {
+    return;
+  }
+
+  // make sure that event size is valid
+  if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
+    T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
+    return;
+  }
+
+  if (eventLen == 0) {
+    T_ERROR("cannot enqueue an empty event");
+    return;
+  }
+
+  eventInfo* toEnqueue = new eventInfo();
+  // Allocate with new[]: ~eventInfo() frees eventBuff_ with delete[], so a
+  // malloc'ed buffer would be released by the wrong deallocator.
+  toEnqueue->eventBuff_ = new uint8_t[static_cast<size_t>(eventLen) + 4];
+  // first 4 bytes is the event length
+  memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
+  // actual event contents
+  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
+  toEnqueue->eventSize_ = eventLen + 4;
+
+  // lock mutex
+  pthread_mutex_lock(&mutex_);
+
+  // make sure that enqueue buffer is initialized and writer thread is running
+  if (!bufferAndThreadInitialized_) {
+    if (!initBufferAndWriteThread()) {
+      delete toEnqueue;
+      pthread_mutex_unlock(&mutex_);
+      return;
+    }
+  }
+
+  // Can't enqueue while buffer is full
+  while (enqueueBuffer_->isFull()) {
+    pthread_cond_wait(&notFull_, &mutex_);
+  }
+
+  // add to the buffer
+  if (!enqueueBuffer_->addEvent(toEnqueue)) {
+    delete toEnqueue;
+    pthread_mutex_unlock(&mutex_);
+    return;
+  }
+
+  // signal anybody who's waiting for the buffer to be non-empty
+  pthread_cond_signal(&notEmpty_);
+
+  // this really should be a loop where it makes sure it got flushed
+  // because condition variables can get triggered by the os for no reason
+  // it is probably a non-factor for the time being
+  if (blockUntilFlush) {
+    pthread_cond_wait(&flushed_, &mutex_);
+  }
+
+  pthread_mutex_unlock(&mutex_);
+}
+
+/**
+ * Waits for the `notEmpty_` condition and then, if the enqueue buffer holds
+ * events, exchanges it with the dequeue buffer.
+ *
+ * @param deadline absolute time limit for the wait; NULL waits without a
+ *                 limit
+ * @return true when the buffers were exchanged, false when the enqueue
+ *         buffer was still empty after the wait
+ */
+bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
+  pthread_mutex_lock(&mutex_);
+  if (deadline != NULL) {
+    // if we were handed a deadline time struct, do a timed wait
+    pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+  } else {
+    // just wait until the buffer gets an item
+    pthread_cond_wait(&notEmpty_, &mutex_);
+  }
+
+  bool swapped = false;
+
+  // could be empty if we timed out
+  if (!enqueueBuffer_->isEmpty()) {
+    TFileTransportBuffer *temp = enqueueBuffer_;
+    enqueueBuffer_ = dequeueBuffer_;
+    dequeueBuffer_ = temp;
+
+    swapped = true;
+  }
+
+  // unlock the mutex and signal if required
+  pthread_mutex_unlock(&mutex_);
+
+  // After a swap the enqueue side is the other buffer, so wake a producer
+  // that may be blocked on a full buffer.
+  if (swapped) {
+    pthread_cond_signal(&notFull_);
+  }
+
+  return swapped;
+}
+
+
+/**
+ * Body of the background writer thread.
+ *
+ * Positions the file at its end (dropping any trailing partial event), then
+ * loops: swap the event buffers, write every dequeued event (zero-padding
+ * to the next chunk boundary when an event would straddle one), and fsync
+ * when the flush interval elapsed, more than flushMaxBytes_ are unflushed,
+ * or a flush was forced. Once closing_ is set and both buffers are empty,
+ * the file is synced and closed and the thread exits.
+ *
+ * @throws TTransportException (UNKNOWN) on write or close failure
+ */
+void TFileTransport::writerThread() {
+  // open file if it is not open
+  if(!fd_) {
+    openLogFile();
+  }
+
+  // set the offset to the correct value (EOF)
+  try {
+    seekToEnd();
+  } catch (TException &te) {
+    // deliberately ignored: continue with whatever offset was reached
+  }
+
+  // throw away any partial events
+  offset_ += readState_.lastDispatchPtr_;
+  ftruncate(fd_, offset_);
+  readState_.resetAllValues();
+
+  // Figure out the next time by which a flush must take place
+
+  struct timespec ts_next_flush;
+  getNextFlushTime(&ts_next_flush);
+  uint32_t unflushed = 0;
+
+  while(1) {
+    // this will only be true when the destructor is being invoked
+    if(closing_) {
+      // empty out both the buffers
+      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+        // just be safe and sync to disk; this has to happen while the
+        // descriptor is still open
+        fsync(fd_);
+        if (-1 == ::close(fd_)) {
+          int errno_copy = errno;
+          GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
+          throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
+        }
+        fd_ = 0;
+        pthread_exit(NULL);
+        return;
+      }
+    }
+
+    if (swapEventBuffers(&ts_next_flush)) {
+      eventInfo* outEvent;
+      while (NULL != (outEvent = dequeueBuffer_->getNext())) {
+        // sanity check on event
+        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+          T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+          continue;
+        }
+
+        // If chunking is required, then make sure that msg does not cross chunk boundary
+        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+
+          // event size must be less than chunk size
+          if(outEvent->eventSize_ > chunkSize_) {
+            T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
+                  outEvent->eventSize_, chunkSize_);
+            continue;
+          }
+
+          int64_t chunk1 = offset_/chunkSize_;
+          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+
+          // if adding this event will cross a chunk boundary, pad the chunk with zeros
+          if (chunk1 != chunk2) {
+            // refetch the offset to keep in sync
+            offset_ = lseek(fd_, 0, SEEK_CUR);
+            int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
+
+            uint8_t zeros[padding];
+            bzero(zeros, padding);
+            if (-1 == ::write(fd_, zeros, padding)) {
+              int errno_copy = errno;
+              GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
+              throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);
+            }
+            unflushed += padding;
+            offset_ += padding;
+          }
+        }
+
+        // write the dequeued event to the file
+        if (outEvent->eventSize_ > 0) {
+          if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+            int errno_copy = errno;
+            GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
+            throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);
+          }
+
+          unflushed += outEvent->eventSize_;
+          offset_ += outEvent->eventSize_;
+        }
+      }
+      // releases the events that were just written
+      dequeueBuffer_->reset();
+    }
+
+    bool flushTimeElapsed = false;
+    struct timespec current_time;
+    clock_gettime(CLOCK_REALTIME, &current_time);
+
+    if (current_time.tv_sec > ts_next_flush.tv_sec ||
+        (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+      flushTimeElapsed = true;
+      getNextFlushTime(&ts_next_flush);
+    }
+
+    // couple of cases from which a flush could be triggered
+    if ((flushTimeElapsed && unflushed > 0) ||
+       unflushed > flushMaxBytes_ ||
+       forceFlush_) {
+
+      // sync (force flush) file to disk
+      fsync(fd_);
+      unflushed = 0;
+
+      // notify anybody waiting for flush completion
+      forceFlush_ = false;
+      pthread_cond_broadcast(&flushed_);
+    }
+  }
+}
+
+/**
+ * Requests a flush and blocks until forceFlush_ has been cleared again.
+ * Returns immediately when no writer thread has been started.
+ */
+void TFileTransport::flush() {
+  // file must be open for writing for any flushing to take place
+  if (writerThreadId_ <= 0) {
+    return;
+  }
+
+  pthread_mutex_lock(&mutex_);
+
+  // Raise the request, then sleep until it has been cleared. The flag is
+  // re-checked after every wake-up.
+  forceFlush_ = true;
+  do {
+    pthread_cond_wait(&flushed_, &mutex_);
+  } while (forceFlush_);
+
+  pthread_mutex_unlock(&mutex_);
+}
+
+
+/**
+ * Reads exactly `len` bytes by calling read() repeatedly.
+ *
+ * @return `len`
+ * @throws TEOFException as soon as read() returns 0
+ */
+uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
+  uint32_t filled = 0;
+
+  while (filled < len) {
+    const uint32_t got = read(buf + filled, len - filled);
+    // read() returns an unsigned count, so "nothing read" is exactly 0
+    if (got == 0) {
+      throw TEOFException();
+    }
+    filled += got;
+  }
+
+  return filled;
+}
+
+/**
+ * Copies up to `len` bytes of the current event into `buf`, fetching the
+ * next event with readEvent() when none is in progress. A call never
+ * crosses an event boundary.
+ *
+ * @return bytes copied; 0 when readEvent() produced no event
+ */
+uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
+  // Fetch an event if none is pending; readEvent() may return nothing
+  // (timeout expired or some other error).
+  if (!currentEvent_) {
+    currentEvent_ = readEvent();
+    if (!currentEvent_) {
+      return 0;
+    }
+  }
+
+  const int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
+
+  if (remaining > (int32_t)len) {
+    // Caller wants less than what is left: hand out `len` bytes and keep
+    // the event for the next call.
+    memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
+    currentEvent_->eventBuffPos_ += len;
+    return len;
+  }
+
+  // The rest of the event fits: copy it and retire the event.
+  if (remaining > 0) {
+    memcpy(buf,
+           currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
+           remaining);
+  }
+  delete(currentEvent_);
+  currentEvent_ = NULL;
+  return remaining;
+}
+
+/**
+ * Reads the next complete event from the file.
+ *
+ * The file is consumed through readBuff_ (allocated on first use). Each
+ * event is a 4-byte size followed by that many payload bytes; the parsing
+ * position is kept in readState_ so an event may span several buffer
+ * refills.
+ *
+ * At end of file the behaviour depends on readTimeout_:
+ *  - TAIL_READ_TIMEOUT: sleep eofSleepTime_ and retry indefinitely
+ *  - NO_TAIL_READ_TIMEOUT: return NULL
+ *  - a positive value: sleep once for readTimeout_ * 1000 (usleep), then
+ *    return NULL if the file is still at EOF
+ *
+ * @return a newly allocated event with eventBuffPos_ reset to 0, or NULL
+ *         as described above
+ * @throws TTransportException when ::read() fails
+ */
+eventInfo* TFileTransport::readEvent() {
+ int readTries = 0;
+
+ // the read buffer is created lazily on the first call
+ if (!readBuff_) {
+ readBuff_ = new uint8_t[readBuffSize_];
+ }
+
+ while (1) {
+ // read from the file if read buffer is exhausted
+ if (readState_.bufferPtr_ == readState_.bufferLen_) {
+ // advance the offset pointer
+ offset_ += readState_.bufferLen_;
+ readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
+ // if (readState_.bufferLen_) {
+ // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+ // }
+ readState_.bufferPtr_ = 0;
+ readState_.lastDispatchPtr_ = 0;
+
+ // read error
+ if (readState_.bufferLen_ == -1) {
+ readState_.resetAllValues();
+ GlobalOutput("TFileTransport: error while reading from file");
+ throw TTransportException("TFileTransport: error while reading from file");
+ } else if (readState_.bufferLen_ == 0) { // EOF
+ // wait indefinitely if there is no timeout
+ if (readTimeout_ == TAIL_READ_TIMEOUT) {
+ usleep(eofSleepTime_);
+ continue;
+ } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
+ // reset state
+ readState_.resetState(0);
+ return NULL;
+ } else if (readTimeout_ > 0) {
+ // timeout already expired once
+ if (readTries > 0) {
+ readState_.resetState(0);
+ return NULL;
+ } else {
+ usleep(readTimeout_ * 1000);
+ readTries++;
+ continue;
+ }
+ }
+ }
+ }
+
+ // reaching this point resets the EOF retry counter
+ readTries = 0;
+
+ // attempt to read an event from the buffer
+ while(readState_.bufferPtr_ < readState_.bufferLen_) {
+ if (readState_.readingSize_) {
+ // a size field that would straddle a chunk boundary is not accepted:
+ // skip forward one byte at a time until it no longer does
+ if(readState_.eventSizeBuffPos_ == 0) {
+ if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
+ ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
+ // skip one byte towards chunk boundary
+ // T_DEBUG_L(1, "Skipping a byte");
+ readState_.bufferPtr_++;
+ continue;
+ }
+ }
+
+ // collect the size field one byte at a time
+ readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+ readBuff_[readState_.bufferPtr_++];
+ if (readState_.eventSizeBuffPos_ == 4) {
+ // 0 length event indicates padding
+ if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
+ // T_DEBUG_L(1, "Got padding");
+ readState_.resetState(readState_.lastDispatchPtr_);
+ continue;
+ }
+ // got a valid event
+ readState_.readingSize_ = false;
+ if (readState_.event_) {
+ delete(readState_.event_);
+ }
+ readState_.event_ = new eventInfo();
+ readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
+
+ // check if the event is corrupted and perform recovery if required
+ if (isEventCorrupted()) {
+ performRecovery();
+ // start from the top
+ break;
+ }
+ }
+ } else {
+ // payload phase: the event's buffer is allocated on first entry
+ if (!readState_.event_->eventBuff_) {
+ readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
+ readState_.event_->eventBuffPos_ = 0;
+ }
+ // take either the entire event or the remaining bytes in the buffer
+ int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
+ readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
+
+ // copy data from read buffer into event buffer
+ memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
+ readBuff_ + readState_.bufferPtr_,
+ reclaimBuffer);
+
+ // increment position ptrs
+ readState_.event_->eventBuffPos_ += reclaimBuffer;
+ readState_.bufferPtr_ += reclaimBuffer;
+
+ // check if the event has been read in full
+ if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
+ // set the completed event to the current event
+ eventInfo* completeEvent = readState_.event_;
+ completeEvent->eventBuffPos_ = 0;
+
+ // detach the event from the read state before returning it
+ readState_.event_ = NULL;
+ readState_.resetState(readState_.bufferPtr_);
+
+ // exit criteria
+ return completeEvent;
+ }
+ }
+ }
+
+ }
+}
+
+/**
+ * Validates the size of the event currently being read (readState_.event_).
+ *
+ * @return true when the size exceeds maxEventSize_ (if that limit is set),
+ *         exceeds chunkSize_, or implies the event crosses a chunk
+ *         boundary; false otherwise. Each failure is logged.
+ */
+bool TFileTransport::isEventCorrupted() {
+  const uint32_t eventSize = readState_.event_->eventSize_;
+
+  // 1. Event size is larger than the user-specified max-event size
+  if ((maxEventSize_ > 0) && (eventSize > maxEventSize_)) {
+    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
+            eventSize, maxEventSize_);
+    return true;
+  }
+
+  // 2. Event size is larger than chunk size
+  if (eventSize > chunkSize_) {
+    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
+            eventSize, chunkSize_);
+    return true;
+  }
+
+  // 3. size indicates that event crosses chunk boundary: the chunk holding
+  //    the size field must also hold the last payload byte
+  if ( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
+       ((offset_ + readState_.bufferPtr_ + eventSize - 1)/chunkSize_) ) {
+    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",
+            eventSize, offset_ + readState_.bufferPtr_ + 4);
+    return true;
+  }
+
+  return false;
+}
+
+/**
+ * Recovers from a corrupted event.
+ *
+ * While fewer than maxCorruptedEvents_ corrupt events have been seen in the
+ * current chunk, the chunk is re-read from its start. After that the reader
+ * skips to the next chunk; if the current chunk is the last one it either
+ * waits for more data (when tailing) or gives up.
+ *
+ * @throws TTransportException when the last chunk is corrupt and the
+ *         transport is not tailing the file
+ */
+void TFileTransport::performRecovery() {
+  // perform some kickass recovery
+  uint32_t curChunk = getCurChunk();
+  if (lastBadChunk_ == curChunk) {
+    numCorruptedEventsInChunk_++;
+  } else {
+    lastBadChunk_ = curChunk;
+    numCorruptedEventsInChunk_ = 1;
+  }
+
+  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
+    // maybe there was an error in reading the file from disk
+    // seek to the beginning of chunk and try again
+    seekToChunk(curChunk);
+  } else {
+
+    // just skip ahead to the next chunk if we not already at the last chunk
+    if (curChunk != (getNumChunks() - 1)) {
+      seekToChunk(curChunk + 1);
+    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
+      // if tailing the file, wait until there is enough data to start
+      // the next chunk; use the member rather than the compile-time default
+      // (the constructor initializes it to DEFAULT_CORRUPTED_SLEEP_TIME_US)
+      while(curChunk == (getNumChunks() - 1)) {
+        usleep(corruptedEventSleepTime_);
+      }
+      seekToChunk(curChunk + 1);
+    } else {
+      // pretty hosed at this stage, rewind the file back to the last successful
+      // point and punt on the error
+      readState_.resetState(readState_.lastDispatchPtr_);
+      currentEvent_ = NULL;
+      char errorMsg[1024];
+      // bounded formatting; the cast makes the argument match %lu
+      snprintf(errorMsg, sizeof(errorMsg), "TFileTransport: log file corrupted at offset: %lu",
+              (unsigned long)(offset_ + readState_.lastDispatchPtr_));
+      GlobalOutput(errorMsg);
+      throw TTransportException(errorMsg);
+    }
+  }
+
+}
+
+/**
+ * Positions the reader at the start of a chunk.
+ *
+ * @param chunk chunk index; a negative value counts from the end, and a
+ *              value at or past the number of chunks seeks to the last
+ *              chunk and then reads forward to the end-of-file offset
+ *              observed at the time of the call
+ * @throws TTransportException when the file is not open or lseek() fails
+ */
+void TFileTransport::seekToChunk(int32_t chunk) {
+  if (fd_ <= 0) {
+    throw TTransportException("File not open");
+  }
+
+  int32_t numChunks = getNumChunks();
+
+  // file is empty, seeking to chunk is pointless
+  if (numChunks == 0) {
+    return;
+  }
+
+  // negative indicates reverse seek (from the end)
+  if (chunk < 0) {
+    chunk += numChunks;
+  }
+
+  // too large a value for reverse seek, just seek to beginning
+  if (chunk < 0) {
+    T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk);
+    chunk = 0;
+  }
+
+  // cannot seek past EOF
+  bool seekToEnd = false;
+  // off_t rather than uint32_t so the offset is not truncated to 32 bits
+  off_t minEndOffset = 0;
+  if (chunk >= numChunks) {
+    T_DEBUG("Trying to seek past EOF. Seeking to EOF instead...");
+    seekToEnd = true;
+    chunk = numChunks - 1;
+    // this is the min offset to process events till
+    minEndOffset = lseek(fd_, 0, SEEK_END);
+  }
+
+  off_t newOffset = off_t(chunk) * chunkSize_;
+  offset_ = lseek(fd_, newOffset, SEEK_SET);
+  readState_.resetAllValues();
+  // A partially consumed event belongs to the old position; free it instead
+  // of just dropping the pointer.
+  delete currentEvent_;
+  currentEvent_ = NULL;
+  if (offset_ == -1) {
+    GlobalOutput("TFileTransport: lseek error in seekToChunk");
+    throw TTransportException("TFileTransport: lseek error in seekToChunk");
+  }
+
+  // seek to EOF if user wanted to go to last chunk
+  if (seekToEnd) {
+    uint32_t oldReadTimeout = getReadTimeout();
+    setReadTimeout(NO_TAIL_READ_TIMEOUT);
+    // keep on reading until the last event at point of seekChunk call;
+    // every event returned by readEvent() is owned here and must be freed
+    eventInfo* skipped = NULL;
+    while ((skipped = readEvent()) != NULL) {
+      delete skipped;
+      if (!((offset_ + readState_.bufferPtr_) < minEndOffset)) {
+        break;
+      }
+    }
+    setReadTimeout(oldReadTimeout);
+  }
+
+}
+
+/**
+ * Seeks to the end of the file by asking seekToChunk() for the chunk index
+ * one past the last existing chunk.
+ */
+void TFileTransport::seekToEnd() {
+  const uint32_t onePastLastChunk = getNumChunks();
+  seekToChunk(onePastLastChunk);
+}
+
+/**
+ * Reports how many chunks the file currently spans, based on fstat().
+ *
+ * @return 0 when the file is not open or is empty, otherwise
+ *         (size / chunkSize_) + 1
+ * @throws TTransportException (UNKNOWN, carrying errno) when fstat() fails
+ */
+uint32_t TFileTransport::getNumChunks() {
+  if (fd_ <= 0) {
+    return 0;
+  }
+
+  struct stat fileInfo;
+  if (fstat(fd_, &fileInfo) < 0) {
+    const int savedErrno = errno;
+    throw TTransportException(TTransportException::UNKNOWN,
+                              "TFileTransport::getNumChunks() (fstat)",
+                              savedErrno);
+  }
+
+  // empty file has no chunks
+  if (!(fileInfo.st_size > 0)) {
+    return 0;
+  }
+
+  return ((fileInfo.st_size)/chunkSize_) + 1;
+}
+
+/**
+ * @return index of the chunk containing offset_ (offset_ / chunkSize_)
+ */
+uint32_t TFileTransport::getCurChunk() {
+  const uint32_t chunkIndex = offset_/chunkSize_;
+  return chunkIndex;
+}
+
+// Utility Functions
+/**
+ * Opens filename_ and resets offset_ to 0.
+ *
+ * Read-only transports open with O_RDONLY; otherwise the file is opened
+ * read/write in append mode and created if missing.
+ *
+ * @throws TTransportException (NOT_OPEN, carrying errno) when ::open() fails
+ */
+void TFileTransport::openLogFile() {
+  // Start from the read-only settings and widen them for writers.
+  mode_t mode = S_IRUSR | S_IRGRP | S_IROTH;
+  int flags = O_RDONLY;
+  if (!readOnly_) {
+    mode = S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
+    flags = O_RDWR | O_CREAT | O_APPEND;
+  }
+
+  fd_ = ::open(filename_.c_str(), flags, mode);
+  offset_ = 0;
+
+  // make sure open call was successful
+  if (fd_ != -1) {
+    return;
+  }
+
+  const int savedErrno = errno;
+  GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, savedErrno);
+  throw TTransportException(TTransportException::NOT_OPEN, filename_, savedErrno);
+}
+
+/**
+ * Computes the absolute time of the next flush: now + flushMaxUs_
+ * microseconds.
+ *
+ * @param ts_next_flush receives the deadline; tv_nsec is normalized to stay
+ *                      below one second
+ */
+void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
+  clock_gettime(CLOCK_REALTIME, ts_next_flush);
+  // add the sub-second part of the interval, in nanoseconds
+  ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
+  // carry into seconds; >= because tv_nsec must stay below 1000000000
+  if (ts_next_flush->tv_nsec >= 1000000000) {
+    ts_next_flush->tv_nsec -= 1000000000;
+    ts_next_flush->tv_sec += 1;
+  }
+  // add the whole-second part of the interval
+  ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
+}
+
+/**
+ * Creates an empty buffer, in WRITE mode, with room for `size` event
+ * pointers.
+ */
+TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
+  : bufferMode_(WRITE),
+    writePoint_(0),
+    readPoint_(0),
+    size_(size)
+{
+  // slot array only; the slots are filled by addEvent()
+  buffer_ = new eventInfo*[size];
+}
+
+/**
+ * Deletes every event stored so far (slots below writePoint_) and then the
+ * slot array itself.
+ */
+TFileTransportBuffer::~TFileTransportBuffer() {
+  if (!buffer_) {
+    return;
+  }
+
+  for (uint32_t slot = 0; slot != writePoint_; ++slot) {
+    delete buffer_[slot];
+  }
+  delete[] buffer_;
+  buffer_ = NULL;
+}
+
+/**
+ * Appends an event pointer to the buffer.
+ *
+ * A message is emitted when the buffer is in READ mode, but the event is
+ * still stored.
+ *
+ * @return true when stored, false when the buffer is full
+ */
+bool TFileTransportBuffer::addEvent(eventInfo *event) {
+  if (bufferMode_ == READ) {
+    GlobalOutput("Trying to write to a buffer in read mode");
+  }
+
+  // buffer is full
+  if (!(writePoint_ < size_)) {
+    return false;
+  }
+
+  buffer_[writePoint_] = event;
+  ++writePoint_;
+  return true;
+}
+
+/**
+ * Returns the next unread event and advances the read position; switches
+ * the buffer from WRITE to READ mode on first use.
+ *
+ * @return the next event, or NULL when every stored event has been read
+ */
+eventInfo* TFileTransportBuffer::getNext() {
+  if (bufferMode_ == WRITE) {
+    bufferMode_ = READ;
+  }
+
+  // no more entries
+  if (readPoint_ >= writePoint_) {
+    return NULL;
+  }
+
+  eventInfo* next = buffer_[readPoint_];
+  ++readPoint_;
+  return next;
+}
+
+/**
+ * Deletes all stored events and returns the buffer to an empty WRITE-mode
+ * state. A debug message is emitted when unread entries are discarded.
+ */
+void TFileTransportBuffer::reset() {
+  const bool hasUnread = (bufferMode_ == WRITE) || (writePoint_ > readPoint_);
+  if (hasUnread) {
+    T_DEBUG("Resetting a buffer with unread entries");
+  }
+
+  // Clean up the old entries
+  for (uint32_t slot = 0; slot != writePoint_; ++slot) {
+    delete buffer_[slot];
+  }
+
+  readPoint_ = 0;
+  writePoint_ = 0;
+  bufferMode_ = WRITE;
+}
+
+/** @return true when every slot has been written (writePoint_ == size_) */
+bool TFileTransportBuffer::isFull() {
+  const bool full = (size_ == writePoint_);
+  return full;
+}
+
+/** @return true when nothing has been written since construction or reset() */
+bool TFileTransportBuffer::isEmpty() {
+  const bool empty = (0 == writePoint_);
+  return empty;
+}
+
+/**
+ * Builds a processor that uses one protocol factory for both input and
+ * output and discards output through a TNullTransport.
+ */
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport)
+  : processor_(processor)
+  , inputProtocolFactory_(protocolFactory)
+  , outputProtocolFactory_(protocolFactory)
+  , inputTransport_(inputTransport)
+{
+  // default the output transport to a null transport (common case)
+  shared_ptr<TNullTransport> nullSink(new TNullTransport());
+  outputTransport_ = nullSink;
+}
+
+/**
+ * Builds a processor with separate input and output protocol factories;
+ * output is discarded through a TNullTransport.
+ */
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> inputProtocolFactory,
+                               shared_ptr<TProtocolFactory> outputProtocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport)
+  : processor_(processor)
+  , inputProtocolFactory_(inputProtocolFactory)
+  , outputProtocolFactory_(outputProtocolFactory)
+  , inputTransport_(inputTransport)
+{
+  // default the output transport to a null transport (common case)
+  shared_ptr<TNullTransport> nullSink(new TNullTransport());
+  outputTransport_ = nullSink;
+}
+
+/**
+ * Builds a processor that uses one protocol factory for both directions and
+ * writes output to the supplied transport.
+ */
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+                               shared_ptr<TProtocolFactory> protocolFactory,
+                               shared_ptr<TFileReaderTransport> inputTransport,
+                               shared_ptr<TTransport> outputTransport)
+  : processor_(processor)
+  , inputProtocolFactory_(protocolFactory)
+  , outputProtocolFactory_(protocolFactory)
+  , inputTransport_(inputTransport)
+  , outputTransport_(outputTransport)
+{
+}
+
+/**
+ * Processes events from the input transport.
+ *
+ * @param numEvents stop after this many events; 0 means no limit
+ * @param tail      when true, switch the input transport to
+ *                  TAIL_READ_TIMEOUT for the duration of the call and keep
+ *                  going after a TEOFException
+ *
+ * Any other TException is written to cerr and ends processing. The
+ * transport's previous read timeout is restored on every exit from the
+ * loop, including when the event limit is reached.
+ */
+void TFileProcessor::process(uint32_t numEvents, bool tail) {
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
+
+  // save old read timeout so it can be restored
+  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
+  if (tail) {
+    // switch to the tailing read timeout if tailing is required
+    inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
+  }
+
+  uint32_t numProcessed = 0;
+  while(1) {
+    // bad form to use exceptions for flow control but there is really
+    // no other way around it
+    try {
+      processor_->process(inputProtocol, outputProtocol);
+      numProcessed++;
+      if ( (numEvents > 0) && (numProcessed == numEvents)) {
+        // break (not return) so the read timeout below is still restored
+        break;
+      }
+    } catch (TEOFException& teof) {
+      if (!tail) {
+        break;
+      }
+    } catch (TException &te) {
+      cerr << te.what() << endl;
+      break;
+    }
+  }
+
+  // restore old read timeout
+  if (tail) {
+    inputTransport_->setReadTimeout(oldReadTimeout);
+  }
+
+}
+
+/**
+ * Processes events until the input transport reports a different current
+ * chunk than the one it was in on entry, end of file is reached, or an
+ * error occurs (the error text is written to cerr).
+ */
+void TFileProcessor::processChunk() {
+  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
+
+  const uint32_t startChunk = inputTransport_->getCurChunk();
+
+  // Exceptions double as loop exits here: EOF and errors both end the run.
+  bool keepGoing = true;
+  while (keepGoing) {
+    try {
+      processor_->process(inputProtocol, outputProtocol);
+      keepGoing = (startChunk == inputTransport_->getCurChunk());
+    } catch (TEOFException& teof) {
+      keepGoing = false;
+    } catch (TException &te) {
+      cerr << te.what() << endl;
+      keepGoing = false;
+    }
+  }
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
new file mode 100644
index 0000000..fbaf2cd
--- /dev/null
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+#include "TProcessor.h"
+
+#include <string>
+#include <stdio.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace transport {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TProtocolFactory;
+
+// Data pertaining to a single event
+typedef struct eventInfo {
+  uint8_t* eventBuff_;
+  uint32_t eventSize_;
+  uint32_t eventBuffPos_;
+
+  // starts out empty: no buffer, zero size, zero position
+  eventInfo()
+    : eventBuff_(NULL),
+      eventSize_(0),
+      eventBuffPos_(0) {}
+
+  // releases eventBuff_ with delete[]; deleting a null pointer is a no-op
+  ~eventInfo() {
+    delete[] eventBuff_;
+  }
+} eventInfo;
+
+// information about current read state
+typedef struct readState {
+  eventInfo* event_;
+
+  // keep track of event size
+  uint8_t eventSizeBuff_[4];
+  uint8_t eventSizeBuffPos_;
+  bool readingSize_;
+
+  // read buffer variables
+  int32_t bufferPtr_;
+  int32_t bufferLen_;
+
+  // last successful dispatch point
+  int32_t lastDispatchPtr_;
+
+  // go back to "reading the size field" and record the given dispatch point
+  void resetState(uint32_t lastDispatchPtr) {
+    lastDispatchPtr_ = lastDispatchPtr;
+    eventSizeBuffPos_ = 0;
+    readingSize_ = true;
+  }
+
+  // full reset: parser state, buffer cursor/length, and any pending event
+  void resetAllValues() {
+    resetState(0);
+    bufferLen_ = 0;
+    bufferPtr_ = 0;
+    delete event_;   // no-op when there is no pending event
+    event_ = 0;
+  }
+
+  // same end state as resetAllValues() on a state without a pending event
+  readState()
+    : event_(0),
+      eventSizeBuffPos_(0),
+      readingSize_(true),
+      bufferPtr_(0),
+      bufferLen_(0),
+      lastDispatchPtr_(0) {}
+
+  ~readState() {
+    delete event_;
+  }
+
+} readState;
+
+/**
+ * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
+ * to be written to disk. Should be used in the following way:
+ * 1) Buffer created
+ * 2) Buffer written to (addEvent)
+ * 3) Buffer read from (getNext)
+ * 4) Buffer reset (reset)
+ * 5) Go back to 2, or destroy buffer
+ *
+ * The buffer should never be written to after it is read from, unless it is reset first.
+ * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
+ * which uses the buffer in this way.
+ *
+ * Only the declaration lives here; the member functions are defined in the
+ * corresponding .cpp file.
+ */
+class TFileTransportBuffer {
+ public:
+ TFileTransportBuffer(uint32_t size); // NOTE(review): size is presumably the number of event slots - confirm in the .cpp
+ ~TFileTransportBuffer();
+
+ bool addEvent(eventInfo *event); // step 2 of the usage protocol above
+ eventInfo* getNext(); // step 3 of the usage protocol above
+ void reset(); // step 4 of the usage protocol above
+ bool isFull();
+ bool isEmpty();
+
+ private:
+ TFileTransportBuffer(); // should not be used
+
+ enum mode {
+ WRITE,
+ READ
+ };
+ mode bufferMode_; // NOTE(review): presumably tracks the write-then-read phase described above - confirm
+
+ uint32_t writePoint_;
+ uint32_t readPoint_;
+ uint32_t size_;
+ eventInfo** buffer_; // array of event pointers
+};
+
+/**
+ * Abstract interface for transports used to read files
+ *
+ * Adds pure-virtual accessors for a read timeout, for the number of chunks
+ * and the current chunk, and for seeking to a chunk or to the end.
+ * TTransport is inherited virtually; TFileTransport implements this
+ * interface together with TFileWriterTransport.
+ */
+class TFileReaderTransport : virtual public TTransport {
+ public:
+ virtual int32_t getReadTimeout() = 0;
+ virtual void setReadTimeout(int32_t readTimeout) = 0; // NOTE(review): units look like milliseconds (cf. TFileTransport::DEFAULT_READ_TIMEOUT_MS) - confirm
+
+ virtual uint32_t getNumChunks() = 0;
+ virtual uint32_t getCurChunk() = 0;
+ virtual void seekToChunk(int32_t chunk) = 0; // signed, unlike getCurChunk(); NOTE(review): negative values presumably have a meaning - confirm
+ virtual void seekToEnd() = 0;
+};
+
+/**
+ * Abstract interface for transports used to write files
+ *
+ * Adds a pure-virtual getter/setter pair for the chunk size. TTransport is
+ * inherited virtually; TFileTransport implements this interface together
+ * with TFileReaderTransport.
+ */
+class TFileWriterTransport : virtual public TTransport {
+ public:
+ virtual uint32_t getChunkSize() = 0;
+ virtual void setChunkSize(uint32_t chunkSize) = 0;
+};
+
+/**
+ * File implementation of a transport. Reads and writes are done to a
+ * file on disk.
+ *
+ * Implements both TFileReaderTransport and TFileWriterTransport. The inline
+ * setters below expose the tunables (read buffer size, read timeout, chunk
+ * size, event buffer size, flush limits, max event size, max corrupted
+ * events, EOF sleep time); several of them ignore a value of 0 and keep the
+ * previous setting. Everything not defined inline is implemented in the
+ * corresponding .cpp file.
+ */
+class TFileTransport : public TFileReaderTransport,
+ public TFileWriterTransport {
+ public:
+ TFileTransport(std::string path, bool readOnly=false);
+ ~TFileTransport();
+
+ // TODO: what is the correct behaviour for this?
+ // the log file is generally always open
+ bool isOpen() {
+ return true;
+ }
+
+ void write(const uint8_t* buf, uint32_t len);
+ void flush();
+
+ uint32_t readAll(uint8_t* buf, uint32_t len);
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ // log-file specific functions
+ void seekToChunk(int32_t chunk);
+ void seekToEnd();
+ uint32_t getNumChunks();
+ uint32_t getCurChunk();
+
+ // for changing the output file
+ void resetOutputFile(int fd, std::string filename, int64_t offset);
+
+ // Setter/Getter functions for user-controllable options
+ void setReadBuffSize(uint32_t readBuffSize) {
+ if (readBuffSize) { // 0 is ignored: the current size is kept
+ readBuffSize_ = readBuffSize;
+ }
+ }
+ uint32_t getReadBuffSize() {
+ return readBuffSize_;
+ }
+
+ static const int32_t TAIL_READ_TIMEOUT = -1; // installed by TFileProcessor::process() while tailing
+ static const int32_t NO_TAIL_READ_TIMEOUT = 0;
+ void setReadTimeout(int32_t readTimeout) {
+ readTimeout_ = readTimeout;
+ }
+ int32_t getReadTimeout() {
+ return readTimeout_;
+ }
+
+ void setChunkSize(uint32_t chunkSize) {
+ if (chunkSize) { // 0 is ignored: the current chunk size is kept
+ chunkSize_ = chunkSize;
+ }
+ }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
+
+ void setEventBufferSize(uint32_t bufferSize) {
+ if (bufferAndThreadInitialized_) { // too late: only logs a message and leaves the size unchanged
+ GlobalOutput("Cannot change the buffer size after writer thread started");
+ return;
+ }
+ eventBufferSize_ = bufferSize;
+ }
+
+ uint32_t getEventBufferSize() {
+ return eventBufferSize_;
+ }
+
+ void setFlushMaxUs(uint32_t flushMaxUs) {
+ if (flushMaxUs) { // 0 is ignored: the current limit is kept
+ flushMaxUs_ = flushMaxUs;
+ }
+ }
+ uint32_t getFlushMaxUs() {
+ return flushMaxUs_;
+ }
+
+ void setFlushMaxBytes(uint32_t flushMaxBytes) {
+ if (flushMaxBytes) { // 0 is ignored: the current limit is kept
+ flushMaxBytes_ = flushMaxBytes;
+ }
+ }
+ uint32_t getFlushMaxBytes() {
+ return flushMaxBytes_;
+ }
+
+ void setMaxEventSize(uint32_t maxEventSize) {
+ maxEventSize_ = maxEventSize; // unlike the setters above, 0 is accepted here
+ }
+ uint32_t getMaxEventSize() {
+ return maxEventSize_;
+ }
+
+ void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+ maxCorruptedEvents_ = maxCorruptedEvents; // 0 is accepted here as well
+ }
+ uint32_t getMaxCorruptedEvents() {
+ return maxCorruptedEvents_;
+ }
+
+ void setEofSleepTimeUs(uint32_t eofSleepTime) {
+ if (eofSleepTime) { // 0 is ignored: the current sleep time is kept
+ eofSleepTime_ = eofSleepTime;
+ }
+ }
+ uint32_t getEofSleepTimeUs() {
+ return eofSleepTime_;
+ }
+
+ private:
+ // helper functions for writing to a file
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+ bool swapEventBuffers(struct timespec* deadline);
+ bool initBufferAndWriteThread();
+
+ // control for writer thread
+ // static trampoline: ptr is the TFileTransport whose writerThread() is run
+ // NOTE(review): presumably handed to pthread_create in the .cpp - confirm
+ static void* startWriterThread(void* ptr) {
+ (((TFileTransport*)ptr)->writerThread());
+ return 0;
+ }
+ void writerThread();
+
+ // helper functions for reading from a file
+ eventInfo* readEvent();
+
+ // event corruption-related functions
+ bool isEventCorrupted();
+ void performRecovery();
+
+ // Utility functions
+ void openLogFile();
+ void getNextFlushTime(struct timespec* ts_next_flush);
+
+ // Class variables
+ readState readState_;
+ uint8_t* readBuff_;
+ eventInfo* currentEvent_;
+
+ uint32_t readBuffSize_;
+ static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+ int32_t readTimeout_;
+ static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
+
+ // size of chunks that file will be split up into
+ uint32_t chunkSize_;
+ static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+ // size of event buffers
+ uint32_t eventBufferSize_;
+ static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
+
+ // max number of microseconds that can pass without flushing
+ uint32_t flushMaxUs_;
+ static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
+
+ // max number of bytes that can be written without flushing
+ uint32_t flushMaxBytes_;
+ static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
+
+ // max event size
+ uint32_t maxEventSize_;
+ static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+ // max number of corrupted events per chunk
+ uint32_t maxCorruptedEvents_;
+ static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+
+ // sleep duration when EOF is hit
+ uint32_t eofSleepTime_;
+ static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
+ // sleep duration when a corrupted event is encountered
+ uint32_t corruptedEventSleepTime_;
+ static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
+
+ // writer thread id
+ pthread_t writerThreadId_;
+
+ // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
+ // needs to be written to the file. The buffers are swapped by the writer thread.
+ TFileTransportBuffer *dequeueBuffer_;
+ TFileTransportBuffer *enqueueBuffer_;
+
+ // conditions used to block when the buffer is full or empty
+ pthread_cond_t notFull_, notEmpty_;
+ volatile bool closing_;
+
+ // To keep track of whether the buffer has been flushed
+ pthread_cond_t flushed_;
+ volatile bool forceFlush_;
+
+ // Mutex that is grabbed when enqueueing and swapping the read/write buffers
+ pthread_mutex_t mutex_;
+
+ // File information
+ std::string filename_;
+ int fd_;
+
+ // Whether the writer thread and buffers have been initialized
+ bool bufferAndThreadInitialized_;
+
+ // Offset within the file
+ off_t offset_;
+
+ // event corruption information
+ uint32_t lastBadChunk_;
+ uint32_t numCorruptedEventsInChunk_;
+
+ bool readOnly_;
+};
+
+// Exception thrown when EOF is hit; it is a TTransportException whose type
+// is fixed to END_OF_FILE
+class TEOFException : public TTransportException {
+ public:
+  TEOFException()
+    : TTransportException(TTransportException::END_OF_FILE) {}
+};
+
+
+/* wrapper class to process events from a file containing thrift events
+ *
+ * Holds a processor, input/output protocol factories and input/output
+ * transports; process() and processChunk() create the protocols from the
+ * factories and feed events from the input transport to the processor.
+ */
+class TFileProcessor {
+ public:
+ /**
+ * Constructor that defaults output transport to null transport
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport file transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport);
+ /**
+ * Constructor taking separate protocol factories for input and output
+ * and no output transport.
+ * NOTE(review): presumably the output transport defaults to a null
+ * transport as in the constructor above - confirm in the .cpp
+ *
+ * @param processor processes log-file events
+ * @param inputProtocolFactory protocol factory for the input side
+ * @param outputProtocolFactory protocol factory for the output side
+ * @param inputTransport file transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport);
+
+ /**
+ * Constructor
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport input file transport
+ * @param outputTransport output transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport,
+ boost::shared_ptr<TTransport> outputTransport);
+
+ /**
+ * processes events from the file
+ *
+ * @param numEvents number of events to process (0 for unlimited)
+ * @param tail tails the file if true
+ */
+ void process(uint32_t numEvents, bool tail);
+
+ /**
+ * process events until the end of the chunk
+ *
+ */
+ void processChunk();
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+ boost::shared_ptr<TFileReaderTransport> inputTransport_;
+ boost::shared_ptr<TTransport> outputTransport_;
+};
+
+
+}}} // apache::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
diff --git a/lib/cpp/src/transport/THttpClient.cpp b/lib/cpp/src/transport/THttpClient.cpp
new file mode 100644
index 0000000..59f2339
--- /dev/null
+++ b/lib/cpp/src/transport/THttpClient.cpp
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <sstream>
+
+#include "THttpClient.h"
+#include "TSocket.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+/**
+ * Http client implementation.
+ *
+ */
+
+// Yeah, yeah, hacky to put these here, I know.
+static const char* CRLF = "\r\n";
+static const int CRLF_LEN = 2;
+
+THttpClient::THttpClient(boost::shared_ptr<TTransport> transport, string host, string path) : // wraps a caller-supplied transport
+ transport_(transport),
+ host_(host),
+ path_(path),
+ readHeaders_(true), // the first read must parse response headers
+ chunked_(false),
+ chunkedDone_(false),
+ chunkSize_(0),
+ contentLength_(0),
+ httpBuf_(NULL), // allocated by init() below
+ httpPos_(0),
+ httpBufLen_(0),
+ httpBufSize_(1024) { // initial capacity of the HTTP parse buffer, in bytes
+ init();
+}
+
+/**
+ * Builds a client that owns a newly created TSocket to host:port as its
+ * underlying transport; all parsing state starts out cleared and the HTTP
+ * parse buffer is allocated by init().
+ */
+THttpClient::THttpClient(string host, int port, string path) :
+  transport_(new TSocket(host, port)),
+  host_(host),
+  path_(path),
+  readHeaders_(true),
+  chunked_(false),
+  chunkedDone_(false),
+  chunkSize_(0),
+  contentLength_(0),
+  httpBuf_(NULL),
+  httpPos_(0),
+  httpBufLen_(0),
+  httpBufSize_(1024) {
+  init();
+}
+
+/**
+ * Allocates the HTTP parse buffer (httpBufSize_ bytes plus one for the
+ * terminating NUL) and NUL-terminates it at the current length.
+ *
+ * @throws TTransportException if the allocation fails
+ */
+void THttpClient::init() {
+  httpBuf_ = static_cast<char*>(std::malloc(httpBufSize_+1));
+  if (!httpBuf_) {
+    throw TTransportException("Out of memory.");
+  }
+  httpBuf_[httpBufLen_] = '\0';
+}
+
+/**
+ * Releases the malloc'ed HTTP parse buffer.
+ */
+THttpClient::~THttpClient() {
+  // free() accepts a null pointer, so no guard is needed
+  std::free(httpBuf_);
+}
+
+/**
+ * Reads up to len bytes of response body into buf. When the read buffer is
+ * drained it is reset and refilled via readMoreData() first.
+ *
+ * @return number of bytes read, or 0 when readMoreData() produced nothing
+ */
+uint32_t THttpClient::read(uint8_t* buf, uint32_t len) {
+  const bool drained = (readBuffer_.available_read() == 0);
+  if (drained) {
+    readBuffer_.resetBuffer();
+    if (readMoreData() == 0) {
+      return 0;
+    }
+  }
+  return readBuffer_.read(buf, len);
+}
+
+/**
+ * Consumes any pending chunked data (footers etc.) of a chunked response;
+ * does nothing for non-chunked responses.
+ */
+void THttpClient::readEnd() {
+  if (!chunked_) {
+    return;
+  }
+  while (!chunkedDone_) {
+    readChunked();
+  }
+}
+
+/**
+ * Pulls more bytes from the underlying transport, parses the response
+ * headers if they are still pending, then reads either one chunk or the
+ * Content-Length body into the read buffer.
+ *
+ * @return number of body bytes made available
+ */
+uint32_t THttpClient::readMoreData() {
+  refill();
+
+  if (readHeaders_) {
+    readHeaders();
+  }
+
+  return chunked_ ? readChunked() : readContent(contentLength_);
+}
+
+/**
+ * Reads one chunk of a chunked response: the size line, then either the
+ * footers (size 0) or the chunk data followed by its trailing CRLF.
+ *
+ * @return number of data bytes read (0 for the terminating chunk)
+ */
+uint32_t THttpClient::readChunked() {
+  const uint32_t chunkSize = parseChunkSize(readLine());
+  if (chunkSize == 0) {
+    readChunkedFooters();
+    return 0;
+  }
+
+  // data content first, then the CRLF that follows it
+  const uint32_t length = readContent(chunkSize);
+  readLine();
+  return length;
+}
+
+/**
+ * Skips footer lines after the last chunk up to and including the blank
+ * line, then marks the chunked body as done.
+ */
+void THttpClient::readChunkedFooters() {
+  // a blank line (empty string from readLine) terminates the footers
+  while (*readLine() != '\0') {
+  }
+  chunkedDone_ = true;
+}
+
+/**
+ * Parses the hexadecimal size at the start of a chunk-size line. Anything
+ * from the first ';' on (chunk extensions) is cut off in place.
+ *
+ * @param line NUL-terminated chunk-size line; modified if it contains ';'
+ * @return the chunk size, or 0 if no hexadecimal number could be parsed
+ */
+uint32_t THttpClient::parseChunkSize(char* line) {
+  char* semi = strchr(line, ';');
+  if (semi != NULL) {
+    *semi = '\0';
+  }
+  // %x stores through an unsigned int*, so the target must be unsigned
+  unsigned int size = 0;
+  sscanf(line, "%x", &size);
+  return (uint32_t)size;
+}
+
+/**
+ * Moves exactly size bytes of body data from the HTTP parse buffer into the
+ * read buffer, refilling the parse buffer from the transport whenever it
+ * runs empty.
+ *
+ * @return size
+ */
+uint32_t THttpClient::readContent(uint32_t size) {
+  for (uint32_t remaining = size; remaining > 0; ) {
+    if (httpPos_ == httpBufLen_) {
+      // everything buffered has been handed out: restart at the head
+      httpPos_ = 0;
+      httpBufLen_ = 0;
+      refill();
+    }
+
+    const uint32_t avail = httpBufLen_ - httpPos_;
+    const uint32_t give = (remaining < avail) ? remaining : avail;
+    readBuffer_.write((uint8_t*)(httpBuf_+httpPos_), give);
+    httpPos_ += give;
+    remaining -= give;
+  }
+  return size;
+}
+
+/**
+ * Returns the next CRLF-terminated line from the HTTP parse buffer, reading
+ * more data from the transport until a CRLF is present. The CRLF is
+ * replaced by a NUL and the read position advanced past it.
+ *
+ * @return pointer into httpBuf_ at the start of the line
+ */
+char* THttpClient::readLine() {
+  char* eol;
+  // no CRLF yet: shift what we have to the front and fetch more
+  while ((eol = strstr(httpBuf_+httpPos_, CRLF)) == NULL) {
+    shift();
+    refill();
+  }
+
+  *eol = '\0';
+  char* line = httpBuf_+httpPos_;
+  httpPos_ = (eol-httpBuf_) + CRLF_LEN;
+  return line;
+}
+
+/**
+ * Moves the unread part of the HTTP parse buffer to its front, resets the
+ * read position to 0 and re-terminates the buffer with a NUL.
+ */
+void THttpClient::shift() {
+  const uint32_t unread = (httpBufLen_ > httpPos_) ? (httpBufLen_ - httpPos_) : 0;
+  if (unread > 0) {
+    memmove(httpBuf_, httpBuf_+httpPos_, unread);
+  }
+  httpBufLen_ = unread;
+  httpPos_ = 0;
+  httpBuf_[httpBufLen_] = '\0';
+}
+
+/**
+ * Appends data from the underlying transport to the HTTP parse buffer,
+ * doubling the buffer first when a quarter or less of it is still free.
+ *
+ * @throws TTransportException if the buffer cannot be grown or the
+ *         transport returns no data
+ */
+void THttpClient::refill() {
+  uint32_t avail = httpBufSize_ - httpBufLen_;
+  if (avail <= (httpBufSize_ / 4)) {
+    // Grow through a temporary: if realloc fails the old block is still
+    // allocated, so httpBuf_/httpBufSize_ must keep describing it (the
+    // destructor then frees it instead of leaking it).
+    uint32_t newSize = httpBufSize_ * 2;
+    char* grown = (char*)std::realloc(httpBuf_, newSize+1);
+    if (grown == NULL) {
+      throw TTransportException("Out of memory.");
+    }
+    httpBuf_ = grown;
+    httpBufSize_ = newSize;
+  }
+
+  // Read more data
+  uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufLen_), httpBufSize_-httpBufLen_);
+  httpBufLen_ += got;
+  httpBuf_[httpBufLen_] = '\0';
+
+  if (got == 0) {
+    throw TTransportException("Could not refill buffer");
+  }
+}
+
+/**
+ * Parses the response status line and headers. A blank line ends a header
+ * section; if the status parsed so far was not final (HTTP 100), another
+ * status line is expected and parsing continues. Clears readHeaders_ once
+ * the headers of the final response have been consumed.
+ */
+void THttpClient::readHeaders() {
+  // start from a clean slate for this response
+  chunkSize_ = 0;
+  chunkedDone_ = false;
+  chunked_ = false;
+  contentLength_ = 0;
+
+  bool expectStatusLine = true;
+  bool gotFinalStatus = false;
+
+  for (;;) {
+    char* line = readLine();
+
+    if (*line == '\0') {
+      if (gotFinalStatus) {
+        readHeaders_ = false;
+        return;
+      }
+      // Must have been an HTTP 100, keep going for another status line
+      expectStatusLine = true;
+      continue;
+    }
+
+    if (expectStatusLine) {
+      expectStatusLine = false;
+      gotFinalStatus = parseStatusLine(line);
+    } else {
+      parseHeader(line);
+    }
+  }
+}
+
+/**
+ * Parses an HTTP status line of the form "<version> <code> <reason>".
+ *
+ * The line is no longer modified while parsing, so the "Bad Status"
+ * message always carries the complete status line, and runs of spaces
+ * between version and code are skipped correctly.
+ *
+ * @param status NUL-terminated status line
+ * @return true for status 200 (final response), false for status 100
+ *         (continue; another status line follows)
+ * @throws TTransportException for a malformed line or any other status
+ */
+bool THttpClient::parseStatusLine(char* status) {
+  char* code = strchr(status, ' ');
+  if (code == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+
+  // skip the separator (one or more spaces) in front of the status code
+  while (*code == ' ') {
+    ++code;
+  }
+
+  // the code ends at the space that precedes the reason phrase
+  char* msg = strchr(code, ' ');
+  if (msg == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+  uint32_t codeLen = msg - code;
+
+  if (codeLen == 3 && strncmp(code, "200", 3) == 0) {
+    // HTTP 200 = OK, we got the response
+    return true;
+  } else if (codeLen == 3 && strncmp(code, "100", 3) == 0) {
+    // HTTP 100 = continue, just keep reading
+    return false;
+  } else {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+}
+
+/**
+ * Interprets one response header line. Only Transfer-Encoding (looking for
+ * "chunked") and Content-Length are acted upon; lines without a colon and
+ * all other headers are ignored.
+ *
+ * The header name must match in full: comparing only the first
+ * (colon - header) characters would also accept any prefix of the name,
+ * including an empty one.
+ * NOTE(review): the comparison is still case-sensitive although HTTP header
+ * names are case-insensitive.
+ *
+ * @param header NUL-terminated header line
+ */
+void THttpClient::parseHeader(char* header) {
+  static const char kTransferEncoding[] = "Transfer-Encoding";
+  static const char kContentLength[] = "Content-Length";
+
+  char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  uint32_t sz = colon - header;
+  char* value = colon+1;
+
+  if (sz == sizeof(kTransferEncoding) - 1 &&
+      strncmp(header, kTransferEncoding, sz) == 0) {
+    if (strstr(value, "chunked") != NULL) {
+      chunked_ = true;
+    }
+  } else if (sz == sizeof(kContentLength) - 1 &&
+             strncmp(header, kContentLength, sz) == 0) {
+    chunked_ = false;
+    contentLength_ = atoi(value);
+  }
+}
+
+void THttpClient::write(const uint8_t* buf, uint32_t len) {
+ writeBuffer_.write(buf, len); // only buffered here; flush() sends the buffer as the body of one POST
+}
+
+/**
+ * Sends everything buffered by write() as the body of a single HTTP/1.1
+ * POST request, flushes the underlying transport, then clears the write
+ * buffer and arms header parsing for the response.
+ */
+void THttpClient::flush() {
+  // body = current contents of the write buffer
+  uint8_t* body;
+  uint32_t bodyLen;
+  writeBuffer_.getBuffer(&body, &bodyLen);
+
+  // request line and headers, terminated by an empty line
+  std::ostringstream request;
+  request << "POST " << path_ << " HTTP/1.1" << CRLF;
+  request << "Host: " << host_ << CRLF;
+  request << "Content-Type: application/x-thrift" << CRLF;
+  request << "Content-Length: " << bodyLen << CRLF;
+  request << "Accept: application/x-thrift" << CRLF;
+  request << "User-Agent: C++/THttpClient" << CRLF;
+  request << CRLF;
+  const string header = request.str();
+
+  // header first, then the payload, then push it all out
+  transport_->write((const uint8_t*)header.c_str(), header.size());
+  transport_->write(body, bodyLen);
+  transport_->flush();
+
+  // ready for the next request/response cycle
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/THttpClient.h b/lib/cpp/src/transport/THttpClient.h
new file mode 100644
index 0000000..f4be4c1
--- /dev/null
+++ b/lib/cpp/src/transport/THttpClient.h
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1
+
+#include <transport/TBufferTransports.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * HTTP client implementation of the thrift transport. This was irritating
+ * to write, but the alternatives in C++ land are daunting. Linking CURL
+ * required 23 dynamic libraries last time I checked. All we have
+ * here is a VERY basic HTTP/1.1 client which supports HTTP 100 Continue,
+ * chunked transfer encoding, keepalive, etc. Tested against Apache.
+ *
+ * open(), isOpen(), peek() and close() simply forward to the wrapped
+ * transport. The remaining members are implemented in THttpClient.cpp:
+ * write() buffers data, flush() sends it as one POST request, and read()
+ * serves the response body.
+ *
+ * NOTE(review): httpBuf_ is a raw malloc'ed buffer freed in the destructor
+ * and no copy constructor/assignment is declared, so copying an instance
+ * would presumably free the buffer twice - confirm and consider making the
+ * class noncopyable.
+ */
+class THttpClient : public TTransport {
+ public:
+ THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path=""); // uses the given transport
+
+ THttpClient(std::string host, int port, std::string path=""); // creates its own TSocket to host:port
+
+ virtual ~THttpClient();
+
+ void open() {
+ transport_->open();
+ }
+
+ bool isOpen() {
+ return transport_->isOpen();
+ }
+
+ bool peek() {
+ return transport_->peek();
+ }
+
+ void close() {
+ transport_->close();
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void readEnd();
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void flush();
+
+ private:
+ void init(); // allocates httpBuf_
+
+ protected:
+
+ boost::shared_ptr<TTransport> transport_; // underlying transport carrying the HTTP traffic
+
+ TMemoryBuffer writeBuffer_; // request body collected by write() until flush()
+ TMemoryBuffer readBuffer_; // decoded response body handed out by read()
+
+ std::string host_; // value of the Host: request header
+ std::string path_; // request path of the POST line
+
+ bool readHeaders_; // true while response headers still have to be parsed
+ bool chunked_; // response uses chunked transfer encoding
+ bool chunkedDone_; // terminating chunk and its footers have been read
+ uint32_t chunkSize_;
+ uint32_t contentLength_; // value of the Content-Length response header
+
+ char* httpBuf_; // raw bytes from the transport, kept NUL-terminated
+ uint32_t httpPos_; // read position within httpBuf_
+ uint32_t httpBufLen_; // number of valid bytes in httpBuf_
+ uint32_t httpBufSize_; // capacity of httpBuf_, not counting the NUL byte
+
+ uint32_t readMoreData();
+ char* readLine();
+
+ void readHeaders();
+ void parseHeader(char* header);
+ bool parseStatusLine(char* status);
+
+ uint32_t readChunked();
+ void readChunkedFooters();
+ uint32_t parseChunkSize(char* line);
+
+ uint32_t readContent(uint32_t size);
+
+ void refill();
+ void shift();
+
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
new file mode 100644
index 0000000..9b47aa5
--- /dev/null
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstring>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#include "TSocket.h"
+#include "TServerSocket.h"
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+using boost::shared_ptr;
+
+TServerSocket::TServerSocket(int port) : // only records settings; no socket is created until listen()
+ port_(port),
+ serverSocket_(-1), // -1 = not listening (acceptImpl() rejects negative values)
+ acceptBacklog_(1024), // backlog passed to ::listen()
+ sendTimeout_(0), // 0 = not applied to accepted sockets
+ recvTimeout_(0), // 0 = not applied to accepted sockets
+ retryLimit_(0), // no extra bind() attempts
+ retryDelay_(0),
+ tcpSendBuffer_(0), // 0 = SO_SNDBUF is left untouched
+ tcpRecvBuffer_(0), // 0 = SO_RCVBUF is left untouched
+ intSock1_(-1), // interrupt socket pair, created in listen()
+ intSock2_(-1) {}
+
+TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) : // as the one-argument constructor, plus timeouts for accepted sockets
+ port_(port),
+ serverSocket_(-1), // -1 = not listening (acceptImpl() rejects negative values)
+ acceptBacklog_(1024), // backlog passed to ::listen()
+ sendTimeout_(sendTimeout), // applied to accepted sockets when > 0
+ recvTimeout_(recvTimeout), // applied to accepted sockets when > 0
+ retryLimit_(0), // no extra bind() attempts
+ retryDelay_(0),
+ tcpSendBuffer_(0), // 0 = SO_SNDBUF is left untouched
+ tcpRecvBuffer_(0), // 0 = SO_RCVBUF is left untouched
+ intSock1_(-1), // interrupt socket pair, created in listen()
+ intSock2_(-1) {}
+
+TServerSocket::~TServerSocket() {
+ close(); // closes the listening socket and the interrupt socket pair if open
+}
+
+void TServerSocket::setSendTimeout(int sendTimeout) {
+ sendTimeout_ = sendTimeout; // acceptImpl() applies it to each accepted socket when > 0
+}
+
+void TServerSocket::setRecvTimeout(int recvTimeout) {
+ recvTimeout_ = recvTimeout; // acceptImpl() applies it to each accepted socket when > 0
+}
+
+void TServerSocket::setRetryLimit(int retryLimit) {
+ retryLimit_ = retryLimit; // number of additional bind() attempts made by listen()
+}
+
+void TServerSocket::setRetryDelay(int retryDelay) {
+ retryDelay_ = retryDelay; // passed to sleep() between bind() attempts in listen()
+}
+
+void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
+ tcpSendBuffer_ = tcpSendBuffer; // listen() sets it as SO_SNDBUF when > 0
+}
+
+void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
+ tcpRecvBuffer_ = tcpRecvBuffer; // listen() sets it as SO_RCVBUF when > 0
+}
+
+/**
+ * Creates the listening socket, applies the socket options, binds it to the
+ * wildcard address on port_ and starts listening. Also creates the socket
+ * pair that interrupt() uses to wake up a blocked acceptImpl(); failure to
+ * create that pair is only logged.
+ *
+ * On every failure path close() is called so no descriptor stays open, and
+ * the getaddrinfo() result is released on every exit path.
+ *
+ * @throws TTransportException (NOT_OPEN) if address resolution, socket
+ *         creation, a mandatory socket option, fcntl(), bind() or listen()
+ *         fails
+ */
+void TServerSocket::listen() {
+  int sv[2];
+  if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+    GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno);
+    intSock1_ = -1;
+    intSock2_ = -1;
+  } else {
+    intSock1_ = sv[1];
+    intSock2_ = sv[0];
+  }
+
+  struct addrinfo hints, *res, *res0;
+  int error;
+  // large enough for any int; snprintf guards against overflow regardless
+  char port[16];
+  std::memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  snprintf(port, sizeof(port), "%d", port_);
+
+  // Wildcard address
+  error = getaddrinfo(NULL, port, &hints, &res0);
+  if (error) {
+    GlobalOutput.printf("getaddrinfo %d: %s", error, gai_strerror(error));
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
+  }
+
+  // Frees the address list on every exit path, including the throws below.
+  struct AddrInfoGuard {
+    struct addrinfo* list_;
+    explicit AddrInfoGuard(struct addrinfo* list) : list_(list) {}
+    ~AddrInfoGuard() { freeaddrinfo(list_); }
+  } addrGuard(res0);
+
+  // Pick the ipv6 address first since ipv4 addresses can be mapped
+  // into ipv6 space.
+  for (res = res0; res; res = res->ai_next) {
+    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+      break;
+  }
+
+  serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (serverSocket_ == -1) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
+  }
+
+  // Set reusaddress to prevent 2MSL delay on accept
+  int one = 1;
+  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
+                       &one, sizeof(one))) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_REUSEADDR ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR", errno_copy);
+  }
+
+  // Set TCP buffer sizes
+  if (tcpSendBuffer_ > 0) {
+    if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
+                         &tcpSendBuffer_, sizeof(tcpSendBuffer_))) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
+      close();
+      throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
+    }
+  }
+
+  if (tcpRecvBuffer_ > 0) {
+    if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
+                         &tcpRecvBuffer_, sizeof(tcpRecvBuffer_))) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
+      close();
+      throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
+    }
+  }
+
+  // Defer accept
+  // TCP_DEFER_ACCEPT is a TCP-level option: it has to be set with
+  // IPPROTO_TCP. At SOL_SOCKET level the same number names an unrelated
+  // socket option.
+  #ifdef TCP_DEFER_ACCEPT
+  if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT,
+                       &one, sizeof(one))) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
+  }
+  #endif // #ifdef TCP_DEFER_ACCEPT
+
+  #ifdef IPV6_V6ONLY
+  // Only meaningful on an IPv6 socket; on any other family the call would
+  // just fail and log a spurious error.
+  if (res->ai_family == AF_INET6) {
+    int zero = 0;
+    if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
+                         &zero, sizeof(zero))) {
+      GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno);
+    }
+  }
+  #endif // #ifdef IPV6_V6ONLY
+
+  // Turn linger off, don't want to block on calls to close
+  struct linger ling = {0, 0};
+  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
+                       &ling, sizeof(ling))) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
+  }
+
+  // TCP Nodelay, speed over bandwidth
+  if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+                       &one, sizeof(one))) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+  }
+
+  // Set NONBLOCK on the accept socket
+  int flags = fcntl(serverSocket_, F_GETFL, 0);
+  if (flags == -1) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() fcntl() F_GETFL ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+  }
+
+  if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() fcntl() O_NONBLOCK ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+  }
+
+  // prepare the port information
+  // we may want to try to bind more than once, since SO_REUSEADDR doesn't
+  // always seem to work. The client can configure the retry variables.
+  int retries = 0;
+  bool bound = false;
+  do {
+    if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
+      bound = true;
+      break;
+    }
+
+    // use short circuit evaluation here to only sleep if we need to
+  } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+
+  // throw an error if we failed to bind properly; tracking success
+  // explicitly also covers the case where an interrupted sleep() ended the
+  // retries before the limit was reached
+  if (!bound) {
+    char errbuf[1024];
+    sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+    GlobalOutput(errbuf);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
+  }
+
+  // Call listen
+  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
+  }
+
+  // The socket is now listening!
+}
+
+/**
+ * Blocks until a client connects or interrupt() is called, then returns the
+ * accepted connection as a blocking TSocket with the configured send and
+ * receive timeouts applied (when they are > 0).
+ *
+ * @return the accepted client transport
+ * @throws TTransportException NOT_OPEN if listen() has not succeeded,
+ *         INTERRUPTED if interrupt() was called, UNKNOWN if poll(),
+ *         accept() or fcntl() fails
+ */
+shared_ptr<TTransport> TServerSocket::acceptImpl() {
+  if (serverSocket_ < 0) {
+    throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
+  }
+
+  struct pollfd fds[2];
+
+  int maxEintrs = 5;
+  int numEintrs = 0;
+
+  while (true) {
+    std::memset(fds, 0 , sizeof(fds));
+    fds[0].fd = serverSocket_;
+    fds[0].events = POLLIN;
+    // poll() ignores entries whose fd is negative. Without the interrupt
+    // socket the second slot must be disabled that way; a zeroed entry
+    // would refer to descriptor 0.
+    fds[1].fd = (intSock2_ >= 0) ? intSock2_ : -1;
+    fds[1].events = POLLIN;
+    int ret = poll(fds, 2, -1);
+
+    if (ret < 0) {
+      // error cases
+      int errno_copy = errno;
+      if (errno_copy == EINTR && (numEintrs++ < maxEintrs)) {
+        // EINTR needs to be handled manually and we can tolerate
+        // a certain number
+        continue;
+      }
+      GlobalOutput.perror("TServerSocket::acceptImpl() poll() ", errno_copy);
+      throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+    } else if (ret > 0) {
+      // Check for an interrupt signal
+      if (intSock2_ >= 0 && (fds[1].revents & POLLIN)) {
+        int8_t buf;
+        if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
+          GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno);
+        }
+        throw TTransportException(TTransportException::INTERRUPTED);
+      }
+
+      // Check for the actual server socket being ready
+      if (fds[0].revents & POLLIN) {
+        break;
+      }
+    } else {
+      GlobalOutput("TServerSocket::acceptImpl() poll 0");
+      throw TTransportException(TTransportException::UNKNOWN);
+    }
+  }
+
+  struct sockaddr_storage clientAddress;
+  // accept() takes a socklen_t*, so use that type instead of casting an int
+  socklen_t size = sizeof(clientAddress);
+  int clientSocket = ::accept(serverSocket_,
+                              (struct sockaddr *) &clientAddress,
+                              &size);
+
+  if (clientSocket < 0) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
+    throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
+  }
+
+  // Make sure client socket is blocking. Until the TSocket below takes the
+  // descriptor over it has to be closed by hand on the error paths.
+  int flags = fcntl(clientSocket, F_GETFL, 0);
+  if (flags == -1) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_GETFL ", errno_copy);
+    ::close(clientSocket);
+    throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy);
+  }
+
+  if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
+    int errno_copy = errno;
+    GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_SETFL ~O_NONBLOCK ", errno_copy);
+    ::close(clientSocket);
+    throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy);
+  }
+
+  shared_ptr<TSocket> client(new TSocket(clientSocket));
+  if (sendTimeout_ > 0) {
+    client->setSendTimeout(sendTimeout_);
+  }
+  if (recvTimeout_ > 0) {
+    client->setRecvTimeout(recvTimeout_);
+  }
+
+  return client;
+}
+
+/**
+ * Wakes up a blocked acceptImpl() by sending one byte over the interrupt
+ * socket pair. Does nothing when the pair is not available; a failed send
+ * is only logged.
+ */
+void TServerSocket::interrupt() {
+  if (intSock1_ < 0) {
+    return;
+  }
+
+  int8_t wakeup = 0;
+  const bool failed = (-1 == send(intSock1_, &wakeup, sizeof(int8_t), 0));
+  if (failed) {
+    GlobalOutput.perror("TServerSocket::interrupt() send() ", errno);
+  }
+}
+
+/**
+ * Shuts down and closes the listening socket and closes both ends of the
+ * interrupt socket pair, then marks all three descriptors as unset (-1).
+ * Descriptors that are already negative are not closed.
+ */
+void TServerSocket::close() {
+  if (serverSocket_ >= 0) {
+    shutdown(serverSocket_, SHUT_RDWR);
+    ::close(serverSocket_);
+  }
+  serverSocket_ = -1;
+
+  // both ends of the interrupt pair get the same treatment
+  int* const interruptSocks[] = { &intSock1_, &intSock2_ };
+  for (int i = 0; i < 2; ++i) {
+    if (*interruptSocks[i] >= 0) {
+      ::close(*interruptSocks[i]);
+    }
+    *interruptSocks[i] = -1;
+  }
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
new file mode 100644
index 0000000..a6be017
--- /dev/null
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
+#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
+
+#include "TServerTransport.h"
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace transport {
+
+class TSocket;
+
+/**
+ * Server socket implementation of TServerTransport. Wrapper around a unix
+ * socket listen and accept calls.
+ *
+ */
+class TServerSocket : public TServerTransport {
+ public:
+ TServerSocket(int port);
+ TServerSocket(int port, int sendTimeout, int recvTimeout);
+
+ ~TServerSocket();
+ // acceptImpl() applies sendTimeout_/recvTimeout_ to each client when > 0.
+ void setSendTimeout(int sendTimeout);
+ void setRecvTimeout(int recvTimeout);
+
+ void setRetryLimit(int retryLimit);
+ void setRetryDelay(int retryDelay);
+
+ void setTcpSendBuffer(int tcpSendBuffer);
+ void setTcpRecvBuffer(int tcpRecvBuffer);
+
+ void listen();
+ void close();
+ // Sends one byte on intSock1_; see TServerTransport::interrupt() for intent.
+ void interrupt();
+
+ protected:
+ boost::shared_ptr<TTransport> acceptImpl();
+
+ private:
+ int port_;
+ int serverSocket_;
+ int acceptBacklog_;
+ int sendTimeout_;
+ int recvTimeout_;
+ int retryLimit_;
+ int retryDelay_;
+ int tcpSendBuffer_;
+ int tcpRecvBuffer_;
+ // Interrupt sockets: interrupt() writes to intSock1_; close() closes both.
+ int intSock1_;
+ int intSock2_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
diff --git a/lib/cpp/src/transport/TServerTransport.h b/lib/cpp/src/transport/TServerTransport.h
new file mode 100644
index 0000000..40bbc6c
--- /dev/null
+++ b/lib/cpp/src/transport/TServerTransport.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ 1
+
+#include "TTransport.h"
+#include "TTransportException.h"
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Server transport framework. A server needs to have some facility for
+ * creating base transports to read/write from.
+ *
+ */
+class TServerTransport {
+ public:
+ virtual ~TServerTransport() {}
+
+ /**
+ * Starts the server transport listening for new connections. Prior to this
+ * call most transports will not return anything when accept is called.
+ *
+ * @throws TTransportException if we were unable to listen
+ */
+ virtual void listen() {}
+
+ /**
+ * Gets a new transport object from acceptImpl() and passes it to the
+ * caller. Ownership is carried by the returned shared_ptr, so the caller
+ * does not free it explicitly. The returned TTransport object must always
+ * be in the opened state. NULL is never returned: a null result from
+ * acceptImpl() is turned into a TTransportException.
+ *
+ * @return A new TTransport object
+ * @throws TTransportException if there is an error
+ */
+ boost::shared_ptr<TTransport> accept() {
+ boost::shared_ptr<TTransport> result = acceptImpl();
+ if (result == NULL) {
+ throw TTransportException("accept() may not return NULL");
+ }
+ return result;
+ }
+
+ /**
+ * For "smart" TServerTransport implementations that work in a multi
+ * threaded context this can be used to break out of an accept() call.
+ * It is expected that the transport will throw a TTransportException
+ * with the interrupted error code.
+ */
+ virtual void interrupt() {}
+
+ /**
+ * Closes this transport such that future calls to accept will do nothing.
+ */
+ virtual void close() = 0;
+
+ protected:
+ TServerTransport() {}
+
+ /**
+ * Subclasses should implement this function for accept.
+ *
+ * @return A newly allocated TTransport object
+ * @throw TTransportException If an error occurs
+ */
+ virtual boost::shared_ptr<TTransport> acceptImpl() = 0;
+
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TShortReadTransport.h b/lib/cpp/src/transport/TShortReadTransport.h
new file mode 100644
index 0000000..3df8a57
--- /dev/null
+++ b/lib/cpp/src/transport/TShortReadTransport.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ 1
+
+#include <cstdlib>
+
+#include <transport/TTransport.h>
+
+namespace apache { namespace thrift { namespace transport { namespace test {
+
+/**
+ * This class is only meant for testing. It wraps another transport.
+ * Calls to read are passed through with some probability. Otherwise,
+ * the read amount is randomly reduced before being passed through.
+ *
+ */
+class TShortReadTransport : public TTransport {
+ public:
+ TShortReadTransport(boost::shared_ptr<TTransport> transport, double full_prob)
+ : transport_(transport)
+ , fullProb_(full_prob)
+ {}
+
+ bool isOpen() {
+ return transport_->isOpen();
+ }
+
+ bool peek() {
+ return transport_->peek();
+ }
+
+ void open() {
+ transport_->open();
+ }
+
+ void close() {
+ transport_->close();
+ }
+ // Forwards the read, possibly asking the wrapped transport for fewer bytes.
+ uint32_t read(uint8_t* buf, uint32_t len) {
+ if (len == 0) {
+ return 0;
+ }
+ // When the random draw is >= fullProb_, shrink the request to 1..len bytes.
+ if (rand()/(double)RAND_MAX >= fullProb_) {
+ len = 1 + rand()%len;
+ }
+ return transport_->read(buf, len);
+ }
+
+ void write(const uint8_t* buf, uint32_t len) {
+ transport_->write(buf, len);
+ }
+
+ void flush() {
+ transport_->flush();
+ }
+
+ const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
+ return transport_->borrow(buf, len);
+ }
+
+ void consume(uint32_t len) {
+ return transport_->consume(len);
+ }
+
+ boost::shared_ptr<TTransport> getUnderlyingTransport() {
+ return transport_;
+ }
+
+ protected:
+ boost::shared_ptr<TTransport> transport_;
+ double fullProb_;
+};
+
+}}}} // apache::thrift::transport::test
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TSimpleFileTransport.cpp b/lib/cpp/src/transport/TSimpleFileTransport.cpp
new file mode 100644
index 0000000..e58a574
--- /dev/null
+++ b/lib/cpp/src/transport/TSimpleFileTransport.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TSimpleFileTransport.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+// Opens `path` for reading and/or writing; write mode adds O_CREAT | O_APPEND.
+// Throws TTransportException if neither mode is requested or ::open() fails.
+TSimpleFileTransport::
+TSimpleFileTransport(const std::string& path, bool read, bool write)
+  : TFDTransport(-1, TFDTransport::CLOSE_ON_DESTROY) {
+  if (!read && !write) {
+    throw TTransportException("Neither READ nor WRITE specified");
+  }
+  int flags = write ? (read ? O_RDWR : O_WRONLY) : O_RDONLY;
+  if (write) {
+    flags |= O_CREAT | O_APPEND;
+  }
+  // Mode 0644; only consulted when O_CREAT actually creates the file.
+  int fd = ::open(path.c_str(),
+                  flags,
+                  S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH);
+  if (fd < 0) {
+    // Report the mode that was really requested (the old text always said
+    // "for writing", even for read-only opens).
+    throw TTransportException(std::string("failed to open file for ") +
+                              (write ? "writing: " : "reading: ") + path);
+  }
+  setFD(fd);
+  open();
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TSimpleFileTransport.h b/lib/cpp/src/transport/TSimpleFileTransport.h
new file mode 100644
index 0000000..6cc52ea
--- /dev/null
+++ b/lib/cpp/src/transport/TSimpleFileTransport.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TSIMPLEFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TSIMPLEFILETRANSPORT_H_ 1
+
+#include "TFDTransport.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Dead-simple wrapper around a file.
+ * The constructor opens the file and throws TTransportException on failure.
+ * Writeable files are opened with O_CREAT and O_APPEND.
+ */
+class TSimpleFileTransport : public TFDTransport {
+ public:
+ TSimpleFileTransport(const std::string& path,
+ bool read = true,
+ bool write = false);
+};
+
+}}} // apache::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TSIMPLEFILETRANSPORT_H_
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
new file mode 100644
index 0000000..3395dab
--- /dev/null
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <cstring>
+#include <sstream>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include "concurrency/Monitor.h"
+#include "TSocket.h"
+#include "TTransportException.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+// Global var to track total socket sys calls
+uint32_t g_socket_syscalls = 0;
+
+/**
+ * TSocket implementation.
+ *
+ */
+
+// Unconnected client socket for host:port; open() performs the connect.
+TSocket::TSocket(string host, int port) :
+  host_(host),
+  peerPort_(0),  // was left uninitialized; getPeerPort() could return garbage
+  port_(port),
+  socket_(-1),
+  connTimeout_(0), sendTimeout_(0), recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1),
+  maxRecvRetries_(5) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+}
+
+// Unconnected socket with empty host and port 0 (see setHost()/setPort()).
+TSocket::TSocket() :
+  host_(""),
+  peerPort_(0),  // was left uninitialized; getPeerPort() could return garbage
+  port_(0),
+  socket_(-1),
+  connTimeout_(0), sendTimeout_(0), recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1),
+  maxRecvRetries_(5) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+}
+
+// Wraps an existing descriptor, e.g. one returned by accept() in TServerSocket.
+TSocket::TSocket(int socket) :
+  host_(""),
+  peerPort_(0),  // was left uninitialized; getPeerPort() could return garbage
+  port_(0),
+  socket_(socket),
+  connTimeout_(0), sendTimeout_(0), recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1),
+  maxRecvRetries_(5) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+}
+
+TSocket::~TSocket() {
+ close();
+}
+
+bool TSocket::isOpen() {
+ return (socket_ >= 0);
+}
+
+bool TSocket::peek() {
+  if (!isOpen()) {
+    return false;
+  }
+  uint8_t probe;
+  const int rc = recv(socket_, &probe, 1, MSG_PEEK);
+  if (rc != -1) {
+    return (rc > 0);
+  }
+  const int errno_copy = errno;
+  #ifdef __FreeBSD__
+  /* shigin:
+   * freebsd returns -1 and ECONNRESET if socket was closed by
+   * the other side
+   */
+  if (errno_copy == ECONNRESET)
+  {
+    close();
+    return false;
+  }
+  #endif
+  GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
+  throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
+}
+
+void TSocket::openConnection(struct addrinfo *res) {
+ if (isOpen()) {
+ throw TTransportException(TTransportException::ALREADY_OPEN);
+ }
+ // Create the socket for this candidate address and apply the stored options.
+ socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (socket_ == -1) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
+ }
+
+ // Send timeout
+ if (sendTimeout_ > 0) {
+ setSendTimeout(sendTimeout_);
+ }
+
+ // Recv timeout
+ if (recvTimeout_ > 0) {
+ setRecvTimeout(recvTimeout_);
+ }
+
+ // Linger
+ setLinger(lingerOn_, lingerVal_);
+
+ // No delay
+ setNoDelay(noDelay_);
+ // NOTE(review): the F_GETFL result below is not checked for -1.
+ // Set the socket to be non blocking for connect if a timeout exists
+ int flags = fcntl(socket_, F_GETFL, 0);
+ if (connTimeout_ > 0) {
+ if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() fcntl() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ }
+ } else {
+ if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() fcntl " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ }
+ }
+
+ // Connect the socket
+ int ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+
+ // success case
+ if (ret == 0) {
+ goto done;
+ }
+
+ if (errno != EINPROGRESS) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
+ }
+ // connect() reported EINPROGRESS: wait until the socket becomes writable
+ // or connTimeout_ (ms) expires.
+ struct pollfd fds[1];
+ std::memset(fds, 0 , sizeof(fds));
+ fds[0].fd = socket_;
+ fds[0].events = POLLOUT;
+ ret = poll(fds, 1, connTimeout_);
+
+ if (ret > 0) {
+ // Ensure the socket is connected and that there are no errors set
+ int val;
+ socklen_t lon;
+ lon = sizeof(int);
+ int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
+ if (ret2 == -1) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
+ }
+ // no errors on socket, go to town
+ if (val == 0) {
+ goto done;
+ }
+ GlobalOutput.perror("TSocket::open() error on socket (after poll) " + getSocketInfo(), val);
+ throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
+ } else if (ret == 0) {
+ // socket timed out
+ string errStr = "TSocket::open() timed out " + getSocketInfo();
+ GlobalOutput(errStr.c_str());
+ throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
+ } else {
+ // error on poll()
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() poll() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "poll() failed", errno_copy);
+ }
+
+ done:
+ // Restore the flags read before connect() (drops O_NONBLOCK if we added it)
+ fcntl(socket_, F_SETFL, flags);
+}
+
+// Resolves host_:port_ and connects to the first address that accepts.
+// Throws TTransportException (ALREADY_OPEN / NOT_OPEN) on failure.
+void TSocket::open() {
+  if (isOpen()) {
+    throw TTransportException(TTransportException::ALREADY_OPEN);
+  }
+
+  // Validate port number: 65535 is the largest valid TCP port (the old
+  // bound of 65536 let one out-of-range value through).
+  if (port_ < 0 || port_ > 65535) {
+    throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");
+  }
+
+  struct addrinfo hints;
+  struct addrinfo* res0 = NULL;
+  char port[sizeof("65535")];
+  std::memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  snprintf(port, sizeof(port), "%d", port_);  // bounded, unlike sprintf
+
+  int error = getaddrinfo(host_.c_str(), port, &hints, &res0);
+
+  if (error) {
+    string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(gai_strerror(error));
+    GlobalOutput(errStr.c_str());
+    close();
+    throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket.");
+  }
+
+  // Cycle through all the returned addresses until one
+  // connects or push the exception up.
+  for (struct addrinfo* res = res0; res; res = res->ai_next) {
+    try {
+      openConnection(res);
+      break;
+    } catch (TTransportException& ttx) {
+      // Drop the half-opened socket before retrying or giving up.
+      close();
+      if (!res->ai_next) {
+        freeaddrinfo(res0); // cleanup on failure
+        throw;
+      }
+    }
+  }
+
+  // Free address structure memory
+  freeaddrinfo(res0);
+}
+
+void TSocket::close() {
+ if (socket_ >= 0) {
+ shutdown(socket_, SHUT_RDWR);
+ ::close(socket_);
+ }
+ socket_ = -1;
+}
+
+uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
+ if (socket_ < 0) {
+ throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
+ }
+
+ int32_t retries = 0;
+
+ // EAGAIN can be signalled both when a timeout has occurred and when
+ // the system is out of resources (an awesome undocumented feature).
+ // The following is an approximation of the time interval under which
+ // EAGAIN is taken to indicate an out of resources error.
+ uint32_t eagainThresholdMicros = 0;
+ if (recvTimeout_) {
+ // if a readTimeout is specified along with a max number of recv retries, then
+ // the threshold will ensure that the read timeout is not exceeded even in the
+ // case of resource errors
+ eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
+ }
+
+ try_again:
+ // Read from the socket
+ struct timeval begin;
+ gettimeofday(&begin, NULL);
+ int got = recv(socket_, buf, len, 0);
+ int errno_copy = errno; //gettimeofday can change errno
+ struct timeval end;
+ gettimeofday(&end, NULL);
+ uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
+ + (((uint64_t)(end.tv_usec - begin.tv_usec))));
+ ++g_socket_syscalls;
+ // readElapsedMicros is the wall-clock time spent around the recv() call.
+ // Check for error on read
+ if (got < 0) {
+ if (errno_copy == EAGAIN) {
+ // check if this is the lack of resources or timeout case
+ if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
+ if (retries++ < maxRecvRetries_) {
+ usleep(50);
+ goto try_again;
+ } else {
+ throw TTransportException(TTransportException::TIMED_OUT,
+ "EAGAIN (unavailable resources)");
+ }
+ } else {
+ // infer that timeout has been hit
+ throw TTransportException(TTransportException::TIMED_OUT,
+ "EAGAIN (timed out)");
+ }
+ }
+
+ // If interrupted, try again
+ if (errno_copy == EINTR && retries++ < maxRecvRetries_) {
+ goto try_again;
+ }
+
+ // Not a retryable case: this is a real error, so log it
+ GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
+
+ // If we disconnect with no linger time
+ if (errno_copy == ECONNRESET) {
+ #ifdef __FreeBSD__
+ /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
+ * ECONNRESET if peer performed shutdown
+ */
+ close();
+ return 0;
+ #else
+ throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET");
+ #endif
+ }
+
+ // The socket is not connected
+ if (errno_copy == ENOTCONN) {
+ throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
+ }
+
+ // Timed out!
+ if (errno_copy == ETIMEDOUT) {
+ throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT");
+ }
+
+ // Any other error is reported as UNKNOWN with the errno attached
+ throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+ }
+
+ // The remote host has closed the socket
+ if (got == 0) {
+ close();
+ return 0;
+ }
+
+ // Return the number of bytes actually received
+ return got;
+}
+
+// Writes all len bytes, looping over partial sends. Throws NOT_OPEN when the
+// socket is closed or the connection is gone, UNKNOWN for other send() errors.
+void TSocket::write(const uint8_t* buf, uint32_t len) {
+  if (socket_ < 0) {
+    throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
+  }
+
+  int flags = 0;
+  #ifdef MSG_NOSIGNAL
+  // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+  // check for the EPIPE return condition and close the socket in that case
+  flags |= MSG_NOSIGNAL;
+  #endif // ifdef MSG_NOSIGNAL
+
+  uint32_t sent = 0;
+  while (sent < len) {
+    int b = send(socket_, buf + sent, len - sent, flags);
+    int errno_copy = errno;  // capture before anything else can clobber it
+    ++g_socket_syscalls;
+
+    // Fail on a send error
+    if (b < 0) {
+      GlobalOutput.perror("TSocket::write() send() " + getSocketInfo(), errno_copy);
+      // Classify using the saved value: the logging call above builds a
+      // string and may overwrite the live errno.
+      if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) {
+        close();
+        throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
+      }
+      throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
+    }
+
+    // Fail on blocked send
+    if (b == 0) {
+      throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
+    }
+    sent += b;
+  }
+}
+
+std::string TSocket::getHost() {
+ return host_;
+}
+
+int TSocket::getPort() {
+ return port_;
+}
+
+void TSocket::setHost(string host) {
+ host_ = host;
+}
+
+void TSocket::setPort(int port) {
+ port_ = port;
+}
+
+void TSocket::setLinger(bool on, int linger) {
+ lingerOn_ = on;
+ lingerVal_ = linger;
+ if (socket_ < 0) {
+ return;
+ }
+
+ struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
+ int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
+ if (ret == -1) {
+ int errno_copy = errno; // Copy errno because we're allocating memory.
+ GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
+ }
+}
+
+void TSocket::setNoDelay(bool noDelay) {
+ noDelay_ = noDelay;
+ if (socket_ < 0) {
+ return;
+ }
+
+ // Set socket to NODELAY
+ int v = noDelay_ ? 1 : 0;
+ int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
+ if (ret == -1) {
+ int errno_copy = errno; // Copy errno because we're allocating memory.
+ GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
+ }
+}
+
+void TSocket::setConnTimeout(int ms) {
+ connTimeout_ = ms;
+}
+
+void TSocket::setRecvTimeout(int ms) {
+ if (ms < 0) {
+ char errBuf[512];
+ sprintf(errBuf, "TSocket::setRecvTimeout with negative input: %d\n", ms);
+ GlobalOutput(errBuf);
+ return;
+ }
+ recvTimeout_ = ms;
+
+ if (socket_ < 0) {
+ return;
+ }
+
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+
+ // Copy because poll may modify
+ struct timeval r = recvTimeval_;
+ int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
+ if (ret == -1) {
+ int errno_copy = errno; // Copy errno because we're allocating memory.
+ GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy);
+ }
+}
+
+void TSocket::setSendTimeout(int ms) {
+ if (ms < 0) {
+ char errBuf[512];
+ sprintf(errBuf, "TSocket::setSendTimeout with negative input: %d\n", ms);
+ GlobalOutput(errBuf);
+ return;
+ }
+ sendTimeout_ = ms;
+
+ if (socket_ < 0) {
+ return;
+ }
+
+ struct timeval s = {(int)(sendTimeout_/1000),
+ (int)((sendTimeout_%1000)*1000)};
+ int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
+ if (ret == -1) {
+ int errno_copy = errno; // Copy errno because we're allocating memory.
+ GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy);
+ }
+}
+
+void TSocket::setMaxRecvRetries(int maxRecvRetries) {
+ maxRecvRetries_ = maxRecvRetries;
+}
+
+string TSocket::getSocketInfo() {
+ std::ostringstream oss;
+ oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ return oss.str();
+}
+
+// Returns the peer's host name (cached); host_ if closed, "" if lookup fails.
+std::string TSocket::getPeerHost() {
+  if (peerHost_.empty()) {
+    if (socket_ < 0) {
+      return host_;
+    }
+
+    struct sockaddr_storage addr;
+    socklen_t addrLen = sizeof(addr);
+    if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+      return peerHost_;
+    }
+
+    char clienthost[NI_MAXHOST];
+    char clientservice[NI_MAXSERV];
+    // getnameinfo() does not define the buffers on failure, so only cache
+    // the name when the lookup succeeded.
+    int rv = getnameinfo((sockaddr*) &addr, addrLen,
+                         clienthost, sizeof(clienthost),
+                         clientservice, sizeof(clientservice), 0);
+    if (rv == 0) {
+      peerHost_ = clienthost;
+    }
+  }
+  return peerHost_;
+}
+
+// Returns the peer's numeric address (cached); "" if closed or lookup fails.
+std::string TSocket::getPeerAddress() {
+  if (peerAddress_.empty()) {
+    if (socket_ < 0) {
+      return peerAddress_;
+    }
+
+    struct sockaddr_storage addr;
+    socklen_t addrLen = sizeof(addr);
+    if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+      return peerAddress_;
+    }
+
+    char clienthost[NI_MAXHOST];
+    char clientservice[NI_MAXSERV];
+    // getnameinfo() does not define the buffers on failure, so only cache
+    // the address and port when the lookup succeeded.
+    int rv = getnameinfo((sockaddr*) &addr, addrLen,
+                         clienthost, sizeof(clienthost),
+                         clientservice, sizeof(clientservice),
+                         NI_NUMERICHOST|NI_NUMERICSERV);
+    if (rv == 0) {
+      peerAddress_ = clienthost;
+      peerPort_ = std::atoi(clientservice);
+    }
+  }
+  return peerAddress_;
+}
+
+int TSocket::getPeerPort() {
+ getPeerAddress();
+ return peerPort_;
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
new file mode 100644
index 0000000..b0f445a
--- /dev/null
+++ b/lib/cpp/src/transport/TSocket.h
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TSOCKET_H_
+#define _THRIFT_TRANSPORT_TSOCKET_H_ 1
+
+#include <string>
+#include <sys/time.h>
+
+#include "TTransport.h"
+#include "TServerSocket.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * TCP Socket implementation of the TTransport interface.
+ *
+ */
+class TSocket : public TTransport {
+ /**
+ * We allow the TServerSocket acceptImpl() method to access the private
+ * members of a socket so that it can access the TSocket(int socket)
+ * constructor which creates a socket object from the raw UNIX socket
+ * handle.
+ */
+ friend class TServerSocket;
+
+ public:
+ /**
+ * Constructs a new socket. Note that this does NOT actually connect the
+ * socket.
+ *
+ */
+ TSocket();
+
+ /**
+ * Constructs a new socket. Note that this does NOT actually connect the
+ * socket.
+ *
+ * @param host An IP address or hostname to connect to
+ * @param port The port to connect on
+ */
+ TSocket(std::string host, int port);
+
+ /**
+ * Destroys the socket object, closing it if necessary.
+ */
+ virtual ~TSocket();
+
+ /**
+ * Whether the socket is alive.
+ *
+ * @return Is the socket alive?
+ */
+ bool isOpen();
+
+ /**
+ * Peeks at the socket with recv(MSG_PEEK) to see if there is more data available.
+ */
+ bool peek();
+
+ /**
+ * Creates and opens the UNIX socket.
+ *
+ * @throws TTransportException If the socket could not connect
+ */
+ virtual void open();
+
+ /**
+ * Shuts down communications on the socket.
+ */
+ void close();
+
+ /**
+ * Reads from the underlying socket.
+ */
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ /**
+ * Writes to the underlying socket.
+ */
+ void write(const uint8_t* buf, uint32_t len);
+
+ /**
+ * Get the host that the socket is connected to
+ *
+ * @return string host identifier
+ */
+ std::string getHost();
+
+ /**
+ * Get the port that the socket is connected to
+ *
+ * @return int port number
+ */
+ int getPort();
+
+ /**
+ * Set the host that socket will connect to
+ *
+ * @param host host identifier
+ */
+ void setHost(std::string host);
+
+ /**
+ * Set the port that socket will connect to
+ *
+ * @param port port number
+ */
+ void setPort(int port);
+
+ /**
+ * Controls whether the linger option is set on the socket.
+ *
+ * @param on Whether SO_LINGER is on
+ * @param linger If linger is active, the number of seconds to linger for
+ */
+ void setLinger(bool on, int linger);
+
+ /**
+ * Whether to enable/disable Nagle's algorithm.
+ *
+ * @param noDelay Whether or not to disable the algorithm.
+ * @return
+ */
+ void setNoDelay(bool noDelay);
+
+ /**
+ * Set the connect timeout
+ */
+ void setConnTimeout(int ms);
+
+ /**
+ * Set the receive timeout
+ */
+ void setRecvTimeout(int ms);
+
+ /**
+ * Set the send timeout
+ */
+ void setSendTimeout(int ms);
+
+ /**
+ * Set the max number of recv retries in case of an EAGAIN
+ * error
+ */
+ void setMaxRecvRetries(int maxRecvRetries);
+
+ /**
+ * Get socket information formatted as a string <Host: x Port: x>
+ */
+ std::string getSocketInfo();
+
+ /**
+ * Returns the DNS name of the host to which the socket is connected
+ */
+ std::string getPeerHost();
+
+ /**
+ * Returns the address of the host to which the socket is connected
+ */
+ std::string getPeerAddress();
+
+ /**
+ * Returns the port of the host to which the socket is connected
+ **/
+ int getPeerPort();
+
+
+ protected:
+ /**
+ * Constructor to create socket from raw UNIX handle. Never called directly
+ * but used by the TServerSocket class.
+ */
+ TSocket(int socket);
+
+ /** connect, called by open */
+ void openConnection(struct addrinfo *res);
+
+ /** Host to connect to */
+ std::string host_;
+
+ /** Peer hostname */
+ std::string peerHost_;
+
+ /** Peer address */
+ std::string peerAddress_;
+
+ /** Peer port */
+ int peerPort_;
+
+ /** Port number to connect on */
+ int port_;
+
+ /** Underlying UNIX socket handle */
+ int socket_;
+
+ /** Connect timeout in ms */
+ int connTimeout_;
+
+ /** Send timeout in ms */
+ int sendTimeout_;
+
+ /** Recv timeout in ms */
+ int recvTimeout_;
+
+ /** Linger on */
+ bool lingerOn_;
+
+ /** Linger val */
+ int lingerVal_;
+
+ /** Nodelay */
+ bool noDelay_;
+
+ /** Recv EAGAIN retries */
+ int maxRecvRetries_;
+
+ /** Recv timeout timeval */
+ struct timeval recvTimeval_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSOCKET_H_
+
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
new file mode 100644
index 0000000..1150282
--- /dev/null
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <iostream>
+
+#include "TSocketPool.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+using boost::shared_ptr;
+
+/**
+ * TSocketPoolServer implementation.
+ * Default entry: empty host, port 0, no socket (-1), no recorded failures.
+ */
+TSocketPoolServer::TSocketPoolServer()
+  : host_(),
+    port_(0),
+    socket_(-1),
+    lastFailTime_(0), consecutiveFailures_(0) {
+}
+
+/**
+ * Entry for a concrete host/port; starts without a socket or recorded failures.
+ */
+TSocketPoolServer::TSocketPoolServer(const string &host, int port)
+  : host_(host),
+    port_(port),
+    socket_(-1),
+    lastFailTime_(0), consecutiveFailures_(0) {
+}
+
+/**
+ * TSocketPool implementation.
+ * Default pool: no servers, 1 retry, retry interval 60, max 1 consecutive failure, randomized, always-try-last.
+ */
+
+TSocketPool::TSocketPool() : TSocket(),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true) {
+}
+
+TSocketPool::TSocketPool(const vector<string> &hosts,
+                         const vector<int> &ports) : TSocket(),
+  numRetries_(1),
+  retryInterval_(60),
+  maxConsecutiveFailures_(1),
+  randomize_(true),
+  alwaysTryLast_(true)
+{
+  if (hosts.size() != ports.size()) {
+    GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
+    throw TTransportException(TTransportException::BAD_ARGS);
+  }
+  vector<int>::const_iterator port = ports.begin();
+  for (vector<string>::const_iterator host = hosts.begin(); host != hosts.end(); ++host, ++port) {
+    addServer(*host, *port);
+  }
+}
+
+TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(),
+  numRetries_(1),
+  retryInterval_(60),
+  maxConsecutiveFailures_(1),
+  randomize_(true),
+  alwaysTryLast_(true)
+{
+  for (vector<pair<string, int> >::const_iterator entry = servers.begin(); entry != servers.end(); ++entry) {
+    addServer(entry->first, entry->second);
+  }
+}
+
+TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(),
+ servers_(servers), // copies the pointers only; the entries stay shared with the caller's vector
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true)
+{
+}
+
+TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true)
+{
+ addServer(host, port); // the pool starts with exactly this one server
+}
+
+TSocketPool::~TSocketPool() {
+  // Impersonate each server in turn so that its socket, if still open, gets closed.
+  typedef vector< shared_ptr<TSocketPoolServer> >::const_iterator ServerIter;
+  for (ServerIter it = servers_.begin(), last = servers_.end(); it != last; ++it) {
+    setCurrentServer(*it);
+    TSocketPool::close();
+  }
+}
+
+void TSocketPool::addServer(const string& host, int port) {
+ servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port))); // appended at the end of the list
+}
+
+void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) {
+ servers_ = servers; // replaces the whole list; the entries themselves are shared, not cloned
+}
+
+void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) {
+ servers = servers_; // out-parameter: overwritten with a copy of the pointer list
+}
+
+void TSocketPool::setNumRetries(int numRetries) {
+ numRetries_ = numRetries; // connect attempts per server within one open() call
+}
+
+void TSocketPool::setRetryInterval(int retryInterval) {
+ retryInterval_ = retryInterval; // open() compares this against a time() difference, i.e. seconds
+}
+
+
+void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
+ maxConsecutiveFailures_ = maxConsecutiveFailures; // a server is marked down once its failure count exceeds this
+}
+
+void TSocketPool::setRandomize(bool randomize) {
+ randomize_ = randomize; // when true, open() shuffles servers_ before trying them
+}
+
+void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
+ alwaysTryLast_ = alwaysTryLast; // when true, open() tries the final server even if it is marked down
+}
+
+void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
+ currentServer_ = server; // remembered so close() can reset this entry's socket_
+ host_ = server->host_; // the inherited TSocket fields now describe this server
+ port_ = server->port_;
+ socket_ = server->socket_; // -1 unless an earlier open() stored a descriptor in the entry
+}
+
+/**
+ * Opens a connection to the first usable server in the pool (shuffled first when randomize_
+ * is set). A server marked down is skipped until retryInterval_ seconds have passed, except
+ * that the last server is always tried when alwaysTryLast_ is set.
+ * TODO: without apc we ignore a lot of functionality from the php version
+ * @throws TTransportException (NOT_OPEN) if no server could be opened.
+ */
+void TSocketPool::open() {
+  if (randomize_) {
+    random_shuffle(servers_.begin(), servers_.end());
+  }
+  unsigned int numServers = servers_.size();
+  for (unsigned int i = 0; i < numServers; ++i) {
+    shared_ptr<TSocketPoolServer> &server = servers_[i];
+    bool retryIntervalPassed = (server->lastFailTime_ == 0);
+    bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
+
+    // Impersonate the server socket
+    setCurrentServer(server);
+
+    if (isOpen()) {
+      // already open means we're done
+      return;
+    }
+
+    if (server->lastFailTime_ > 0) {
+      // The server was marked as down, so check if enough time has elapsed to retry
+      int elapsedTime = time(NULL) - server->lastFailTime_;
+      if (elapsedTime > retryInterval_) {
+        retryIntervalPassed = true;
+      }
+    }
+
+    if (retryIntervalPassed || isLastServer) {
+      for (int j = 0; j < numRetries_; ++j) {
+        try {
+          TSocket::open();
+          // Copy over the opened socket so that we can keep it persistent
+          server->socket_ = socket_;
+          // The server is reachable again, so clear its down-marker
+          server->lastFailTime_ = 0;
+
+          // success
+          return;
+        } catch (const TException& e) {  // by reference: keeps the derived what() text
+          string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
+          GlobalOutput(errStr.c_str());
+          // connection failed
+        }
+      }
+
+      ++server->consecutiveFailures_;
+      if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
+        // Mark server as down
+        server->consecutiveFailures_ = 0;
+        server->lastFailTime_ = time(NULL);
+      }
+    }
+  }
+
+  GlobalOutput("TSocketPool::open: all connections failed");
+  throw TTransportException(TTransportException::NOT_OPEN);
+}
+
+void TSocketPool::close() {
+  // Nothing to tear down when the impersonated socket is not open.
+  if (!isOpen()) { return; }
+  TSocket::close();
+  currentServer_->socket_ = -1;
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TSocketPool.h b/lib/cpp/src/transport/TSocketPool.h
new file mode 100644
index 0000000..8c50669
--- /dev/null
+++ b/lib/cpp/src/transport/TSocketPool.h
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TSOCKETPOOL_H_
+#define _THRIFT_TRANSPORT_TSOCKETPOOL_H_ 1
+
+#include <vector>
+#include "TSocket.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+ /**
+ * Class to hold server information for TSocketPool
+ * (a plain data holder: every member is public).
+ */
+class TSocketPoolServer {
+
+ public:
+ /**
+ * Default constructor for server info
+ */
+ TSocketPoolServer();
+
+ /**
+ * Constructor for TSocketPool server
+ */
+ TSocketPoolServer(const std::string &host, int port);
+
+ // Host name
+ std::string host_;
+
+ // Port to connect on
+ int port_;
+
+ // Socket for the server (-1 while no connection is held)
+ int socket_;
+
+ // Last time connecting to this server failed (a time() value; 0 = not marked down)
+ int lastFailTime_;
+
+ // Number of consecutive times connecting to this server failed
+ int consecutiveFailures_;
+};
+
+/**
+ * TSocket that connects to one server picked from a pool of host/port entries.
+ *
+ */
+class TSocketPool : public TSocket {
+
+ public:
+
+ /**
+ * Socket pool constructor
+ */
+ TSocketPool();
+
+ /**
+ * Socket pool constructor
+ *
+ * @param hosts list of host names
+ * @param ports list of port numbers, parallel to hosts (sizes must match)
+ */
+ TSocketPool(const std::vector<std::string> &hosts,
+ const std::vector<int> &ports);
+
+ /**
+ * Socket pool constructor
+ *
+ * @param servers list of pairs of host name and port
+ */
+ TSocketPool(const std::vector<std::pair<std::string, int> >& servers);
+
+ /**
+ * Socket pool constructor
+ *
+ * @param servers list of TSocketPoolServers
+ */
+ TSocketPool(const std::vector< boost::shared_ptr<TSocketPoolServer> >& servers);
+
+ /**
+ * Socket pool constructor
+ *
+ * @param host single host
+ * @param port single port
+ */
+ TSocketPool(const std::string& host, int port);
+
+ /**
+ * Destroys the pool, closing every server socket that is still open.
+ */
+ virtual ~TSocketPool();
+
+ /**
+ * Add a server to the pool
+ */
+ void addServer(const std::string& host, int port);
+
+ /**
+ * Set list of servers in this pool
+ */
+ void setServers(const std::vector< boost::shared_ptr<TSocketPoolServer> >& servers);
+
+ /**
+ * Get list of servers in this pool
+ */
+ void getServers(std::vector< boost::shared_ptr<TSocketPoolServer> >& servers);
+
+ /**
+ * Sets how many times to keep retrying a host in the connect function.
+ */
+ void setNumRetries(int numRetries);
+
+ /**
+ * Sets how long (in seconds) to wait until retrying a host if it was marked down
+ */
+ void setRetryInterval(int retryInterval);
+
+ /**
+ * Sets how many times to keep retrying a host before marking it as down.
+ */
+ void setMaxConsecutiveFailures(int maxConsecutiveFailures);
+
+ /**
+ * Turns randomization in connect order on or off.
+ */
+ void setRandomize(bool randomize);
+
+ /**
+ * Whether to always try the last server.
+ */
+ void setAlwaysTryLast(bool alwaysTryLast);
+
+ /**
+ * Connects to the first server in the pool that can be opened.
+ */
+ void open();
+
+ /*
+ * Closes the socket of the current server
+ */
+ void close();
+
+ protected:
+
+ void setCurrentServer(const boost::shared_ptr<TSocketPoolServer> &server); // makes this socket impersonate the given server
+
+ /** List of servers to connect to */
+ std::vector< boost::shared_ptr<TSocketPoolServer> > servers_;
+
+ /** Current server */
+ boost::shared_ptr<TSocketPoolServer> currentServer_;
+
+ /** How many times to retry each host in connect */
+ int numRetries_;
+
+ /** Retry interval in seconds, how long to not try a host if it has been
+ * marked as down.
+ */
+ int retryInterval_;
+
+ /** Max consecutive failures before marking a host down. */
+ int maxConsecutiveFailures_;
+
+ /** Try hosts in order? or Randomized? */
+ bool randomize_;
+
+ /** Always try last host, even if marked down? */
+ bool alwaysTryLast_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSOCKETPOOL_H_
+
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
new file mode 100644
index 0000000..eb0d5df
--- /dev/null
+++ b/lib/cpp/src/transport/TTransport.h
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
+
+#include <Thrift.h>
+#include <boost/shared_ptr.hpp>
+#include <transport/TTransportException.h>
+#include <string>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Generic interface for a method of transporting data. A TTransport may be
+ * capable of either reading or writing, but not necessarily both.
+ *
+ */
+class TTransport {
+ public:
+  /**
+   * Virtual destructor.
+   */
+  virtual ~TTransport() {}
+
+  /**
+   * Whether this transport is open. The base implementation reports false.
+   */
+  virtual bool isOpen() {
+    return false;
+  }
+
+  /**
+   * Tests whether there is more data to read or if the remote side is
+   * still open. By default this is true whenever the transport is open,
+   * but implementations should add logic to test for this condition where
+   * possible (i.e. on a socket).
+   * This is used by a server to check if it should listen for another
+   * request.
+   */
+  virtual bool peek() {
+    return isOpen();
+  }
+
+  /**
+   * Opens the transport for communications.
+   *
+   * The base implementation always throws; subclasses override it.
+   * @throws TTransportException if opening failed
+   */
+  virtual void open() {
+    throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
+  }
+
+  /**
+   * Closes the transport. The base implementation always throws.
+   */
+  virtual void close() {
+    throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
+  }
+
+  /**
+   * Attempt to read up to the specified number of bytes into the buffer.
+   *
+   * @param buf  Pointer to the location to write the data
+   * @param len  How many bytes to read
+   * @return How many bytes were actually read
+   * @throws TTransportException If an error occurs
+   */
+  virtual uint32_t read(uint8_t* /* buf */, uint32_t /* len */) {
+    throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
+  }
+
+  /**
+   * Reads the given amount of data in its entirety no matter what.
+   *
+   * @param buf  Pointer to the location for the read data
+   * @param len  How many bytes to read
+   * @return How many bytes read, which must be equal to len
+   * @throws TTransportException (END_OF_FILE) if read() returns 0 before len bytes arrive
+   */
+  virtual uint32_t readAll(uint8_t* buf, uint32_t len) {
+    uint32_t have = 0;
+    uint32_t get = 0;
+
+    while (have < len) {
+      get = read(buf+have, len-have);
+      if (get == 0) {
+        throw TTransportException(TTransportException::END_OF_FILE, "No more data to read.");
+      }
+      have += get;
+    }
+
+    return have;
+  }
+
+  /**
+   * Called when read is completed.
+   * This can be over-ridden to perform a transport-specific action
+   * e.g. logging the request to a file
+   *
+   */
+  virtual void readEnd() {
+    // default behaviour is to do nothing
+    return;
+  }
+
+  /**
+   * Writes the buffer in its entirety to the transport.
+   *
+   * @param buf  The data to write out
+   * @throws TTransportException if an error occurs
+   */
+  virtual void write(const uint8_t* /* buf */, uint32_t /* len */) {
+    throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
+  }
+
+  /**
+   * Called when write is completed.
+   * This can be over-ridden to perform a transport-specific action
+   * at the end of a request.
+   *
+   */
+  virtual void writeEnd() {
+    // default behaviour is to do nothing
+    return;
+  }
+
+  /**
+   * Flushes any pending data to be written. Typically used with buffered
+   * transport mechanisms.
+   *
+   * @throws TTransportException if an error occurs
+   */
+  virtual void flush() {}
+
+  /**
+   * Attempts to return a pointer to \c len bytes, possibly copied into \c buf.
+   * Does not consume the bytes read (i.e.: a later read will return the same
+   * data).  This method is meant to support protocols that need to read
+   * variable-length fields.  They can attempt to borrow the maximum amount of
+   * data that they will need, then consume (see next method) what they
+   * actually use.  Some transports will not support this method and others
+   * will fail occasionally, so protocols must be prepared to use read if
+   * borrow fails.
+   *
+   * @param buf  A buffer where the data can be stored if needed.
+   *             If borrow doesn't return buf, then the contents of
+   *             buf after the call are undefined.
+   * @param len  *len should initially contain the number of bytes to borrow.
+   *             If borrow succeeds, *len will contain the number of bytes
+   *             available in the returned pointer.  This will be at least
+   *             what was requested, but may be more if borrow returns
+   *             a pointer to an internal buffer, rather than buf.
+   *             If borrow fails, the contents of *len are undefined.
+   * @return If the borrow succeeds, return a pointer to the borrowed data.
+   *         This might be equal to \c buf, or it might be a pointer into
+   *         the transport's internal buffers.  The base implementation returns NULL.
+   * @throws TTransportException if an error occurs
+   */
+  virtual const uint8_t* borrow(uint8_t* /* buf */, uint32_t* /* len */) {
+    return NULL;
+  }
+
+  /**
+   * Remove len bytes from the transport.  This should always follow a borrow
+   * of at least len bytes, and should always succeed.
+   * TODO(dreiss): Is there any transport that could borrow but fail to
+   * consume, or that would require a buffer to dump the consumed data?
+   *
+   * @param len  How many bytes to consume
+   * @throws TTransportException If an error occurs
+   */
+  virtual void consume(uint32_t /* len */) {
+    throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume.");
+  }
+
+ protected:
+  /**
+   * Simple constructor.
+   */
+  TTransport() {}
+};
+
+/**
+ * Generic factory class to make an input and output transport out of a
+ * source transport. Commonly used inside servers to make input and output
+ * streams out of raw clients.
+ * Subclasses override getTransport() to wrap the source transport.
+ */
+class TTransportFactory {
+ public:
+ TTransportFactory() {}
+
+ virtual ~TTransportFactory() {}
+
+ /**
+ * Default implementation does nothing, just returns the transport given.
+ */
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return trans;
+ }
+
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportException.cpp b/lib/cpp/src/transport/TTransportException.cpp
new file mode 100644
index 0000000..f0aaedc
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportException.cpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <transport/TTransportException.h>
+#include <boost/lexical_cast.hpp>
+#include <cstring>
+#include <config.h>
+
+using std::string;
+using boost::lexical_cast;
+
+namespace apache { namespace thrift { namespace transport {
+
+}}} // apache::thrift::transport
+
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
new file mode 100644
index 0000000..330785c
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_
+#define _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ 1
+
+#include <string>
+#include <Thrift.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * Class to encapsulate all the possible types of transport errors that may
+ * occur in various transport systems. This provides a generic wrapper
+ * around the raw UNIX E* error codes, so that one common body of
+ * error-handling code can be used for various types of transports, i.e.
+ * pipes etc.
+ *
+ */
+class TTransportException : public apache::thrift::TException {
+ public:
+  /**
+   * Error codes for the various types of exceptions.
+   */
+  enum TTransportExceptionType {
+    UNKNOWN = 0,
+    NOT_OPEN = 1,
+    ALREADY_OPEN = 2,
+    TIMED_OUT = 3,
+    END_OF_FILE = 4,
+    INTERRUPTED = 5,
+    BAD_ARGS = 6,
+    CORRUPTED_DATA = 7,
+    INTERNAL_ERROR = 8
+  };
+
+  TTransportException()
+    : apache::thrift::TException(),
+      type_(UNKNOWN) {}
+
+  TTransportException(TTransportExceptionType type)
+    : apache::thrift::TException(),
+      type_(type) {}
+
+  TTransportException(const std::string& message)
+    : apache::thrift::TException(message),
+      type_(UNKNOWN) {}
+
+  TTransportException(TTransportExceptionType type, const std::string& message)
+    : apache::thrift::TException(message),
+      type_(type) {}
+
+  // Appends the text for errno_copy to the message.
+  TTransportException(TTransportExceptionType type, const std::string& message, int errno_copy)
+    : apache::thrift::TException(message + ": " + TOutput::strerror_s(errno_copy)),
+      type_(type) {
+  }
+
+  virtual ~TTransportException() throw() {}
+
+  /**
+   * Returns an error code that provides information about the type of error
+   * that has occurred.
+   *
+   * @return Error code
+   */
+  TTransportExceptionType getType() const throw() {
+    return type_;
+  }
+
+  virtual const char* what() const throw() {
+    // An explicit message always wins over the canned per-type text.
+    if (!message_.empty()) {
+      return message_.c_str();
+    }
+    switch (type_) {
+      case UNKNOWN        : return "TTransportException: Unknown transport exception";
+      case NOT_OPEN       : return "TTransportException: Transport not open";
+      case ALREADY_OPEN   : return "TTransportException: Transport already open";
+      case TIMED_OUT      : return "TTransportException: Timed out";
+      case END_OF_FILE    : return "TTransportException: End of file";
+      case INTERRUPTED    : return "TTransportException: Interrupted";
+      case BAD_ARGS       : return "TTransportException: Invalid arguments";
+      case CORRUPTED_DATA : return "TTransportException: Corrupted Data";
+      case INTERNAL_ERROR : return "TTransportException: Internal error";
+      default             : return "TTransportException: (Invalid exception type)";
+    }
+  }
+
+ protected:
+  /** Like strerror_r but returns a std::string. NOTE(review): TTransportException.cpp defines nothing; confirm it is unused. */
+  std::string strerror_s(int errno_copy);
+
+  /** Error code */
+  TTransportExceptionType type_;
+
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
new file mode 100644
index 0000000..a840fa6
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <transport/TTransportUtils.h>
+
+using std::string;
+
+namespace apache { namespace thrift { namespace transport {
+
+// Returns up to len bytes: whatever is buffered, topped up by at most one read from srcTrans_.
+uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_-rPos_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+      rPos_ = rLen_;
+    }
+
+    // Double the buffer when it is full; if realloc fails the old block and size stay valid
+    if (rLen_ == rBufSize_) {
+      void* grown = std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_ * 2);
+      if (grown == NULL) {
+        throw TTransportException(TTransportException::INTERNAL_ERROR, "Out of memory");
+      }
+      rBuf_ = static_cast<uint8_t*>(grown);
+      rBufSize_ *= 2;
+    }
+
+    // try to fill up the buffer
+    rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+  }
+
+  // Hand over whatever we have, capped at what is buffered
+  const uint32_t give = std::min(need, rLen_ - rPos_);
+  if (give > 0) {
+    memcpy(buf, rBuf_+rPos_, give);
+    rPos_ += give;
+    need -= give;
+  }
+  return (len - need);
+}
+
+void TPipedTransport::write(const uint8_t* buf, uint32_t len) {
+  if (len == 0) { return; }
+  // Grow by doubling until the data fits; start at 1 so a zero-sized buffer cannot loop forever
+  if ((len + wLen_) >= wBufSize_) {
+    uint32_t newBufSize = (wBufSize_ == 0) ? 1 : wBufSize_*2;
+    while ((len + wLen_) >= newBufSize) {
+      newBufSize *= 2;
+    }
+    // If realloc fails, keep the old buffer and size instead of leaking it
+    void* grown = std::realloc(wBuf_, sizeof(uint8_t) * newBufSize);
+    if (grown == NULL) {
+      throw TTransportException(TTransportException::INTERNAL_ERROR, "Out of memory");
+    }
+    wBuf_ = static_cast<uint8_t*>(grown);
+    wBufSize_ = newBufSize;
+  }
+  memcpy(wBuf_ + wLen_, buf, len);
+  wLen_ += len;
+}
+
+void TPipedTransport::flush() {
+  // Push any buffered bytes to the source transport first...
+  const bool hasPending = (wLen_ != 0);
+  if (hasPending) {
+    srcTrans_->write(wBuf_, wLen_);
+    wLen_ = 0;
+  }
+  // ...then let the source transport flush itself.
+  srcTrans_->flush();
+}
+
+TPipedFileReaderTransport::TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans)
+ : TPipedTransport(srcTrans, dstTrans),
+ srcTrans_(srcTrans) { // typed copy of the source, used by the file-reader forwarding calls
+}
+
+TPipedFileReaderTransport::~TPipedFileReaderTransport() {
+}
+// The TTransport calls below simply delegate to the TPipedTransport implementation.
+bool TPipedFileReaderTransport::isOpen() {
+ return TPipedTransport::isOpen();
+}
+
+bool TPipedFileReaderTransport::peek() {
+ return TPipedTransport::peek();
+}
+
+void TPipedFileReaderTransport::open() {
+ TPipedTransport::open();
+}
+
+void TPipedFileReaderTransport::close() {
+ TPipedTransport::close();
+}
+
+uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) {
+ return TPipedTransport::read(buf, len);
+}
+
+uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
+  // Keep reading until len bytes have arrived; a zero-byte read means end of file.
+  uint32_t total = 0;
+
+  while (total < len) {
+    const uint32_t chunk = read(buf + total, len - total);
+    if (chunk == 0) {
+      throw TEOFException();
+    }
+    total += chunk;
+  }
+
+  return total;
+}
+
+void TPipedFileReaderTransport::readEnd() {
+ TPipedTransport::readEnd();
+}
+
+void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
+ TPipedTransport::write(buf, len);
+}
+
+void TPipedFileReaderTransport::writeEnd() {
+ TPipedTransport::writeEnd();
+}
+
+void TPipedFileReaderTransport::flush() {
+ TPipedTransport::flush();
+}
+// The TFileReaderTransport calls below forward to the wrapped file reader (srcTrans_).
+int32_t TPipedFileReaderTransport::getReadTimeout() {
+ return srcTrans_->getReadTimeout();
+}
+
+void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) {
+ srcTrans_->setReadTimeout(readTimeout);
+}
+
+uint32_t TPipedFileReaderTransport::getNumChunks() {
+ return srcTrans_->getNumChunks();
+}
+
+uint32_t TPipedFileReaderTransport::getCurChunk() {
+ return srcTrans_->getCurChunk();
+}
+
+void TPipedFileReaderTransport::seekToChunk(int32_t chunk) {
+ srcTrans_->seekToChunk(chunk);
+}
+
+void TPipedFileReaderTransport::seekToEnd() {
+ srcTrans_->seekToEnd();
+}
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
new file mode 100644
index 0000000..d65c916
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
+#define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
+
+#include <cstdlib>
+#include <cstring>
+#include <string>
+#include <algorithm>
+#include <transport/TTransport.h>
+// Include the buffered transports that used to be defined here.
+#include <transport/TBufferTransports.h>
+#include <transport/TFileTransport.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+/**
+ * A transport that discards everything written to it, in the spirit of
+ * /dev/null. It always reports itself as open and open() is a no-op;
+ * read() is not overridden here, so reads fall through to whatever
+ * TTransport provides.
+ *
+ */
+class TNullTransport : public TTransport {
+ public:
+  TNullTransport() {}
+
+  ~TNullTransport() {}
+
+  // Always open.
+  bool isOpen() { return true; }
+
+  // Opening is a no-op.
+  void open() {}
+
+  // Writes are accepted and silently dropped.
+  void write(const uint8_t* /* buf */, uint32_t /* len */) {
+  }
+
+};
+
+
+/**
+ * TPipedTransport. This transport pipes a request from one transport to
+ * another when readEnd() or writeEnd() is called. The typical use case
+ * for this is to log a request or a reply to disk.
+ * The underlying buffers expand to keep a copy of the entire
+ * request/response.
+ *
+ */
+class TPipedTransport : virtual public TTransport {
+ public:
+  TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
+                  boost::shared_ptr<TTransport> dstTrans) :
+    srcTrans_(srcTrans),
+    dstTrans_(dstTrans),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+    // default is to pipe the request when readEnd() is called
+    pipeOnRead_ = true;
+    pipeOnWrite_ = false;
+
+    rBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * rBufSize_);
+    wBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * wBufSize_);
+  }
+
+  TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
+                  boost::shared_ptr<TTransport> dstTrans,
+                  uint32_t sz) :
+    srcTrans_(srcTrans),
+    dstTrans_(dstTrans),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0),
+    pipeOnRead_(true), pipeOnWrite_(false) {  // same defaults as the other constructor
+    rBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * rBufSize_);
+    wBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * wBufSize_);
+  }
+
+  ~TPipedTransport() {
+    std::free(rBuf_);
+    std::free(wBuf_);
+  }
+
+  bool isOpen() {
+    return srcTrans_->isOpen();
+  }
+
+  bool peek() {
+    if (rPos_ >= rLen_) {
+      // Double the buffer when it is full; if realloc fails the old block and size stay valid
+      if (rLen_ == rBufSize_) {
+        void* grown = std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_ * 2);
+        if (grown == NULL) {
+          throw TTransportException(TTransportException::INTERNAL_ERROR, "Out of memory");
+        }
+        rBuf_ = static_cast<uint8_t*>(grown);
+        rBufSize_ *= 2;
+      }
+      // try to fill up the buffer
+      rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+    }
+    return (rLen_ > rPos_);
+  }
+
+  void open() {
+    srcTrans_->open();
+  }
+
+  void close() {
+    srcTrans_->close();
+  }
+
+  void setPipeOnRead(bool pipeVal) {
+    pipeOnRead_ = pipeVal;
+  }
+
+  void setPipeOnWrite(bool pipeVal) {
+    pipeOnWrite_ = pipeVal;
+  }
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void readEnd() {
+    if (pipeOnRead_) {
+      dstTrans_->write(rBuf_, rPos_);
+      dstTrans_->flush();
+    }
+
+    srcTrans_->readEnd();
+
+    // If requests are being pipelined, copy down our read-ahead data,
+    // then reset our state. Source and destination can overlap, hence memmove.
+    const uint32_t read_ahead = rLen_ - rPos_;
+    memmove(rBuf_, rBuf_ + rPos_, read_ahead);
+    rPos_ = 0;
+    rLen_ = read_ahead;
+  }
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void writeEnd() {
+    if (pipeOnWrite_) {
+      dstTrans_->write(wBuf_, wLen_);
+      dstTrans_->flush();
+    }
+  }
+
+  void flush();
+
+  boost::shared_ptr<TTransport> getTargetTransport() {
+    return dstTrans_;
+  }
+
+ protected:
+  boost::shared_ptr<TTransport> srcTrans_;
+  boost::shared_ptr<TTransport> dstTrans_;
+
+  uint8_t* rBuf_;
+  uint32_t rBufSize_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+
+  bool pipeOnRead_;
+  bool pipeOnWrite_;
+};
+
+
+/**
+ * Wraps a transport into a pipedTransport instance.
+ *
+ */
+class TPipedTransportFactory : public TTransportFactory {
+ public:
+  TPipedTransportFactory() {}
+  // Convenience form that sets the target transport right away.
+  TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans) {
+    initializeTargetTransport(dstTrans);
+  }
+  virtual ~TPipedTransportFactory() {}
+
+  /** Wraps the base transport into a piped transport that targets dstTrans_. */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
+    boost::shared_ptr<TTransport> piped(new TPipedTransport(srcTrans, dstTrans_));
+    return piped;
+  }
+
+  virtual void initializeTargetTransport(boost::shared_ptr<TTransport> dstTrans) {
+    // The target may be set exactly once.
+    if (dstTrans_.get() != NULL) {
+      throw TException("Target transport already initialized");
+    }
+    dstTrans_ = dstTrans;
+  }
+
+ protected:
+  boost::shared_ptr<TTransport> dstTrans_;
+};
+
+/**
+ * TPipedFileReaderTransport. A TPipedTransport whose source is a
+ * TFileReaderTransport, so that clients who rely on the file-reader calls
+ * (read timeout, chunk seeking) can still reach the original transport.
+ *
+ */
+class TPipedFileReaderTransport : public TPipedTransport,
+ public TFileReaderTransport {
+ public:
+ TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans);
+
+ ~TPipedFileReaderTransport();
+
+ // TTransport functions
+ bool isOpen();
+ bool peek();
+ void open();
+ void close();
+ uint32_t read(uint8_t* buf, uint32_t len);
+ uint32_t readAll(uint8_t* buf, uint32_t len);
+ void readEnd();
+ void write(const uint8_t* buf, uint32_t len);
+ void writeEnd();
+ void flush();
+
+ // TFileReaderTransport functions
+ int32_t getReadTimeout();
+ void setReadTimeout(int32_t readTimeout);
+ uint32_t getNumChunks();
+ uint32_t getCurChunk();
+ void seekToChunk(int32_t chunk);
+ void seekToEnd();
+
+ protected:
+ // shouldn't be used
+ TPipedFileReaderTransport();
+ boost::shared_ptr<TFileReaderTransport> srcTrans_; // same name as TPipedTransport::srcTrans_, but typed as a file reader
+};
+
+/**
+ * Creates a TPipedFileReaderTransport from a file-reader source transport and a destination transport
+ *
+ */
+class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
+ public:
+  TPipedFileReaderTransportFactory() {}
+  TPipedFileReaderTransportFactory(boost::shared_ptr<TTransport> dstTrans)
+    : TPipedTransportFactory(dstTrans)
+  {}
+  virtual ~TPipedFileReaderTransportFactory() {}
+
+  boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
+    // Only file-reader sources can be wrapped; anything else yields a null pointer.
+    boost::shared_ptr<TFileReaderTransport> fileSrc = boost::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
+    if (fileSrc.get() == NULL) {
+      return boost::shared_ptr<TTransport>();
+    }
+    return getFileReaderTransport(fileSrc);
+  }
+
+  boost::shared_ptr<TFileReaderTransport> getFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans) {
+    return boost::shared_ptr<TFileReaderTransport>(new TPipedFileReaderTransport(srcTrans, dstTrans_));
+  }
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
diff --git a/lib/cpp/src/transport/TZlibTransport.cpp b/lib/cpp/src/transport/TZlibTransport.cpp
new file mode 100644
index 0000000..2f14e90
--- /dev/null
+++ b/lib/cpp/src/transport/TZlibTransport.cpp
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cassert>
+#include <cstring>
+#include <algorithm>
+#include <transport/TZlibTransport.h>
+#include <zlib.h>
+
+using std::string;
+
+namespace apache { namespace thrift { namespace transport {
+
+// Allocates and initializes both z_stream objects; on failure frees them and
+// rethrows. Don't call this outside of the constructor.
+void TZlibTransport::initZlib() {
+  int rv;
+  bool r_init = false;
+  try {
+    rstream_ = new z_stream;
+    wstream_ = new z_stream;
+
+    rstream_->zalloc = Z_NULL;
+    wstream_->zalloc = Z_NULL;
+    rstream_->zfree = Z_NULL;
+    wstream_->zfree = Z_NULL;
+    rstream_->opaque = Z_NULL;
+    wstream_->opaque = Z_NULL;
+
+    // Reading inflates crbuf_ into urbuf_; writing deflates uwbuf_ into cwbuf_.
+    rstream_->next_in = crbuf_;
+    wstream_->next_in = uwbuf_;
+    rstream_->next_out = urbuf_;
+    wstream_->next_out = cwbuf_;
+    rstream_->avail_in = 0;
+    wstream_->avail_in = 0;
+    rstream_->avail_out = urbuf_size_;
+    wstream_->avail_out = cwbuf_size_;
+
+    rv = inflateInit(rstream_);
+    checkZlibRv(rv, rstream_->msg);
+    // Have to set this flag so we know whether to de-initialize.
+    r_init = true;
+
+    rv = deflateInit(wstream_, Z_DEFAULT_COMPRESSION);
+    checkZlibRv(rv, wstream_->msg);
+  } catch (...) {
+    if (r_init) {
+      rv = inflateEnd(rstream_);
+      checkZlibRvNothrow(rv, rstream_->msg);
+    }
+    // wstream_ cannot be initialized here; free the stream objects themselves.
+    delete rstream_; rstream_ = NULL;
+    delete wstream_; wstream_ = NULL;
+    throw;
+  }
+}
+
+// Throws TZlibTransportException for any zlib status other than Z_OK.
+inline void TZlibTransport::checkZlibRv(int status, const char* message) {
+  if (status == Z_OK) { return; }
+  throw TZlibTransportException(status, message);
+}
+
+// Like checkZlibRv, but reports the failure through GlobalOutput instead of throwing.
+inline void TZlibTransport::checkZlibRvNothrow(int status, const char* message) {
+  if (status == Z_OK) { return; }
+  string report = "TZlibTransport: zlib failure in destructor: ";
+  report += TZlibTransportException::errorMessage(status, message);
+  GlobalOutput(report.c_str());
+}
+
+// Ends both zlib streams (failures are only logged), then frees all buffers and streams.
+TZlibTransport::~TZlibTransport() {
+  const int inflate_rv = inflateEnd(rstream_);
+  checkZlibRvNothrow(inflate_rv, rstream_->msg);
+  const int deflate_rv = deflateEnd(wstream_);
+  checkZlibRvNothrow(deflate_rv, wstream_->msg);
+
+  delete[] urbuf_;
+  delete[] crbuf_;
+  delete[] uwbuf_;
+  delete[] cwbuf_;
+  delete rstream_;
+  delete wstream_;
+}
+
+bool TZlibTransport::isOpen() {  // buffered uncompressed data also counts as "open"
+  return (readAvail() > 0) || transport_->isOpen();
+}
+
+// READING STRATEGY
+//
+// We have two buffers for reading: one containing the compressed data (crbuf_)
+// and one containing the uncompressed data (urbuf_). When read is called,
+// we repeat the following steps until we have satisfied the request:
+// - Copy data from urbuf_ into the caller's buffer.
+// - If we had enough, return.
+// - If crbuf_ has no unconsumed data, read more into it from the underlying transport.
+// - Inflate data from crbuf_ into urbuf_.
+//
+// In standalone objects, we set input_ended_ to true when inflate returns
+// Z_STREAM_END. This lets us make sure that a checksum was verified.
+
+inline int TZlibTransport::readAvail() {  // bytes inflated into urbuf_ and not yet consumed
+  return urbuf_size_ - rstream_->avail_out - urpos_;
+}
+
+// Reads up to len uncompressed bytes into buf. Returns len, or fewer if the
+// zlib stream ended (standalone) or the underlying transport had no data.
+uint32_t TZlibTransport::read(uint8_t* buf, uint32_t len) {
+  // Unsigned on purpose: an int would go negative for len > INT_MAX and
+  // turn into an enormous memcpy size below.
+  uint32_t need = len;
+  // TODO(dreiss): Skip urbuf on big reads.
+  while (true) {
+    // Copy out whatever we have available, then give them the min of
+    // what we have and what they want, then advance indices.
+    const uint32_t give = std::min(static_cast<uint32_t>(readAvail()), need);
+    memcpy(buf, urbuf_ + urpos_, give);
+    need -= give;
+    buf += give;
+    urpos_ += give;
+
+    // If they were satisfied, we are done.
+    if (need == 0) {
+      return len;
+    }
+
+    // If we get to this point, we need to get some more data.
+    // If zlib has reported the end of a stream, we can't really do any more.
+    if (input_ended_) {
+      return len - need;
+    }
+
+    // The uncompressed read buffer is empty, so reset the stream fields.
+    rstream_->next_out = urbuf_;
+    rstream_->avail_out = urbuf_size_;
+    urpos_ = 0;
+
+    // If we don't have any more compressed data available,
+    // read some from the underlying transport.
+    if (rstream_->avail_in == 0) {
+      uint32_t got = transport_->read(crbuf_, crbuf_size_);
+      if (got == 0) {
+        return len - need;
+      }
+      rstream_->next_in = crbuf_;
+      rstream_->avail_in = got;
+    }
+
+    // We have some compressed data now. Uncompress it.
+    int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
+    if (zlib_rv == Z_STREAM_END) {
+      if (standalone_) {
+        input_ended_ = true;
+      }
+    } else {
+      checkZlibRv(zlib_rv, rstream_->msg);
+    }
+
+    // Okay. The read buffer should have whatever we can give it now.
+    // Loop back to the start and try to give some more.
+  }
+}
+
+
+// WRITING STRATEGY
+//
+// We buffer up small writes before sending them to zlib, so our logic is:
+// - Is the write big (more than MIN_DIRECT_DEFLATE_SIZE bytes)?
+//   - Send the buffer to zlib.
+//   - Send this data to zlib.
+// - Is the write small?
+//   - Is there insufficient space in the buffer for it?
+//     - Send the buffer to zlib.
+//   - Copy the data to the buffer.
+//
+// We have two buffers for writing also: the uncompressed buffer (mentioned
+// above) and the compressed buffer. When sending data to zlib we loop over
+// the following until the source (uncompressed buffer or big write) is empty:
+// - Is there no more space in the compressed buffer?
+//   - Write the compressed buffer to the underlying transport.
+// - Deflate from the source into the compressed buffer.
+void TZlibTransport::write(const uint8_t* buf, uint32_t len) {
+  // zlib's "deflate" function has enough logic in it that I think
+  // we're better off (performance-wise) buffering up small writes.
+  // Lengths are compared unsigned so that len > INT_MAX cannot wrap negative.
+  if (len > static_cast<uint32_t>(MIN_DIRECT_DEFLATE_SIZE)) {
+    flushToZlib(uwbuf_, uwpos_);
+    uwpos_ = 0;
+    flushToZlib(buf, len);
+  } else if (len > 0) {
+    if (static_cast<uint32_t>(uwbuf_size_ - uwpos_) < len) {
+      flushToZlib(uwbuf_, uwpos_);
+      uwpos_ = 0;
+    }
+    memcpy(uwbuf_ + uwpos_, buf, len);
+    uwpos_ += len;
+  }
+}
+
+void TZlibTransport::flush() {  // NOTE(review): finishes the zlib stream (Z_FINISH); looks single-use -- confirm
+  flushToZlib(uwbuf_, uwpos_, true);  // finish == true; uwpos_ is not reset afterwards
+  assert((int)wstream_->avail_out != cwbuf_size_);  // cwbuf_ must hold some compressed output
+  transport_->write(cwbuf_, cwbuf_size_ - wstream_->avail_out);  // next_out/avail_out are not reset
+  transport_->flush();
+}
+
+// Deflates len bytes from buf into cwbuf_, writing cwbuf_ to the underlying
+// transport whenever it fills; with finish set, runs until Z_STREAM_END.
+void TZlibTransport::flushToZlib(const uint8_t* buf, int len, bool finish) {
+  const int mode = finish ? Z_FINISH : Z_NO_FLUSH;
+  wstream_->next_in = const_cast<uint8_t*>(buf);
+  wstream_->avail_in = len;
+
+  while (finish || wstream_->avail_in > 0) {
+    // If our output buffer is full, flush to the underlying transport.
+    if (wstream_->avail_out == 0) {
+      transport_->write(cwbuf_, cwbuf_size_);
+      wstream_->next_out = cwbuf_;
+      wstream_->avail_out = cwbuf_size_;
+    }
+
+    const int deflate_rv = deflate(wstream_, mode);
+    if (finish && deflate_rv == Z_STREAM_END) {
+      assert(wstream_->avail_in == 0);
+      break;
+    }
+
+    checkZlibRv(deflate_rv, wstream_->msg);
+  }
+}
+
+// Lends a pointer into urbuf_ when at least *len bytes are buffered (setting
+// *len to what is available); otherwise returns NULL (protocol slow path).
+const uint8_t* TZlibTransport::borrow(uint8_t* buf, uint32_t* len) {
+  const uint32_t avail = static_cast<uint32_t>(readAvail());  // unsigned: (int)*len wraps above INT_MAX
+  if (avail < *len) {
+    return NULL;
+  }
+  *len = avail;
+  return urbuf_ + urpos_;
+}
+
+// Skips len buffered bytes; throws BAD_ARGS if fewer than len are buffered.
+void TZlibTransport::consume(uint32_t len) {
+  if (static_cast<uint32_t>(readAvail()) < len) {  // unsigned: (int)len wraps above INT_MAX
+    throw TTransportException(TTransportException::BAD_ARGS,
+                              "consume did not follow a borrow.");
+  }
+  urpos_ += len;
+}
+
+// Confirms that the zlib stream was read to its end; throws otherwise.
+// Only valid for standalone objects.
+void TZlibTransport::verifyChecksum() {
+  if (!standalone_) {
+    throw TTransportException(
+      TTransportException::BAD_ARGS,
+      "TZLibTransport can only verify checksums for standalone objects.");
+  }
+  // If the checksum had been bad, we would have gotten an error while
+  // inflating, so a stream that already ended needs no further work.
+  if (input_ended_) {
+    return;
+  }
+
+  // This should only be called when reading is complete, but the whole
+  // checksum may not have been fed to zlib yet. Try to read one extra byte
+  // to force zlib to finish the stream. Getting a byte back is an
+  // unrecoverable error (we throw), so "unreading" it does not matter.
+  uint8_t extra[1];
+  const uint32_t got = this->read(extra, sizeof(extra));
+  if (got != 0 || !input_ended_) {
+    throw TTransportException(
+      TTransportException::CORRUPTED_DATA,
+      "Zlib stream not complete.");
+  }
+}
+
+
+}}} // apache::thrift::transport
diff --git a/lib/cpp/src/transport/TZlibTransport.h b/lib/cpp/src/transport/TZlibTransport.h
new file mode 100644
index 0000000..1439d9d
--- /dev/null
+++ b/lib/cpp/src/transport/TZlibTransport.h
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_ 1
+
+#include <boost/lexical_cast.hpp>
+#include <transport/TTransport.h>
+
+struct z_stream_s;
+
+namespace apache { namespace thrift { namespace transport {
+
+// Exception for zlib failures: keeps the zlib status code and message, and
+// passes errorMessage(status, msg) to TTransportException as its message.
+class TZlibTransportException : public TTransportException {
+ public:
+  TZlibTransportException(int status, const char* msg) :
+    TTransportException(TTransportException::INTERNAL_ERROR,
+                        errorMessage(status, msg)),
+    zlib_status_(status),
+    zlib_msg_(msg == NULL ? "(null)" : msg) {}
+
+  virtual ~TZlibTransportException() throw() {}
+
+  // const so they can be called on an exception caught by const reference.
+  int getZlibStatus() const { return zlib_status_; }
+  std::string getZlibMessage() const { return zlib_msg_; }
+
+  // Builds "zlib error: <msg> (status = <status>)"; a NULL msg becomes "(no message)".
+  static std::string errorMessage(int status, const char* msg) {
+    std::string rv = "zlib error: ";
+    rv += (msg ? msg : "(no message)");
+    rv += " (status = ";
+    rv += boost::lexical_cast<std::string>(status);
+    rv += ")";
+    return rv;
+  }
+
+  int zlib_status_;
+  std::string zlib_msg_;
+};
+
+/**
+ * This transport uses zlib's compressed format on the "far" side.
+ *
+ * There are two kinds of TZlibTransport objects:
+ * - Standalone objects are used to encode self-contained chunks of data
+ *   (like structures). They include checksums.
+ * - Non-standalone transports are used for RPC. They are not implemented yet.
+ *
+ * TODO(dreiss): Don't do an extra copy of the compressed data if
+ *               the underlying transport is TBuffered or TMemory.
+ *
+ */
+class TZlibTransport : public TTransport {
+ public:
+
+  /**
+   * @param transport    The transport to read compressed data from
+   *                     and write compressed data to.
+   * @param use_for_rpc  True if this object will be used for RPC,
+   *                     false if this is a standalone object.
+   * @param urbuf_size   Uncompressed buffer size for reading.
+   * @param crbuf_size   Compressed buffer size for reading.
+   * @param uwbuf_size   Uncompressed buffer size for writing.
+   * @param cwbuf_size   Compressed buffer size for writing.
+   * @throws TTransportException (BAD_ARGS) if use_for_rpc is true or uwbuf_size < MIN_DIRECT_DEFLATE_SIZE.
+   * TODO(dreiss): Write a constructor that isn't a pain.
+   */
+  TZlibTransport(boost::shared_ptr<TTransport> transport,
+                 bool use_for_rpc,
+                 int urbuf_size = DEFAULT_URBUF_SIZE,
+                 int crbuf_size = DEFAULT_CRBUF_SIZE,
+                 int uwbuf_size = DEFAULT_UWBUF_SIZE,
+                 int cwbuf_size = DEFAULT_CWBUF_SIZE) :
+    transport_(transport),
+    standalone_(!use_for_rpc),
+    urpos_(0),
+    uwpos_(0),
+    input_ended_(false),
+    output_flushed_(false),
+    urbuf_size_(urbuf_size),
+    crbuf_size_(crbuf_size),
+    uwbuf_size_(uwbuf_size),
+    cwbuf_size_(cwbuf_size),
+    urbuf_(NULL),
+    crbuf_(NULL),
+    uwbuf_(NULL),
+    cwbuf_(NULL),
+    rstream_(NULL),
+    wstream_(NULL)
+  {
+    // Only standalone objects are supported so far; reject RPC use up front.
+    if (!standalone_) {
+      throw TTransportException(
+          TTransportException::BAD_ARGS,
+          "TZLibTransport has not been tested for RPC.");
+    }
+
+    if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
+      // Have to copy this into a local because of a linking issue.
+      int minimum = MIN_DIRECT_DEFLATE_SIZE;
+      throw TTransportException(
+          TTransportException::BAD_ARGS,
+          "TZLibTransport: uncompressed write buffer must be at least "
+          + boost::lexical_cast<std::string>(minimum) + ".");
+    }
+
+    try {
+      urbuf_ = new uint8_t[urbuf_size];
+      crbuf_ = new uint8_t[crbuf_size];
+      uwbuf_ = new uint8_t[uwbuf_size];
+      cwbuf_ = new uint8_t[cwbuf_size];
+
+      // Don't call this outside of the constructor.
+      initZlib();
+
+    } catch (...) {
+      delete[] urbuf_;
+      delete[] crbuf_;
+      delete[] uwbuf_;
+      delete[] cwbuf_;
+      throw;
+    }
+  }
+
+  // Don't call this outside of the constructor.
+  void initZlib();
+
+  ~TZlibTransport();
+  // True while uncompressed data is still buffered or the underlying transport is open.
+  bool isOpen();
+
+  void open() {
+    transport_->open();
+  }
+
+  void close() {
+    transport_->close();
+  }
+  // Reads up to len uncompressed bytes into buf; returns the number actually read.
+  uint32_t read(uint8_t* buf, uint32_t len);
+  // Compresses len bytes from buf; small writes are buffered first.
+  void write(const uint8_t* buf, uint32_t len);
+  // Finishes the zlib stream (Z_FINISH), writes it out and flushes the underlying transport.
+  void flush();
+  // Returns a pointer to at least *len buffered bytes (updating *len), or NULL.
+  const uint8_t* borrow(uint8_t* buf, uint32_t* len);
+  // Marks len borrowed bytes as consumed; throws BAD_ARGS if they are not buffered.
+  void consume(uint32_t len);
+  // Standalone only: throws unless the zlib stream has been read to its end.
+  void verifyChecksum();
+
+  /**
+   * TODO(someone_smart): Choose smart defaults.
+   */
+  static const int DEFAULT_URBUF_SIZE = 128;
+  static const int DEFAULT_CRBUF_SIZE = 1024;
+  static const int DEFAULT_UWBUF_SIZE = 128;
+  static const int DEFAULT_CWBUF_SIZE = 1024;
+
+ protected:
+
+  inline void checkZlibRv(int status, const char* msg);
+  inline void checkZlibRvNothrow(int status, const char* msg);
+  inline int readAvail();
+  void flushToZlib(const uint8_t* buf, int len, bool finish = false);
+
+  // Writes of this size or smaller are buffered up.
+  // Larger writes are dumped straight to zlib.
+  static const int MIN_DIRECT_DEFLATE_SIZE = 32;
+
+  boost::shared_ptr<TTransport> transport_;
+  bool standalone_;
+
+  int urpos_;
+  int uwpos_;
+
+  /// True iff zlib has reached the end of a stream.
+  /// This is only ever true in standalone protocol objects.
+  bool input_ended_;
+  /// Meant to be true once the output stream has been flushed.
+  /// NOTE(review): initialized to false and not otherwise used in TZlibTransport.cpp -- confirm.
+  bool output_flushed_;
+
+  int urbuf_size_;
+  int crbuf_size_;
+  int uwbuf_size_;
+  int cwbuf_size_;
+
+  uint8_t* urbuf_;
+  uint8_t* crbuf_;
+  uint8_t* uwbuf_;
+  uint8_t* cwbuf_;
+
+  struct z_stream_s* rstream_;
+  struct z_stream_s* wstream_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_