Fix to TThriftTransport corruption detection
Summary: O_APPEND means that even if the file pointer is in the middle of the file, the next write will go to the end of the file. So when initializing the TThriftTransport to be written to, we need to truncate the file after the last good event. I'm pretty sure this was what was causing queuemap log corruption when the multiplexer is restarted.
The corruption doesn't necessarily go away at the next chunk boundary because of the offset being recalculated. I think it's good to have the offset recalculated, but then we shouldn't use the old chunk boundary data there, but rather use the newly calculated offset to determine the padding (could be an entire chunk, but that's okay -- way better than corruption)
Reviewed By: mcslee
Test Plan: Wrote to a ThriftFile using local scribe instance. Printed it. Then cat'd garbage to it from the command line. Then wrote to it again. Then printed it again. Seems to work fine, whereas with old scribe binary the later entries were corrupted.
Revert: OK
DiffCamp Revision: 4683
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665388 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 8b55715..e6ff029 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -292,7 +292,15 @@
}
// set the offset to the correct value (EOF)
- seekToEnd();
+ try {
+ seekToEnd();
+ } catch (TException &te) {
+ }
+
+ // throw away any partial events
+ offset_ += readState_.lastDispatchPtr_;
+ ftruncate(fd_, offset_);
+ readState_.resetAllValues();
// Figure out the next time by which a flush must take place
@@ -348,21 +356,10 @@
if (chunk1 != chunk2) {
// refetch the offset to keep in sync
offset_ = lseek(fd_, 0, SEEK_CUR);
- int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
+ int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
- // sanity check
- if (padding <= 0) {
- T_DEBUG("Padding is empty, skipping event");
- continue;
- }
- if (padding > (int32_t)chunkSize_) {
- T_DEBUG("padding is larger than chunk size, skipping event");
- continue;
- }
uint8_t zeros[padding];
bzero(zeros, padding);
- //T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
- //padding, offset_, chunk2);
if (-1 == ::write(fd_, zeros, padding)) {
GlobalOutput("TFileTransport: error while padding zeros");
throw TTransportException("TFileTransport: error while padding zeros");
@@ -711,7 +708,7 @@
uint32_t oldReadTimeout = getReadTimeout();
setReadTimeout(NO_TAIL_READ_TIMEOUT);
// keep on reading unti the last event at point of seekChunk call
- while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
+ while (readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
setReadTimeout(oldReadTimeout);
}