-- ThriftLogfile is finally done.
Summary:
- Everything seems to be working well..
Reviewed By: tbr - slee
Test Plan: Tested using search
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664947 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 4615977..a9beff9 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -63,7 +63,8 @@
// check if current file is still open
if (fd_ > 0) {
- // TODO: should there be a flush here?
+ // flush any events in the queue
+ flush();
fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
if(-1 == ::close(fd_)) {
perror("TFileTransport: error in file close");
@@ -81,14 +82,19 @@
TFileTransport::~TFileTransport() {
- // TODO: Make sure the buffer is actually flushed
// flush the buffer if a writer thread is active
if (writerThreadId_ > 0) {
// flush output buffer
flush();
- // send a signal to write thread to end
+ // set state to closing
closing_ = true;
+
+ // TODO: make sure event queue is empty
+ // currently only the write buffer is flushed
+ // we dont actually wait until the queue is empty. This shouldn't be a big
+ // deal in the common case because writing is quick
+
pthread_join(writerThreadId_, NULL);
}
@@ -116,7 +122,7 @@
void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
// make sure that event size is valid
if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
- T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
+ T_ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
return;
}
@@ -176,7 +182,6 @@
pthread_cond_wait(&flushed_, &mutex_);
}
- // TODO: don't return until flushed to disk
// this really should be a loop where it makes sure it got flushed
// because condition variables can get triggered by the os for no reason
// it is probably a non-factor for the time being
@@ -304,7 +309,7 @@
}
if (padding > (int32_t)chunkSize_) {
T_DEBUG("padding is larger than chunk size, skipping event");
- continue;
+ continue;
}
uint8_t zeros[padding];
bzero(zeros, padding);
@@ -442,10 +447,10 @@
throw TTransportException("TFileTransport: error while reading from file");
} else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
- if (readTimeout_ == -1) {
+ if (readTimeout_ == TAIL_READ_TIMEOUT) {
usleep(eofSleepTime_);
continue;
- } else if (readTimeout_ == 0) {
+ } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
// reset state
readState_.resetState(0);
return false;
@@ -592,7 +597,7 @@
// point and punt on the error
readState_.resetState(readState_.lastDispatchPtr_);
char errorMsg[1024];
- sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
+ sprintf(errorMsg, "TFileTransport: log file corrupted at offset:%lu",
offset_ + readState_.lastDispatchPtr_);
perror(errorMsg);
throw TTransportException(errorMsg);
@@ -645,7 +650,7 @@
// seek to EOF if user wanted to go to last chunk
if (seekToEnd) {
uint32_t oldReadTimeout = getReadTimeout();
- setReadTimeout(0);
+ setReadTimeout(NO_TAIL_READ_TIMEOUT);
// keep on reading unti the last event at point of seekChunk call
while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
setReadTimeout(oldReadTimeout);
@@ -745,7 +750,7 @@
int32_t oldReadTimeout = inputTransport_->getReadTimeout();
if (tail) {
// save old read timeout so it can be restored
- inputTransport_->setReadTimeout(0);
+ inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
}
uint32_t numProcessed = 0;