Thrift: Changing TFileTransport to use the same buffer-swap mechanism that pillar does
Reviewed by: aditya
Tested with thrift test class
Notes: TFileTransport previously queued events in a circular buffer. It now uses two large buffers -- one that callers enqueue events into and one that the writer thread drains to disk -- which are swapped by the writer thread whenever it has finished writing out its current buffer.
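Also changed a few default constants -- the force-flush timeout (DEFAULT_FLUSH_MAX_US) is now 3 sec (previously 20 ms), and the default event buffer size (DEFAULT_EVENT_BUFFER_SIZE) is now 10000 entries (previously 1024)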
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664997 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 61f1b80..3f625bc 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -13,48 +13,39 @@
namespace facebook { namespace thrift { namespace transport {
-TFileTransport::TFileTransport(string path) {
- filename_ = path;
- openLogFile();
-
- // set initial values to default
- readBuffSize_ = DEFAULT_READ_BUFF_SIZE;
- readTimeout_ = DEFAULT_READ_TIMEOUT_MS;
- chunkSize_ = DEFAULT_CHUNK_SIZE;
- eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
- flushMaxUs_ = DEFAULT_FLUSH_MAX_US;
- flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES;
- maxEventSize_ = DEFAULT_MAX_EVENT_SIZE;
- maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
- eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US;
-
- // initialize buffer lazily
- buffer_ = 0;
-
- // buffer is initially empty
- isEmpty_ = true;
- isFull_ = false;
-
- // both head and tail are initially at 0
- headPos_ = 0;
- tailPos_ = 0;
-
+TFileTransport::TFileTransport(string path)
+ : readState_()
+ , readBuff_(NULL)
+ , currentEvent_(NULL)
+ , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
+ , readTimeout_(NO_TAIL_READ_TIMEOUT)
+ , chunkSize_(DEFAULT_CHUNK_SIZE)
+ , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
+ , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
+ , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
+ , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
+ , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
+ , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
+ , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+ , writerThreadId_(0)
+ , dequeueBuffer_(NULL)
+ , enqueueBuffer_(NULL)
+ , closing_(false)
+ , forceFlush_(false)
+ , filename_(path)
+ , fd_(0)
+ , bufferAndThreadInitialized_(false)
+ , offset_(0)
+ , lastBadChunk_(0)
+ , numCorruptedEventsInChunk_(0)
+{
// initialize all the condition vars/mutexes
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&notFull_, NULL);
pthread_cond_init(&notEmpty_, NULL);
pthread_cond_init(&flushed_, NULL);
- // not closing the file during init
- closing_ = false;
-
- // create writer thread on demand
- writerThreadId_ = 0;
-
- // read related variables
- // read buff initialized lazily
- readBuff_ = 0;
- currentEvent_ = 0;
+ openLogFile();
}
void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
@@ -96,18 +87,27 @@
// deal in the common case because writing is quick
pthread_join(writerThreadId_, NULL);
+ writerThreadId_ = 0;
}
- if (buffer_) {
- delete[] buffer_;
+ if (dequeueBuffer_) {
+ delete dequeueBuffer_;
+ dequeueBuffer_ = NULL;
+ }
+
+ if (enqueueBuffer_) {
+ delete enqueueBuffer_;
+ enqueueBuffer_ = NULL;
}
if (readBuff_) {
delete readBuff_;
+ readBuff_ = NULL;
}
if (currentEvent_) {
delete currentEvent_;
+ currentEvent_ = NULL;
}
// close logfile
@@ -118,6 +118,25 @@
}
}
+bool TFileTransport::initBufferAndWriteThread() {
+ if (bufferAndThreadInitialized_) {
+ T_ERROR("Trying to double-init TFileTransport");
+ return false;
+ }
+
+ if (writerThreadId_ == 0) {
+ if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
+ T_ERROR("Could not create writer thread");
+ return false;
+ }
+ }
+
+ dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+ enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+ bufferAndThreadInitialized_ = true;
+
+ return true;
+}
void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
// make sure that event size is valid
@@ -139,46 +158,34 @@
memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
toEnqueue->eventSize_ = eventLen + 4;
- // T_DEBUG_L(1, "event size: %u", eventLen);
- return enqueueEvent(toEnqueue, blockUntilFlush);
-}
-
-void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) {
// lock mutex
pthread_mutex_lock(&mutex_);
// make sure that enqueue buffer is initialized and writer thread is running
- if (buffer_ == 0) {
- buffer_ = new eventInfo*[eventBufferSize_];
- }
- if (writerThreadId_ == 0) {
- if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
- T_ERROR("Error creating write thread");
+ if (!bufferAndThreadInitialized_) {
+ if (!initBufferAndWriteThread()) {
+ delete toEnqueue;
+ pthread_mutex_unlock(&mutex_);
return;
}
}
// Can't enqueue while buffer is full
- while(isFull_) {
+ while (enqueueBuffer_->isFull()) {
pthread_cond_wait(&notFull_, &mutex_);
}
- // make a copy and enqueue at tail of buffer
- buffer_[tailPos_] = toEnqueue;
- tailPos_ = (tailPos_+1) % eventBufferSize_;
-
- // mark the buffer as non-empty
- isEmpty_ = false;
-
- // circular buffer has wrapped around (and is full)
- if(tailPos_ == headPos_) {
- // T_DEBUG("queue is full");
- isFull_ = true;
+ // add to the buffer
+ if (!enqueueBuffer_->addEvent(toEnqueue)) {
+ delete toEnqueue;
+ pthread_mutex_unlock(&mutex_);
+ return;
}
// signal anybody who's waiting for the buffer to be non-empty
pthread_cond_signal(&notEmpty_);
- if(blockUntilFlush) {
+
+ if (blockUntilFlush) {
pthread_cond_wait(&flushed_, &mutex_);
}
@@ -186,61 +193,50 @@
// because condition variables can get triggered by the os for no reason
// it is probably a non-factor for the time being
pthread_mutex_unlock(&mutex_);
-
}
-eventInfo* TFileTransport::dequeueEvent(long long deadline) {
+bool TFileTransport::swapEventBuffers(long long deadline) {
//deadline time struc
struct timespec ts;
- if(deadline) {
+ if (deadline) {
ts.tv_sec = deadline/(1000*1000);
ts.tv_nsec = (deadline%(1000*1000))*1000;
}
// wait for the queue to fill up
pthread_mutex_lock(&mutex_);
- while(isEmpty_) {
+ while (enqueueBuffer_->isEmpty()) {
// do a timed wait on the condition variable
- if(deadline) {
+ if (deadline) {
int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
if(e == ETIMEDOUT) {
break;
}
- }
- else {
+ } else {
// just wait until the buffer gets an item
pthread_cond_wait(&notEmpty_, &mutex_);
}
}
- string ret;
- bool doSignal = false;
+ bool swapped = false;
// could be empty if we timed out
- eventInfo* retEvent = 0;
- if(!isEmpty_) {
- retEvent = buffer_[headPos_];
- headPos_ = (headPos_+1) % eventBufferSize_;
+ if (!enqueueBuffer_->isEmpty()) {
+ TFileTransportBuffer *temp = enqueueBuffer_;
+ enqueueBuffer_ = dequeueBuffer_;
+ dequeueBuffer_ = temp;
- isFull_ = false;
- doSignal = true;
-
- // check if this is the last item in the buffer
- if(headPos_ == tailPos_) {
- isEmpty_ = true;
- }
+ swapped = true;
}
// unlock the mutex and signal if required
pthread_mutex_unlock(&mutex_);
- if(doSignal) {
+
+ if (swapped) {
pthread_cond_signal(&notFull_);
}
- if (!retEvent) {
- retEvent = new eventInfo();
- }
- return retEvent;
+ return swapped;
}
@@ -268,105 +264,99 @@
return;
}
- //long long start = now();
- eventInfo* outEvent = dequeueEvent(nextFlush);
- if (!outEvent) {
- T_DEBUG_L(1, "Got an empty event");
- return;
- }
-
- // sanity check on event
- if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
- T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
- delete(outEvent);
- continue;
- }
- //long long diff = now()-start;
- //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
-
- // If chunking is required, then make sure that msg does not cross chunk boundary
- if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
-
- // event size must be less than chunk size
- if(outEvent->eventSize_ > chunkSize_) {
- T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
- outEvent->eventSize_, chunkSize_);
- delete(outEvent);
- continue;
- }
-
- long long chunk1 = offset_/chunkSize_;
- long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
-
- // if adding this event will cross a chunk boundary, pad the chunk with zeros
- if(chunk1 != chunk2) {
- int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
-
- // sanity check
- if (padding <= 0) {
- T_DEBUG("Padding is empty, skipping event");
- continue;
+ if (swapEventBuffers(nextFlush)) {
+ eventInfo* outEvent;
+ while (NULL != (outEvent = dequeueBuffer_->getNext())) {
+ if (!outEvent) {
+ T_DEBUG_L(1, "Got an empty event");
+ return;
}
- if (padding > (int32_t)chunkSize_) {
- T_DEBUG("padding is larger than chunk size, skipping event");
- continue;
- }
- uint8_t zeros[padding];
- bzero(zeros, padding);
- // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
- // padding, offset_, chunk2);
- if(-1 == ::write(fd_, zeros, padding)) {
- perror("TFileTransport: error while padding zeros");
- throw TTransportException("TFileTransport: error while padding zeros");
- }
- unflushed += padding;
- offset_ += padding;
- }
- }
- // write the dequeued event to the file
- if(outEvent->eventSize_ > 0) {
- if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
- perror("TFileTransport: error while writing event");
- throw TTransportException("TFileTransport: error while writing event");
- }
+ // sanity check on event
+ if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+ T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+ continue;
+ }
- unflushed += outEvent->eventSize_;
- offset_ += outEvent->eventSize_;
+ // If chunking is required, then make sure that msg does not cross chunk boundary
+ if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+
+ // event size must be less than chunk size
+ if(outEvent->eventSize_ > chunkSize_) {
+ T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
+ outEvent->eventSize_, chunkSize_);
+ continue;
+ }
+
+ long long chunk1 = offset_/chunkSize_;
+ long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+
+ // if adding this event will cross a chunk boundary, pad the chunk with zeros
+ if(chunk1 != chunk2) {
+ // refetch the offset to keep in sync
+ offset_ = lseek(fd_, 0, SEEK_CUR);
+ int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
+
+ // sanity check
+ if (padding <= 0) {
+ T_DEBUG("Padding is empty, skipping event");
+ continue;
+ }
+ if (padding > (int32_t)chunkSize_) {
+ T_DEBUG("padding is larger than chunk size, skipping event");
+ continue;
+ }
+ uint8_t zeros[padding];
+ bzero(zeros, padding);
+ // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
+ // padding, offset_, chunk2);
+ if(-1 == ::write(fd_, zeros, padding)) {
+ perror("TFileTransport: error while padding zeros");
+ throw TTransportException("TFileTransport: error while padding zeros");
+ }
+ unflushed += padding;
+ offset_ += padding;
+ }
+ }
+
+ // write the dequeued event to the file
+ if(outEvent->eventSize_ > 0) {
+ if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+ perror("TFileTransport: error while writing event");
+ throw TTransportException("TFileTransport: error while writing event");
+ }
+
+ unflushed += outEvent->eventSize_;
+ offset_ += outEvent->eventSize_;
+ }
+ }
+ dequeueBuffer_->reset();
}
// couple of cases from which a flush could be triggered
- if((getCurrentTime() >= nextFlush && unflushed > 0) ||
+ if ((getCurrentTime() >= nextFlush && unflushed > 0) ||
unflushed > flushMaxBytes_ ||
- (outEvent && (outEvent->eventSize_== 0)) ) {
- //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
+ forceFlush_) {
// sync (force flush) file to disk
fsync(fd_);
nextFlush = getCurrentTime() + flushMaxUs_;
unflushed = 0;
- // notify anybody(thing?) waiting for flush completion
- pthread_mutex_lock(&mutex_);
- notFlushed_ = false;
- pthread_mutex_unlock(&mutex_);
+ // notify anybody waiting for flush completion
+ forceFlush_ = false;
pthread_cond_broadcast(&flushed_);
}
- // deallocate dequeued event
- delete(outEvent);
}
}
void TFileTransport::flush() {
- eventInfo* flushEvent = new eventInfo();
- notFlushed_ = true;
-
- enqueueEvent(flushEvent, false);
-
// wait for flush to take place
pthread_mutex_lock(&mutex_);
- while(notFlushed_) {
+ forceFlush_ = true;
+
+ while (forceFlush_) {
pthread_cond_wait(&flushed_, &mutex_);
}
@@ -569,14 +559,14 @@
void TFileTransport::performRecovery() {
// perform some kickass recovery
uint32_t curChunk = getCurChunk();
- if (lastBadChunk == curChunk) {
- numCorruptedEventsinChunk++;
+ if (lastBadChunk_ == curChunk) {
+ numCorruptedEventsInChunk_++;
} else {
- lastBadChunk = curChunk;
- numCorruptedEventsinChunk = 1;
+ lastBadChunk_ = curChunk;
+ numCorruptedEventsInChunk_ = 1;
}
- if (numCorruptedEventsinChunk < maxCorruptedEvents_) {
+ if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
// maybe there was an error in reading the file from disk
// seek to the beginning of chunk and try again
seekToChunk(curChunk);
@@ -706,6 +696,70 @@
return ret;
}
+TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
+ : bufferMode_(WRITE)
+ , writePoint_(0)
+ , readPoint_(0)
+ , size_(size)
+{
+ buffer_ = new eventInfo*[size];
+}
+
+TFileTransportBuffer::~TFileTransportBuffer() {
+ if (buffer_) {
+ for (uint32_t i = 0; i < writePoint_; i++) {
+ delete buffer_[i];
+ }
+ delete[] buffer_;
+ buffer_ = NULL;
+ }
+}
+
+bool TFileTransportBuffer::addEvent(eventInfo *event) {
+ if (bufferMode_ == READ) {
+ perror("Trying to write to a buffer in read mode");
+ }
+ if (writePoint_ < size_) {
+ buffer_[writePoint_++] = event;
+ return true;
+ } else {
+ // buffer is full
+ return false;
+ }
+}
+
+eventInfo* TFileTransportBuffer::getNext() {
+ if (bufferMode_ == WRITE) {
+ bufferMode_ = READ;
+ }
+ if (readPoint_ < writePoint_) {
+ return buffer_[readPoint_++];
+ } else {
+ // no more entries
+ return NULL;
+ }
+}
+
+void TFileTransportBuffer::reset() {
+ if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
+ T_DEBUG("Resetting a buffer with unread entries");
+ }
+ // Clean up the old entries
+ for (uint32_t i = 0; i < writePoint_; i++) {
+ delete buffer_[i];
+ }
+ bufferMode_ = WRITE;
+ writePoint_ = 0;
+ readPoint_ = 0;
+}
+
+bool TFileTransportBuffer::isFull() {
+ return writePoint_ == size_;
+}
+
+bool TFileTransportBuffer::isEmpty() {
+ return writePoint_ == 0;
+}
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index a58be2c..237234e 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -75,6 +75,47 @@
} readState;
/**
+ * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
+ * to be written to disk. Should be used in the following way:
+ * 1) Buffer created
+ * 2) Buffer written to (addEvent)
+ * 3) Buffer read from (getNext)
+ * 4) Buffer reset (reset)
+ * 5) Go back to 2, or destroy buffer
+ *
+ * The buffer should never be written to after it is read from, unless it is reset first.
+ * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
+ * which uses the buffer in this way.
+ *
+ * @author James Wang <jwang@facebook.com>
+ */
+class TFileTransportBuffer {
+ public:
+ TFileTransportBuffer(uint32_t size);
+ ~TFileTransportBuffer();
+
+ bool addEvent(eventInfo *event);
+ eventInfo* getNext();
+ void reset();
+ bool isFull();
+ bool isEmpty();
+
+ private:
+ TFileTransportBuffer(); // should not be used
+
+ enum mode {
+ WRITE,
+ READ
+ };
+ mode bufferMode_;
+
+ uint32_t writePoint_;
+ uint32_t readPoint_;
+ uint32_t size_;
+ eventInfo** buffer_;
+};
+
+/**
* File implementation of a transport. Reads and writes are done to a
* file on disk.
*
@@ -138,14 +179,13 @@
}
void setEventBufferSize(uint32_t bufferSize) {
- if (bufferSize) {
- if (buffer_) {
- delete[] buffer_;
- }
- eventBufferSize_ = bufferSize;
- buffer_ = new eventInfo*[eventBufferSize_];
+ if (bufferAndThreadInitialized_) {
+ perror("Cannot change the buffer size after writer thread started");
+ return;
}
+ eventBufferSize_ = bufferSize;
}
+
uint32_t getEventBufferSize() {
return eventBufferSize_;
}
@@ -194,8 +234,8 @@
private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
- void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
- eventInfo* dequeueEvent(long long deadline);
+ bool swapEventBuffers(long long deadline);
+ bool initBufferAndWriteThread();
// control for writer thread
static void* startWriterThread(void* ptr) {
@@ -218,7 +258,6 @@
// Class variables
readState readState_;
uint8_t* readBuff_;
-
eventInfo* currentEvent_;
uint32_t readBuffSize_;
@@ -231,17 +270,13 @@
uint32_t chunkSize_;
static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
- // size of string buffer
+ // size of event buffers
uint32_t eventBufferSize_;
- static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
+ static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
- // circular buffer to hold data in before it is flushed. This is an array of strings. Each
- // element of the array stores a msg that needs to be written to the file
- eventInfo** buffer_;
-
// max number of microseconds that can pass without flushing
uint32_t flushMaxUs_;
- static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
+ static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
// max number of bytes that can be written without flushing
uint32_t flushMaxBytes_;
@@ -266,33 +301,35 @@
// writer thread id
pthread_t writerThreadId_;
- // variables that determine position of head/tail of circular buffer
- int headPos_, tailPos_;
-
- // variables indicating whether the buffer is full or empty
- bool isFull_, isEmpty_;
+ // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
+ // needs to be written to the file. The buffers are swapped by the writer thread.
+ TFileTransportBuffer *dequeueBuffer_;
+ TFileTransportBuffer *enqueueBuffer_;
+
+ // conditions used to block when the buffer is full or empty
pthread_cond_t notFull_, notEmpty_;
- bool closing_;
+ volatile bool closing_;
// To keep track of whether the buffer has been flushed
pthread_cond_t flushed_;
- bool notFlushed_;
+ volatile bool forceFlush_;
- // Mutex that is grabbed when enqueueing, dequeueing and flushing
- // from the circular buffer
+ // Mutex that is grabbed when enqueueing and swapping the read/write buffers
pthread_mutex_t mutex_;
// File information
string filename_;
int fd_;
+ // Whether the writer thread and buffers have been initialized
+ bool bufferAndThreadInitialized_;
+
// Offset within the file
off_t offset_;
// event corruption information
- uint32_t lastBadChunk;
- uint32_t numCorruptedEventsinChunk;
-
+ uint32_t lastBadChunk_;
+ uint32_t numCorruptedEventsInChunk_;
};
// Exception thrown when EOF is hit