-- TFileTransport (Thrift Logfile)
Summary:
-- TBufferedFileWriter.h/cpp will be renamed to TFileTransport.h/cpp in the next commit.
-- TFileTransport essentially reads and writes Thrift calls to/from a file instead of a
socket.
-- The code/design is somewhat similar to pillar_logfile but there are some significant changes.
todo:
-- still need to do error correction/detection
Reviewed By: Mark Slee
Test Plan:
-- Wrote test in thrift/test/cpp/src/main.cpp that appends to a file and replays requests
Notes:
It's finally time to port search over to Thrift
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664889 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cpp b/lib/cpp/src/transport/TBufferedFileWriter.cpp
index c3ed250..39e2074 100644
--- a/lib/cpp/src/transport/TBufferedFileWriter.cpp
+++ b/lib/cpp/src/transport/TBufferedFileWriter.cpp
@@ -1,43 +1,34 @@
#include "TBufferedFileWriter.h"
+#include "TTransportUtils.h"
#include <pthread.h>
-#include <cassert>
-#include <cstdlib>
-#include <string>
-#include <sys/time.h>
-#include <sys/types.h>
+ #include <sys/time.h>
#include <fcntl.h>
#include <errno.h>
+#include <unistd.h>
+#include <iostream>
-using std::string;
+using namespace std;
namespace facebook { namespace thrift { namespace transport {
-TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) {
- init(filename, sz, 0, 0);
-}
+TFileTransport::TFileTransport(string path) {
+ filename_ = path;
+ openLogFile();
-TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) {
- init(filename, sz, fd, offset);
-}
+ // set initial values to default
+ readBuffSize_ = DEFAULT_READ_BUFF_SIZE;
+ readTimeout_ = DEFAULT_READ_TIMEOUT_MS;
+ chunkSize_ = DEFAULT_CHUNK_SIZE;
+ eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
+ flushMaxUs_ = DEFAULT_FLUSH_MAX_US;
+ flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES;
+ maxEventSize_ = DEFAULT_MAX_EVENT_SIZE;
+ maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
+ eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US;
-void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) {
- // validate buffer size
- sz_ = sz;
- if (sz_ <= 0) {
- throw TTransportException("invalid input buffer size");
- }
-
- // set file-related variables
- fd_ = 0;
- resetOutputFile(fd, filename, offset);
-
- // set default values of flush related params
- flushMaxBytes_ = 1024 * 100;
- flushMaxUs_ = 20 * 1000;
-
- // allocate event buffer
- buffer_ = new eventInfo[sz_];
+ // initialize buffer lazily
+ buffer_ = 0;
// buffer is initially empty
isEmpty_ = true;
@@ -47,9 +38,6 @@
headPos_ = 0;
tailPos_ = 0;
- // for lack of a better option, set chunk size to 0. Users can change this to whatever they want
- chunkSize_ = 0;
-
// initialize all the condition vars/mutexes
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&notFull_, NULL);
@@ -59,42 +47,70 @@
// not closing the file during init
closing_ = false;
- // spawn writer thread
- pthread_create(&writer_, NULL, startWriterThread, (void *)this);
+ // create writer thread on demand
+ writerThreadId_ = 0;
+
+ // read related variables
+ // read buff initialized lazily
+ readBuff_ = 0;
+ currentEvent_ = 0;
}
-void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) {
+void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
filename_ = filename;
offset_ = offset;
// check if current file is still open
if (fd_ > 0) {
- // TODO: unclear if this should throw an error
- fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str());
+ // TODO: should there be a flush here?
+ fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
::close(fd_);
}
- fd_ = fd;
+
+ if (fd) {
+ fd_ = fd;
+ } else {
+ // open file if the input fd is 0
+ openLogFile();
+ }
}
-TBufferedFileWriter::~TBufferedFileWriter() {
- // flush output buffer
- flush();
+TFileTransport::~TFileTransport() {
+ // TODO: Make sure the buffer is actually flushed
+ // flush the buffer if a writer thread is active
+ if (writerThreadId_ > 0) {
+ // flush output buffer
+ flush();
- // send a signal to write thread to end
- closing_ = true;
- pthread_join(writer_, NULL);
+ // send a signal to write thread to end
+ closing_ = true;
+ pthread_join(writerThreadId_, NULL);
+ }
- delete[] buffer_;
+ if (buffer_) {
+ delete[] buffer_;
+ }
- // TODO: should the file be closed here?
+ if (readBuff_) {
+ delete readBuff_;
+ }
+
+ if (currentEvent_) {
+ delete currentEvent_;
+ }
+
+ // close logfile
+ if (fd_ > 0) {
+ ::close(fd_);
+ }
}
-void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
// make sure that event size is valid
if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
- // T_ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
+ T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
return;
}
@@ -103,17 +119,33 @@
return;
}
- eventInfo toEnqueue;
- uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen);
- toEnqueue.payLoad_ = bufCopy;
- toEnqueue.eventSize_ = eventLen;
+ eventInfo* toEnqueue = new eventInfo();
+ toEnqueue->eventBuff_ = (uint8_t *)malloc((sizeof(uint8_t) * eventLen) + 4);
+ // first 4 bytes is the event length
+ memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
+ // actual event contents
+ memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
+ toEnqueue->eventSize_ = eventLen + 4;
+ // T_DEBUG_L(1, "event size: %u", eventLen);
return enqueueEvent(toEnqueue, blockUntilFlush);
}
-void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) {
- // Lock mutex
+void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) {
+ // lock mutex
pthread_mutex_lock(&mutex_);
+
+ // make sure that enqueue buffer is initialized and writer thread is running
+ if (buffer_ == 0) {
+ buffer_ = new eventInfo*[eventBufferSize_];
+ }
+ if (writerThreadId_ == 0) {
+ if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
+ T_ERROR("Error creating write thread");
+ return;
+ }
+ }
+
// Can't enqueue while buffer is full
while(isFull_) {
pthread_cond_wait(&notFull_, &mutex_);
@@ -121,7 +153,7 @@
// make a copy and enqueue at tail of buffer
buffer_[tailPos_] = toEnqueue;
- tailPos_ = (tailPos_+1) % sz_;
+ tailPos_ = (tailPos_+1) % eventBufferSize_;
// mark the buffer as non-empty
isEmpty_ = false;
@@ -146,7 +178,7 @@
}
-eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) {
+eventInfo* TFileTransport::dequeueEvent(long long deadline) {
//deadline time struc
struct timespec ts;
if(deadline) {
@@ -174,10 +206,10 @@
bool doSignal = false;
// could be empty if we timed out
- eventInfo retEvent;
+ eventInfo* retEvent = 0;
if(!isEmpty_) {
retEvent = buffer_[headPos_];
- headPos_ = (headPos_+1) % sz_;
+ headPos_ = (headPos_+1) % eventBufferSize_;
isFull_ = false;
doSignal = true;
@@ -194,16 +226,129 @@
pthread_cond_signal(&notFull_);
}
+ if (!retEvent) {
+ retEvent = new eventInfo();
+ }
return retEvent;
}
-void TBufferedFileWriter::flush()
-{
- eventInfo flushEvent;
- flushEvent.payLoad_ = NULL;
- flushEvent.eventSize_ = 0;
+void TFileTransport::writerThread() {
+ // open file if it is not open
+ if(!fd_) {
+ openLogFile();
+ }
+ // set the offset to the correct value (EOF)
+ offset_ = lseek(fd_, 0, SEEK_END);
+
+ // Figure out the next time by which a flush must take place
+ long long nextFlush = getCurrentTime() + flushMaxUs_;
+ uint32_t unflushed = 0;
+
+ while(1) {
+ // this will only be true when the destructor is being invoked
+ if(closing_) {
+ if(-1 == ::close(fd_)) {
+ perror("TFileTransport: error in close");
+ }
+ throw TTransportException("error in file close");
+ fd_ = 0;
+ return;
+ }
+
+ //long long start = now();
+ eventInfo* outEvent = dequeueEvent(nextFlush);
+ if (!outEvent) {
+ T_DEBUG_L(1, "Got an empty event");
+ return;
+ }
+
+ // sanity check on event
+ if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+ T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+ delete(outEvent);
+ continue;
+ }
+ //long long diff = now()-start;
+ //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
+
+ // If chunking is required, then make sure that msg does not cross chunk boundary
+ if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+
+ // event size must be less than chunk size
+ if(outEvent->eventSize_ > chunkSize_) {
+ T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
+ outEvent->eventSize_, chunkSize_);
+ delete(outEvent);
+ continue;
+ }
+
+ long long chunk1 = offset_/chunkSize_;
+ long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+
+ // if adding this event will cross a chunk boundary, pad the chunk with zeros
+ if(chunk1 != chunk2) {
+ int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
+
+ // sanity check
+ if (padding <= 0) {
+ T_DEBUG("Padding is empty, skipping event");
+ continue;
+ }
+ if (padding > (int32_t)chunkSize_) {
+ T_DEBUG("padding is larger than chunk size, skipping event");
+ continue;
+ }
+ // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
+ uint8_t zeros[padding];
+ bzero(zeros, padding);
+ T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
+ if(-1 == ::write(fd_, zeros, padding)) {
+ perror("TFileTransport: error while padding zeros");
+ throw TTransportException("TFileTransport: error while padding zeros");
+ }
+ unflushed += padding;
+ offset_ += padding;
+ }
+ }
+
+ // write the dequeued event to the file
+ if(outEvent->eventSize_ > 0) {
+ if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+ perror("TFileTransport: error while writing event");
+ // TODO: should this trigger an exception or simply continue?
+ throw TTransportException("TFileTransport: error while writing event");
+ }
+
+ unflushed += outEvent->eventSize_;
+ offset_ += outEvent->eventSize_;
+ }
+
+ // couple of cases from which a flush could be triggered
+ if((getCurrentTime() >= nextFlush && unflushed > 0) ||
+ unflushed > flushMaxBytes_ ||
+ (outEvent && (outEvent->eventSize_== 0)) ) {
+ //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
+
+ // sync (force flush) file to disk
+ fsync(fd_);
+ nextFlush = getCurrentTime() + flushMaxUs_;
+ unflushed = 0;
+
+ // notify anybody(thing?) waiting for flush completion
+ pthread_mutex_lock(&mutex_);
+ notFlushed_ = false;
+ pthread_mutex_unlock(&mutex_);
+ pthread_cond_broadcast(&flushed_);
+ }
+ // deallocate dequeued event
+ delete(outEvent);
+ }
+}
+
+void TFileTransport::flush() {
+ eventInfo* flushEvent = new eventInfo();
notFlushed_ = true;
enqueueEvent(flushEvent, false);
@@ -218,20 +363,264 @@
pthread_mutex_unlock(&mutex_);
}
-void TBufferedFileWriter::openOutputFile() {
+
+uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
+ uint32_t have = 0;
+ uint32_t get = 0;
+
+ while (have < len) {
+ get = read(buf+have, len-have);
+ if (get <= 0) {
+ throw TEOFException();
+ }
+ have += get;
+ }
+
+ return have;
+}
+
+uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
+ // check if there is an event ready to be read
+ if (!currentEvent_) {
+ readEvent();
+ }
+
+ // did not manage to read an event from the file. This could have happened
+ // if the timeout expired or there was some other error
+ if (!currentEvent_) {
+ return 0;
+ }
+
+ // read as much of the current event as possible
+ int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
+ if (remaining <= (int32_t)len) {
+ memcpy(buf,
+ currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
+ remaining);
+ delete(currentEvent_);
+ currentEvent_ = 0;
+ return remaining;
+ }
+
+ // read as much as possible
+ memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
+ currentEvent_->eventBuffPos_ += len;
+ return len;
+}
+
+bool TFileTransport::readEvent() {
+ int readTries = 0;
+
+ if (!readBuff_) {
+ readBuff_ = new uint8_t[readBuffSize_];
+ }
+
+ while (1) {
+ // check if there is anything in the read buffer
+ if (readState_.bufferPtr_ == readState_.bufferLen_) {
+ // advance the offset pointer
+ offset_ += readState_.bufferLen_;
+ readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
+ if (readState_.bufferLen_) {
+ T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+ }
+ readState_.bufferPtr_ = 0;
+ readState_.lastDispatchPtr_ = 0;
+
+ // read error
+ if (readState_.bufferLen_ == -1) {
+ readState_.resetAllValues();
+ perror("TFileTransport: error while reading from file");
+ // TODO: should this trigger an exception or simply continue?
+ throw TTransportException("TFileTransport: error while reading from file");
+ } else if (readState_.bufferLen_ == 0) { // EOF
+ // wait indefinitely if there is no timeout
+ if (readTimeout_ == -1) {
+ usleep(eofSleepTime_);
+ continue;
+ } else if (readTimeout_ == 0) {
+ // reset state
+ readState_.resetState(0);
+ return false;
+ } else if (readTimeout_ > 0) {
+ // timeout already expired once
+ if (readTries > 0) {
+ readState_.resetState(0);
+ return false;
+ } else {
+ usleep(readTimeout_ * 1000);
+ readTries++;
+ continue;
+ }
+ }
+ }
+ }
+
+ readTries = 0;
+
+ // attempt to read an event from the buffer
+ while(readState_.bufferPtr_ < readState_.bufferLen_) {
+ if (readState_.readingSize_) {
+ if(readState_.eventSizeBuffPos_ == 0) {
+ if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
+ ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
+ // skip one byte towards chunk boundary
+ // T_DEBUG_L(1, "Skipping a byte");
+ readState_.bufferPtr_++;
+ continue;
+ }
+ }
+
+ readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
+ readBuff_[readState_.bufferPtr_++];
+ bool eventCorruption = false;
+ if (readState_.eventSizeBuffPos_ == 4) {
+ // 0 length event indicates padding
+ if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
+ T_DEBUG_L(1, "Got padding");
+ readState_.resetState(readState_.lastDispatchPtr_);
+ continue;
+ }
+ // got a valid event
+ readState_.readingSize_ = false;
+ if (readState_.event_) {
+ delete(readState_.event_);
+ }
+ readState_.event_ = new eventInfo();
+ readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
+
+ T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
+
+ // TODO
+ // make sure event is valid, an error is triggered if:
+ // 1. Event size is larger than user-specified max-event size
+
+ // 2. Event size is larger than chunk size
+
+ // 3. size indicates that event crosses chunk boundary
+
+ }
+
+ if (eventCorruption) {
+ // perform some kickass recovery
+ }
+ } else {
+ if (!readState_.event_->eventBuff_) {
+ readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
+ readState_.event_->eventBuffPos_ = 0;
+ }
+ // take either the entire event or the remaining bytes in the buffer
+ int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
+ readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
+
+ // copy data from read buffer into event buffer
+ memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
+ readBuff_ + readState_.bufferPtr_,
+ reclaimBuffer);
+
+ // increment position ptrs
+ readState_.event_->eventBuffPos_ += reclaimBuffer;
+ readState_.bufferPtr_ += reclaimBuffer;
+
+ // if (reclaimBuffer > 0) {
+ // T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
+ // T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
+ // }
+
+ // check if the event has been read in full
+ if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
+ // set the completed event to the current event
+ currentEvent_ = readState_.event_;
+ currentEvent_->eventBuffPos_ = 0;
+
+ readState_.event_ = 0;
+ readState_.resetState(readState_.bufferPtr_);
+
+ // exit criteria
+ T_DEBUG_L(0, "Finished one event");
+ return true;
+ }
+ }
+ }
+
+
+ }
+}
+
+void TFileTransport::seekToChunk(int32_t chunk) {
+ if (fd_ <= 0) {
+ throw TTransportException("File not open");
+ }
+
+ int32_t lastChunk = getNumChunks();
+
+ // negative indicates reverse seek (from the end)
+ if (chunk < 0) {
+ chunk += lastChunk;
+ }
+
+ // cannot seek past EOF
+ if (chunk > lastChunk) {
+ T_DEBUG("Trying to seek past EOF. Seeking to EOF instead");
+ chunk = lastChunk;
+ }
+
+ uint32_t minEndOffset = 0;
+ if (chunk == lastChunk) {
+ minEndOffset = lseek(fd_, 0, SEEK_END);
+ }
+
+ offset_ = lseek(fd_, chunk * chunkSize_, SEEK_SET);
+ readState_.resetAllValues();
+ if (offset_ == -1) {
+ perror("TFileTransport: lseek error in seekToChunk");
+ // TODO: should this trigger an exception or simply continue?
+ throw TTransportException("TFileTransport: lseek error in seekToChunk");
+ }
+
+ // seek to EOF if user wanted to go to last chunk
+ uint32_t oldReadTimeout = getReadTimeout();
+ setReadTimeout(0);
+ if (chunk == lastChunk) {
+ // keep on reading until the last event at the point of the seekToChunk call
+ while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
+ }
+ setReadTimeout(oldReadTimeout);
+
+}
+
+void TFileTransport::seekToEnd() {
+ seekToChunk(getNumChunks());
+}
+
+uint32_t TFileTransport::getNumChunks() {
+ if (fd_ <= 0) {
+ return 0;
+ }
+ struct stat f_info;
+ fstat(fd_, &f_info);
+ return (f_info.st_size)/chunkSize_;
+}
+
+// Utility Functions
+void TFileTransport::openLogFile() {
mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
- fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode);
+ fd_ = ::open(filename_.c_str(), O_RDWR | O_CREAT | O_APPEND, mode);
// make sure open call was successful
if(fd_ == -1) {
char errorMsg[1024];
- sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str());
+ sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
perror(errorMsg);
throw TTransportException(errorMsg);
}
+
+ // opening the file in append mode causes offset_t to be at the end
+ offset_ = lseek(fd_, 0, SEEK_CUR);
+ T_DEBUG_L(1, "initial offset: %lu", offset_);
}
-uint32_t TBufferedFileWriter::getCurrentTime() {
+uint32_t TFileTransport::getCurrentTime() {
long long ret;
struct timeval tv;
gettimeofday(&tv, NULL);
@@ -241,108 +630,60 @@
}
-void TBufferedFileWriter::writerThread() {
- // open file if it is not open
- if(!fd_) {
- openOutputFile();
- }
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TFileTransport> inputTransport):
+ processor_(processor), protocolFactory_(protocolFactory),
+ inputTransport_(inputTransport) {
- // Figure out the next time by which a flush must take place
- long long nextFlush = getCurrentTime() + flushMaxUs_;
- uint32_t unflushed = 0;
+ // default the output transport to a null transport (common case)
+ outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
+}
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TFileTransport> inputTransport,
+ shared_ptr<TTransport> outputTransport):
+ processor_(processor), protocolFactory_(protocolFactory),
+ inputTransport_(inputTransport), outputTransport_(outputTransport) {
+};
+
+void TFileProcessor::process(uint32_t numEvents, bool tail) {
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+
+ // set the read timeout to 0 if tailing is required
+ int32_t oldReadTimeout = inputTransport_->getReadTimeout();
+ if (tail) {
+ // save old read timeout so it can be restored
+ inputTransport_->setReadTimeout(0);
+ }
+
+ uint32_t numProcessed = 0;
while(1) {
- // this will only be true when the destructor is being invoked
- if(closing_) {
- if(-1 == ::close(fd_)) {
- perror("TBufferedFileWriter: error in close");
+ // bad form to use exceptions for flow control but there is really
+ // no other way around it
+ try {
+ processor_->process(iop.first, iop.second);
+ numProcessed++;
+ if ( (numEvents > 0) && (numProcessed == numEvents)) {
+ return;
}
- throw TTransportException("error in file close");
- }
-
- //long long start = now();
- eventInfo outEvent = dequeueEvent(nextFlush);
-
- // sanity check on event
- if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) {
- T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_);
- continue;
- }
- //long long diff = now()-start;
- //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
-
- // If chunking is required, then make sure that msg does not cross chunk boundary
- if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) {
-
- // event size must be less than chunk size
- if(outEvent.eventSize_ > chunkSize_) {
- T_ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event",
- outEvent.eventSize_, chunkSize_);
- continue;
+ } catch (TEOFException& teof) {
+ if (!tail) {
+ break;
}
-
- long long chunk1 = offset_/chunkSize_;
- long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_;
-
- // if adding this event will cross a chunk boundary, pad the chunk with zeros
- if(chunk1 != chunk2) {
- int padding = (int)(chunk2*chunkSize_ - offset_);
-
- // sanity check
- if (padding <= 0) {
- T_DEBUG("Padding is empty, skipping event");
- continue;
- }
- if (padding > (int32_t)chunkSize_) {
- T_DEBUG("padding is larger than chunk size, skipping event");
- continue;
- }
- // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
- uint8_t zeros[padding];
- bzero(zeros, padding);
- if(-1 == ::write(fd_, zeros, padding)) {
- perror("TBufferedFileWriter: error while padding zeros");
- throw TTransportException("TBufferedFileWriter: error while padding zeros");
- }
- unflushed += padding;
- offset_ += padding;
- }
- }
-
- // write the dequeued event to the file
- if(outEvent.eventSize_ > 0) {
- if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) {
- perror("TBufferedFileWriter: error while writing event");
- // TODO: should this trigger an exception or simply continue?
- throw TTransportException("TBufferedFileWriter: error while writing event");
- }
-
- // deallocate payload
- free(outEvent.payLoad_);
-
- unflushed += outEvent.eventSize_;
- offset_ += outEvent.eventSize_;
- }
-
- // couple of cases from which a flush could be triggered
- if((getCurrentTime() >= nextFlush && unflushed > 0) ||
- unflushed > flushMaxBytes_ ||
- (outEvent.eventSize_ == 0) ) {
- //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
-
- // sync (force flush) file to disk
- fsync(fd_);
- nextFlush = getCurrentTime() + flushMaxUs_;
- unflushed = 0;
-
- // notify anybody(thing?) waiting for flush completion
- pthread_mutex_lock(&mutex_);
- notFlushed_ = false;
- pthread_mutex_unlock(&mutex_);
- pthread_cond_broadcast(&flushed_);
+ } catch (TException te) {
+ cerr << te.what() << endl;
+ break;
}
}
+ // restore old read timeout
+ if (tail) {
+ inputTransport_->setReadTimeout(oldReadTimeout);
+ }
+
}
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
index c327aab..8192332 100644
--- a/lib/cpp/src/transport/TBufferedFileWriter.h
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -1,10 +1,12 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
#include "TTransport.h"
#include "Thrift.h"
+#include "TProcessor.h"
#include <string>
+#include <stdio.h>
#include <boost/shared_ptr.hpp>
@@ -15,42 +17,154 @@
// Data pertaining to a single event
typedef struct eventInfo {
- uint8_t* payLoad_;
- uint32_t eventSize_;
+ uint8_t* eventBuff_;
+ uint32_t eventSize_;
+ uint32_t eventBuffPos_;
- eventInfo():payLoad_(NULL), eventSize_(0){};
+ eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
+ ~eventInfo() {
+ if (eventBuff_) {
+ delete[] eventBuff_;
+ }
+ }
} eventInfo;
+// information about current read state
+typedef struct readState {
+ eventInfo* event_;
+ // keep track of event size
+ uint8_t eventSizeBuff_[4];
+ uint8_t eventSizeBuffPos_;
+ bool readingSize_;
+
+ // read buffer variables
+ int32_t bufferPtr_;
+ int32_t bufferLen_;
+
+ // last successful dispatch point
+ int32_t lastDispatchPtr_;
+
+ void resetState(uint32_t lastDispatchPtr) {
+ readingSize_ = true;
+ eventSizeBuffPos_ = 0;
+ lastDispatchPtr_ = lastDispatchPtr;
+ }
+
+ void resetAllValues() {
+ resetState(0);
+ bufferPtr_ = 0;
+ bufferLen_ = 0;
+ if (event_) {
+ delete(event_);
+ }
+ event_ = 0;
+ }
+
+ readState() {
+ event_ = 0;
+ resetAllValues();
+ }
+
+ ~readState() {
+ if (event_) {
+ delete(event_);
+ }
+ }
+
+} readState;
+
/**
- * Class that stores a circular in-memory event/message buffer and writes
- * elements to disk when the buffer becomes full or a flush is triggered.
+ * File implementation of a transport. Reads and writes are done to a
+ * file on disk.
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
-class TBufferedFileWriter : public TTransport {
+class TFileTransport : public TTransport {
public:
+ TFileTransport(string path);
+ ~TFileTransport();
+
+ // TODO: what is the correct behaviour for this?
+ // the log file is generally always open
+ bool isOpen() {
+ return true;
+ }
+
+ void write(const uint8_t* buf, uint32_t len) {
+ enqueueEvent(buf, len, false);
+ }
+
+ void flush();
+
+ uint32_t readAll(uint8_t* buf, uint32_t len);
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ // log-file specific functions
+ void seekToChunk(int chunk);
+ void seekToEnd();
+ uint32_t getNumChunks();
+
+ // for changing the output file
+ void resetOutputFile(int fd, string filename, long long offset);
+
+ // Setter/Getter functions for user-controllable options
+ void setReadBuffSize(uint32_t readBuffSize) {
+ if (readBuffSize) {
+ readBuffSize_ = readBuffSize;
+ }
+ }
+ uint32_t getReadBuffSize() {
+ return readBuffSize_;
+ }
+
+ void setReadTimeout(int32_t readTimeout) {
+ readTimeout_ = readTimeout;
+ }
+ int32_t getReadTimeout() {
+ return readTimeout_;
+ }
+
+ void setChunkSize(uint32_t chunkSize) {
+ if (chunkSize) {
+ chunkSize_ = chunkSize;
+ }
+ }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
+
+ void setEventBufferSize(uint32_t bufferSize) {
+ if (bufferSize) {
+ if (buffer_) {
+ delete[] buffer_;
+ }
+ eventBufferSize_ = bufferSize;
+ buffer_ = new eventInfo*[eventBufferSize_];
+ }
+ }
+ uint32_t getEventBufferSize() {
+ return eventBufferSize_;
+ }
+
void setFlushMaxUs(uint32_t flushMaxUs) {
- flushMaxUs_ = flushMaxUs;
+ if (flushMaxUs) {
+ flushMaxUs_ = flushMaxUs;
+ }
}
uint32_t getFlushMaxUs() {
return flushMaxUs_;
}
void setFlushMaxBytes(uint32_t flushMaxBytes) {
- flushMaxBytes_ = flushMaxBytes;
+ if (flushMaxBytes) {
+ flushMaxBytes_ = flushMaxBytes;
+ }
}
uint32_t getFlushMaxBytes() {
return flushMaxBytes_;
}
- void setChunkSize(uint32_t chunkSize) {
- chunkSize_ = chunkSize;
- }
- uint32_t getChunkSize() {
- return chunkSize_;
- }
-
void setMaxEventSize(uint32_t maxEventSize) {
maxEventSize_ = maxEventSize;
}
@@ -58,52 +172,88 @@
return maxEventSize_;
}
- TBufferedFileWriter(string filename, uint32_t sz);
- TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
- void init(string filename, uint32_t sz, int fd, long long offset);
- ~TBufferedFileWriter();
-
- void resetOutputFile(int fd, string filename, long long offset);
-
- void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
- void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
- void write(const uint8_t* buf, uint32_t len) {
- enqueueEvent(buf, len, false);
+ void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+ maxCorruptedEvents_ = maxCorruptedEvents;
+ }
+ uint32_t getMaxCorruptedEvents() {
+ return maxCorruptedEvents_;
}
- eventInfo dequeueEvent(long long deadline);
- void flush();
+ void setEofSleepTimeUs(uint32_t eofSleepTime) {
+ if (eofSleepTime) {
+ eofSleepTime_ = eofSleepTime;
+ }
+ }
+ uint32_t getEofSleepTimeUs() {
+ return eofSleepTime_;
+ }
+
+ private:
+ // helper functions for writing to a file
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+ void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
+ eventInfo* dequeueEvent(long long deadline);
// control for writer thread
static void* startWriterThread(void* ptr) {
- (((TBufferedFileWriter*)ptr)->writerThread());
+ (((TFileTransport*)ptr)->writerThread());
return 0;
}
void writerThread();
+ // helper functions for reading from a file
+ bool readEvent();
- private:
- // circular buffer to hold data in before it is flushed. This is an array of strings. Each
- // element of the array stores a msg that needs to be written to the file
- eventInfo* buffer_;
-
- // size of string buffer
- uint32_t sz_;
+ // Utility functions
+ void openLogFile();
+ uint32_t getCurrentTime();
+
+ // Class variables
+ readState readState_;
+ uint8_t* readBuff_;
+
+ eventInfo* currentEvent_;
+
+ uint32_t readBuffSize_;
+ static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+ int32_t readTimeout_;
+ static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
// size of chunks that file will be split up into
uint32_t chunkSize_;
+ static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+ // size of string buffer
+ uint32_t eventBufferSize_;
+ static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
+
+ // circular buffer to hold data in before it is flushed. This is an array of strings. Each
+ // element of the array stores a msg that needs to be written to the file
+ eventInfo** buffer_;
+
// max number of microseconds that can pass without flushing
uint32_t flushMaxUs_;
+ static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
// max number of bytes that can be written without flushing
uint32_t flushMaxBytes_;
+ static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
// max event size
uint32_t maxEventSize_;
+ static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+ // max number of corrupted events per chunk
+ uint32_t maxCorruptedEvents_;
+ static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+ // sleep duration when EOF is hit
+ uint32_t eofSleepTime_;
+ static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
// writer thread id
- pthread_t writer_;
+ pthread_t writerThreadId_;
// variables that determine position of head/tail of circular buffer
int headPos_, tailPos_;
@@ -126,13 +276,61 @@
int fd_;
// Offset within the file
- long long offset_;
-
- void openOutputFile();
- uint32_t getCurrentTime();
+ off_t offset_;
};
-}}}
+// Exception thrown when EOF is hit
+class TEOFException : public facebook::thrift::TTransportException {
+ public:
+ TEOFException():
+ facebook::thrift::TTransportException(TTX_EOF) {};
+};
-#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+
+// wrapper class to process events from a file containing thrift events
+class TFileProcessor {
+ public:
+ /**
+ * Constructor that defaults output transport to null transport
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport file transport
+ */
+ TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TFileTransport> inputTransport);
+
+ /**
+ * Constructor
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport input file transport
+ * @param output output transport
+ */
+ TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TFileTransport> inputTransport,
+ shared_ptr<TTransport> outputTransport);
+
+ /**
+ * processes events from the file
+ *
+ * @param numEvents number of events to process (0 for unlimited)
+ * @param tail tails the file if true
+ */
+ void process(uint32_t numEvents, bool tail);
+
+ private:
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TProtocolFactory> protocolFactory_;
+ shared_ptr<TFileTransport> inputTransport_;
+ shared_ptr<TTransport> outputTransport_;
+};
+
+
+}}} // facebook::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index e02eb70..de94a7c 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -12,6 +12,7 @@
TTX_UNKNOWN = 0,
TTX_NOT_OPEN = 1,
TTX_TIMED_OUT = 2,
+ TTX_EOF = 3,
};
/**
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index a8003cf..427cc0e 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -25,7 +25,10 @@
void open() {}
- void write(const std::string& s) {}
+ void write(const uint8_t* buf, uint32_t len) { // no-op: buf and len are ignored
+ return;
+ }
+
};
diff --git a/test/cpp/src/main.cpp b/test/cpp/src/main.cpp
index 8344a88..47ae671 100644
--- a/test/cpp/src/main.cpp
+++ b/test/cpp/src/main.cpp
@@ -11,6 +11,7 @@
#include <transport/TTransportUtils.h>
#include <transport/TBufferedRouterTransport.h>
#include <transport/TBufferedFileWriter.h>
+#include <TLogging.h>
#include "Service.h"
@@ -73,7 +74,12 @@
int8_t echoByte(int8_t arg) {return arg;}
int32_t echoI32(int32_t arg) {return arg;}
int64_t echoI64(int64_t arg) {return arg;}
- string echoString(string arg) {return arg;}
+ string echoString(string arg) { // returns arg unchanged, but only accepts the exact string "hello"
+ if (arg != "hello") {
+ T_ERROR_ABORT("WRONG STRING!!!!"); // NOTE(review): macro is not defined in this file; presumably logs and aborts -- confirm
+ }
+ return arg;
+ }
vector<int8_t> echoList(vector<int8_t> arg) {return arg;}
set<int8_t> echoSet(set<int8_t> arg) {return arg;}
map<int8_t, int8_t> echoMap(map<int8_t, int8_t> arg) {return arg;}
@@ -189,20 +195,22 @@
bool _done;
Monitor _sleep;
};
-
+
+
int main(int argc, char **argv) {
- int port = 9090;
+ int port = 9091;
string serverType = "thread-pool";
string protocolType = "binary";
size_t workerCount = 4;
- size_t clientCount = 10;
- size_t loopCount = 10000;
+ size_t clientCount = 20;
+ size_t loopCount = 50000;
TType loopType = T_VOID;
string callName = "echoVoid";
bool runServer = true;
bool logRequests = false;
string requestLogPath = "./requestlog.tlog";
+ bool replayRequests = false;
ostringstream usage;
@@ -217,8 +225,10 @@
"\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
"\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
"\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl <<
+ "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl <<
"\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
+
map<string, string> args;
for(int ix = 1; ix < argc; ix++) {
@@ -272,6 +282,10 @@
logRequests = args["log-request"] == "true";
}
+ if(!args["replay-request"].empty()) {
+ replayRequests = args["replay-request"] == "true";
+ }
+
if(!args["server-type"].empty()) {
serverType = args["server-type"];
@@ -299,6 +313,28 @@
// Dispatcher
shared_ptr<Server> serviceHandler(new Server());
+ if (replayRequests) {
+ shared_ptr<Server> serviceHandler(new Server());
+ shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
+
+ // Transports
+ shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
+ fileTransport->setChunkSize(2 * 1024 * 1024);
+ fileTransport->setMaxEventSize(1024 * 16);
+ fileTransport->seekToEnd();
+
+ // Protocol Factory
+ shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
+
+ TFileProcessor fileProcessor(serviceProcessor,
+ protocolFactory,
+ fileTransport);
+
+ fileProcessor.process(0, true);
+ exit(0);
+ }
+
+
if(runServer) {
shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
@@ -314,11 +350,12 @@
if (logRequests) {
// initialize the log file
- shared_ptr<TBufferedFileWriter> bufferedFileWriter(new TBufferedFileWriter(requestLogPath, 1000));
- bufferedFileWriter->setChunkSize(2 * 1024 * 1024);
- bufferedFileWriter->setMaxEventSize(1024 * 16);
+ shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
+ fileTransport->setChunkSize(2 * 1024 * 1024);
+ fileTransport->setMaxEventSize(1024 * 16);
- transportFactory = shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(bufferedFileWriter));
+ transportFactory =
+ shared_ptr<TTransportFactory>(new TBufferedRouterTransportFactory(fileTransport));
}
shared_ptr<Thread> serverThread;