Thrift: Changing TFileTransport to use the same buffer-swap mechanism that pillar does

Reviewed by: aditya

Tested with thrift test class

Notes: TFileTransport used to use a circular buffer.  Changed this to use two large buffers, one for reading and one for writing, that are swapped whenever the writer thread finishes with the last write.

Also changed a few default constants -- force_flush timeout is now 3 sec, default buffer size is 10000 entries


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664997 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index 61f1b80..3f625bc 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -13,48 +13,39 @@
 
 namespace facebook { namespace thrift { namespace transport { 
 
-TFileTransport::TFileTransport(string path) {
-  filename_ = path;
-  openLogFile();
-
-  // set initial values to default
-  readBuffSize_ = DEFAULT_READ_BUFF_SIZE;
-  readTimeout_ = DEFAULT_READ_TIMEOUT_MS;
-  chunkSize_ = DEFAULT_CHUNK_SIZE;
-  eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
-  flushMaxUs_ = DEFAULT_FLUSH_MAX_US;
-  flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES;
-  maxEventSize_ = DEFAULT_MAX_EVENT_SIZE;
-  maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
-  eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US;
-
-  // initialize buffer lazily
-  buffer_ = 0;
-
-  // buffer is initially empty
-  isEmpty_ = true;
-  isFull_  = false;
-
-  // both head and tail are initially at 0
-  headPos_ = 0;
-  tailPos_ = 0;
-
+TFileTransport::TFileTransport(string path)
+  : readState_()
+  , readBuff_(NULL)
+  , currentEvent_(NULL)
+  , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
+  , readTimeout_(NO_TAIL_READ_TIMEOUT)
+  , chunkSize_(DEFAULT_CHUNK_SIZE)
+  , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
+  , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
+  , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
+  , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
+  , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
+  , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
+  , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+  , writerThreadId_(0)
+  , dequeueBuffer_(NULL)
+  , enqueueBuffer_(NULL)
+  , closing_(false)
+  , forceFlush_(false)
+  , filename_(path)
+  , fd_(0)
+  , bufferAndThreadInitialized_(false)
+  , offset_(0)
+  , lastBadChunk_(0)
+  , numCorruptedEventsInChunk_(0)
+{
   // initialize all the condition vars/mutexes
   pthread_mutex_init(&mutex_, NULL);
   pthread_cond_init(&notFull_, NULL);
   pthread_cond_init(&notEmpty_, NULL);
   pthread_cond_init(&flushed_, NULL);
 
-  // not closing the file during init
-  closing_ = false;
-
-  // create writer thread on demand
-  writerThreadId_ = 0;
-
-  // read related variables
-  // read buff initialized lazily
-  readBuff_ = 0;
-  currentEvent_ = 0;
+  openLogFile();
 }
 
 void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
@@ -96,18 +87,27 @@
     // deal in the common case because writing is quick 
 
     pthread_join(writerThreadId_, NULL);
+    writerThreadId_ = 0;
   }
 
-  if (buffer_) {
-    delete[] buffer_;
+  if (dequeueBuffer_) {
+    delete dequeueBuffer_;
+    dequeueBuffer_ = NULL;
+  }
+
+  if (enqueueBuffer_) {
+    delete enqueueBuffer_;
+    enqueueBuffer_ = NULL;
   }
 
   if (readBuff_) {
     delete readBuff_;
+    readBuff_ = NULL;
   }
 
   if (currentEvent_) {
     delete currentEvent_;
+    currentEvent_ = NULL;
   }
 
   // close logfile
@@ -118,6 +118,25 @@
   }
 }
 
+bool TFileTransport::initBufferAndWriteThread() {
+  if (bufferAndThreadInitialized_) {
+    T_ERROR("Trying to double-init TFileTransport");
+    return false;
+  }
+
+  if (writerThreadId_ == 0) {
+    if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
+      T_ERROR("Could not create writer thread");
+      return false;
+    }
+  }
+
+  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
+  bufferAndThreadInitialized_ = true;
+
+  return true;
+}
 
 void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
   // make sure that event size is valid
@@ -139,46 +158,34 @@
   memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
   toEnqueue->eventSize_ = eventLen + 4;
 
-  //  T_DEBUG_L(1, "event size: %u", eventLen);
-  return enqueueEvent(toEnqueue, blockUntilFlush);
-}
-
-void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) {
   // lock mutex
   pthread_mutex_lock(&mutex_);
 
   // make sure that enqueue buffer is initialized and writer thread is running
-  if (buffer_ == 0) {
-    buffer_ = new eventInfo*[eventBufferSize_];
-  }
-  if (writerThreadId_ == 0) {
-    if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
-      T_ERROR("Error creating write thread");
+  if (!bufferAndThreadInitialized_) {
+    if (!initBufferAndWriteThread()) {
+      delete toEnqueue;
+      pthread_mutex_unlock(&mutex_);
       return;
     }
   }
 
   // Can't enqueue while buffer is full
-  while(isFull_) {
+  while (enqueueBuffer_->isFull()) {
     pthread_cond_wait(&notFull_, &mutex_);
   }
 
-  // make a copy and enqueue at tail of buffer
-  buffer_[tailPos_] = toEnqueue;
-  tailPos_ = (tailPos_+1) % eventBufferSize_;
-  
-  // mark the buffer as non-empty
-  isEmpty_ = false;
-  
-  // circular buffer has wrapped around (and is full)
-  if(tailPos_ == headPos_) {
-    //    T_DEBUG("queue is full");
-    isFull_ = true;
+  // add to the buffer
+  if (!enqueueBuffer_->addEvent(toEnqueue)) {
+    delete toEnqueue;
+    pthread_mutex_unlock(&mutex_);
+    return;
   }
 
   // signal anybody who's waiting for the buffer to be non-empty
   pthread_cond_signal(&notEmpty_);
-  if(blockUntilFlush) {
+
+  if (blockUntilFlush) {
     pthread_cond_wait(&flushed_, &mutex_);
   }
 
@@ -186,61 +193,50 @@
   // because condition variables can get triggered by the os for no reason 
   // it is probably a non-factor for the time being
   pthread_mutex_unlock(&mutex_);
-
 }
 
-eventInfo* TFileTransport::dequeueEvent(long long deadline) {
+bool TFileTransport::swapEventBuffers(long long deadline) {
   //deadline time struc
   struct timespec ts;
-  if(deadline) {
+  if (deadline) {
     ts.tv_sec = deadline/(1000*1000);
     ts.tv_nsec = (deadline%(1000*1000))*1000;
   }
 
   // wait for the queue to fill up
   pthread_mutex_lock(&mutex_);
-  while(isEmpty_) {
+  while (enqueueBuffer_->isEmpty()) {
     // do a timed wait on the condition variable
-    if(deadline) {
+    if (deadline) {
       int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
       if(e == ETIMEDOUT) {
         break;
       }
-    }
-    else {
+    } else {
       // just wait until the buffer gets an item
       pthread_cond_wait(&notEmpty_, &mutex_);
     }
   }
 
-  string ret;
-  bool doSignal = false;
+  bool swapped = false;
 
   // could be empty if we timed out
-  eventInfo* retEvent = 0;
-  if(!isEmpty_) {
-    retEvent = buffer_[headPos_];
-    headPos_ = (headPos_+1) % eventBufferSize_;
+  if (!enqueueBuffer_->isEmpty()) {
+    TFileTransportBuffer *temp = enqueueBuffer_;
+    enqueueBuffer_ = dequeueBuffer_;
+    dequeueBuffer_ = temp;
 
-    isFull_ = false;
-    doSignal = true;
-
-    // check if this is the last item in the buffer
-    if(headPos_ == tailPos_) {
-      isEmpty_ = true;
-    }
+    swapped = true;
   }
 
   // unlock the mutex and signal if required
   pthread_mutex_unlock(&mutex_);
-  if(doSignal) {
+
+  if (swapped) {
     pthread_cond_signal(&notFull_);
   }
 
-  if (!retEvent) {
-    retEvent = new eventInfo();
-  }
-  return retEvent;
+  return swapped;
 }
 
 
@@ -268,105 +264,99 @@
       return;
     }
 
-    //long long start = now();
-    eventInfo* outEvent = dequeueEvent(nextFlush);    
-    if (!outEvent) {
-      T_DEBUG_L(1, "Got an empty event");
-      return;
-    }
-
-    // sanity check on event
-    if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
-      T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
-      delete(outEvent);
-      continue;
-    }
-    //long long diff = now()-start;
-    //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
-
-    // If chunking is required, then make sure that msg does not cross chunk boundary
-    if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
-
-      // event size must be less than chunk size
-      if(outEvent->eventSize_ > chunkSize_) {
-        T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
-              outEvent->eventSize_, chunkSize_);
-        delete(outEvent);
-        continue;
-      }
-
-      long long chunk1 = offset_/chunkSize_;
-      long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
-      
-      // if adding this event will cross a chunk boundary, pad the chunk with zeros
-      if(chunk1 != chunk2) {
-        int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
-
-        // sanity check
-        if (padding <= 0) {
-          T_DEBUG("Padding is empty, skipping event");
-         continue;
+    if (swapEventBuffers(nextFlush)) {
+      eventInfo* outEvent;
+      while (NULL != (outEvent = dequeueBuffer_->getNext())) {
+        if (!outEvent) {
+          T_DEBUG_L(1, "Got an empty event");
+          return;
         }
-        if (padding > (int32_t)chunkSize_) {
-          T_DEBUG("padding is larger than chunk size, skipping event");
-         continue;
-        }
-        uint8_t zeros[padding];
-        bzero(zeros, padding);
-        //        T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", 
-        //        padding, offset_, chunk2);
-        if(-1 == ::write(fd_, zeros, padding)) {
-          perror("TFileTransport: error while padding zeros");
-          throw TTransportException("TFileTransport: error while padding zeros");
-        }
-        unflushed += padding;
-        offset_ += padding;
-      }
-    }
 
-    // write the dequeued event to the file
-    if(outEvent->eventSize_ > 0) {
-      if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
-        perror("TFileTransport: error while writing event");
-        throw TTransportException("TFileTransport: error while writing event");
-      }
+        // sanity check on event
+        if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+          T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
+          continue;
+        }
 
-      unflushed += outEvent->eventSize_;
-      offset_ += outEvent->eventSize_;
+        // If chunking is required, then make sure that msg does not cross chunk boundary
+        if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+
+          // event size must be less than chunk size
+          if(outEvent->eventSize_ > chunkSize_) {
+            T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
+                  outEvent->eventSize_, chunkSize_);
+            continue;
+          }
+
+          long long chunk1 = offset_/chunkSize_;
+          long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+          
+          // if adding this event will cross a chunk boundary, pad the chunk with zeros
+          if(chunk1 != chunk2) {
+            // refetch the offset to keep in sync
+            offset_ = lseek(fd_, 0, SEEK_CUR);
+            int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
+
+            // sanity check
+            if (padding <= 0) {
+              T_DEBUG("Padding is empty, skipping event");
+            continue;
+            }
+            if (padding > (int32_t)chunkSize_) {
+              T_DEBUG("padding is larger than chunk size, skipping event");
+            continue;
+            }
+            uint8_t zeros[padding];
+            bzero(zeros, padding);
+            //        T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", 
+            //        padding, offset_, chunk2);
+            if(-1 == ::write(fd_, zeros, padding)) {
+              perror("TFileTransport: error while padding zeros");
+              throw TTransportException("TFileTransport: error while padding zeros");
+            }
+            unflushed += padding;
+            offset_ += padding;
+          }
+        }
+
+        // write the dequeued event to the file
+        if(outEvent->eventSize_ > 0) {
+          if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+            perror("TFileTransport: error while writing event");
+            throw TTransportException("TFileTransport: error while writing event");
+          }
+
+          unflushed += outEvent->eventSize_;
+          offset_ += outEvent->eventSize_;
+        }
+      }
+      dequeueBuffer_->reset();
     }
 
     // couple of cases from which a flush could be triggered
-    if((getCurrentTime() >= nextFlush && unflushed > 0) ||
+    if ((getCurrentTime() >= nextFlush && unflushed > 0) ||
        unflushed > flushMaxBytes_ ||
-       (outEvent && (outEvent->eventSize_== 0)) ) {
-      //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
+       forceFlush_) {
 
       // sync (force flush) file to disk
       fsync(fd_);
       nextFlush = getCurrentTime() + flushMaxUs_;
       unflushed = 0;
 
-      // notify anybody(thing?) waiting for flush completion
-      pthread_mutex_lock(&mutex_);
-      notFlushed_ = false;
-      pthread_mutex_unlock(&mutex_);
+      // notify anybody waiting for flush completion
+      forceFlush_ = false;
       pthread_cond_broadcast(&flushed_);
     }
-    // deallocate dequeued event
-    delete(outEvent);
   }
 }
 
 void TFileTransport::flush() {
-  eventInfo* flushEvent = new eventInfo();
-  notFlushed_ = true;
-
-  enqueueEvent(flushEvent, false);
-
   // wait for flush to take place
   pthread_mutex_lock(&mutex_);
 
-  while(notFlushed_) {
+  forceFlush_ = true;
+
+  while (forceFlush_) {
     pthread_cond_wait(&flushed_, &mutex_);
   }
 
@@ -569,14 +559,14 @@
 void TFileTransport::performRecovery() {
   // perform some kickass recovery 
   uint32_t curChunk = getCurChunk();
-  if (lastBadChunk == curChunk) {
-    numCorruptedEventsinChunk++;
+  if (lastBadChunk_ == curChunk) {
+    numCorruptedEventsInChunk_++;
   } else {
-    lastBadChunk = curChunk;
-    numCorruptedEventsinChunk = 1;
+    lastBadChunk_ = curChunk;
+    numCorruptedEventsInChunk_ = 1;
   }
           
-  if (numCorruptedEventsinChunk < maxCorruptedEvents_) {
+  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
     // maybe there was an error in reading the file from disk
     // seek to the beginning of chunk and try again
     seekToChunk(curChunk);
@@ -706,6 +696,70 @@
   return ret;
 }
 
+TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
+  : bufferMode_(WRITE)
+  , writePoint_(0)
+  , readPoint_(0)
+  , size_(size)
+{
+  buffer_ = new eventInfo*[size];
+}
+
+TFileTransportBuffer::~TFileTransportBuffer() {
+  if (buffer_) {
+    for (uint32_t i = 0; i < writePoint_; i++) {
+      delete buffer_[i];
+    }
+    delete[] buffer_;
+    buffer_ = NULL;
+  }
+}
+
+bool TFileTransportBuffer::addEvent(eventInfo *event) {
+  if (bufferMode_ == READ) {
+    perror("Trying to write to a buffer in read mode");
+  }
+  if (writePoint_ < size_) {
+    buffer_[writePoint_++] = event;
+    return true;
+  } else {
+    // buffer is full
+    return false;
+  }
+}
+
+eventInfo* TFileTransportBuffer::getNext() {
+  if (bufferMode_ == WRITE) {
+    bufferMode_ = READ;
+  }
+  if (readPoint_ < writePoint_) {
+    return buffer_[readPoint_++];
+  } else {
+    // no more entries
+    return NULL;
+  }
+}
+
+void TFileTransportBuffer::reset() {
+  if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
+    T_DEBUG("Resetting a buffer with unread entries");
+  }
+  // Clean up the old entries
+  for (uint32_t i = 0; i < writePoint_; i++) {
+    delete buffer_[i];
+  }
+  bufferMode_ = WRITE;
+  writePoint_ = 0;
+  readPoint_ = 0;
+}
+
+bool TFileTransportBuffer::isFull() {
+  return writePoint_ == size_;
+}
+
+bool TFileTransportBuffer::isEmpty() {
+  return writePoint_ == 0;
+}
 
 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                                shared_ptr<TProtocolFactory> protocolFactory,
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index a58be2c..237234e 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -75,6 +75,47 @@
 } readState;
  
 /**
+ * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
+ * to be written to disk.  Should be used in the following way:
+ *  1) Buffer created
+ *  2) Buffer written to (addEvent)
+ *  3) Buffer read from (getNext)
+ *  4) Buffer reset (reset)
+ *  5) Go back to 2, or destroy buffer
+ *
+ * The buffer should never be written to after it is read from, unless it is reset first.
+ * Note: The above rules are enforced mainly for debugging its sole client TFileTransport 
+ *       which uses the buffer in this way.
+ * 
+ * @author James Wang <jwang@facebook.com>
+ */
+class TFileTransportBuffer {
+  public:
+    TFileTransportBuffer(uint32_t size);
+    ~TFileTransportBuffer();
+
+    bool addEvent(eventInfo *event);
+    eventInfo* getNext();
+    void reset();
+    bool isFull();
+    bool isEmpty();
+    
+  private:
+    TFileTransportBuffer(); // should not be used
+
+    enum mode {
+      WRITE,
+      READ
+    };
+    mode bufferMode_;
+
+    uint32_t writePoint_;
+    uint32_t readPoint_;
+    uint32_t size_;
+    eventInfo** buffer_;
+};
+
+/**
  * File implementation of a transport. Reads and writes are done to a 
  * file on disk.
  *
@@ -138,14 +179,13 @@
   }
 
   void setEventBufferSize(uint32_t bufferSize) {    
-    if (bufferSize) {
-      if (buffer_) {
-        delete[] buffer_;
-      }
-      eventBufferSize_ = bufferSize;
-      buffer_ = new eventInfo*[eventBufferSize_];
+    if (bufferAndThreadInitialized_) {
+      perror("Cannot change the buffer size after writer thread started");
+      return;
     }
+    eventBufferSize_ = bufferSize;
   }
+
   uint32_t getEventBufferSize() {
     return eventBufferSize_;
   }
@@ -194,8 +234,8 @@
  private:
   // helper functions for writing to a file
   void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
-  void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
-  eventInfo* dequeueEvent(long long deadline);
+  bool swapEventBuffers(long long deadline);
+  bool initBufferAndWriteThread();
 
   // control for writer thread
   static void* startWriterThread(void* ptr) {
@@ -218,7 +258,6 @@
   // Class variables
   readState readState_;
   uint8_t* readBuff_;
-
   eventInfo* currentEvent_;
 
   uint32_t readBuffSize_;
@@ -231,17 +270,13 @@
   uint32_t chunkSize_;
   static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
 
-  // size of string buffer
+  // size of event buffers
   uint32_t eventBufferSize_;
-  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
+  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
 
-  // circular buffer to hold data in before it is flushed. This is an array of strings. Each
-  // element of the array stores a msg that needs to be written to the file
-  eventInfo** buffer_;
-  
   // max number of microseconds that can pass without flushing
   uint32_t flushMaxUs_;
-  static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
+  static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
 
   // max number of bytes that can be written without flushing
   uint32_t flushMaxBytes_;
@@ -266,33 +301,35 @@
   // writer thread id
   pthread_t writerThreadId_;
 
-  // variables that determine position of head/tail of circular buffer
-  int headPos_, tailPos_;
-
-  // variables indicating whether the buffer is full or empty
-  bool isFull_, isEmpty_;
+  // buffers to hold data before it is flushed. Each element of the buffer stores a msg that 
+  // needs to be written to the file.  The buffers are swapped by the writer thread.
+  TFileTransportBuffer *dequeueBuffer_;
+  TFileTransportBuffer *enqueueBuffer_;
+  
+  // conditions used to block when the buffer is full or empty
   pthread_cond_t notFull_, notEmpty_;
-  bool closing_;
+  volatile bool closing_;
 
   // To keep track of whether the buffer has been flushed
   pthread_cond_t flushed_;
-  bool notFlushed_;
+  volatile bool forceFlush_;
 
-  // Mutex that is grabbed when enqueueing, dequeueing and flushing
-  // from the circular buffer
+  // Mutex that is grabbed when enqueueing and swapping the read/write buffers
   pthread_mutex_t mutex_;
 
   // File information
   string filename_;
   int fd_;
 
+  // Whether the writer thread and buffers have been initialized
+  bool bufferAndThreadInitialized_;
+
   // Offset within the file
   off_t offset_;
 
   // event corruption information
-  uint32_t lastBadChunk;
-  uint32_t numCorruptedEventsinChunk;
-  
+  uint32_t lastBadChunk_;
+  uint32_t numCorruptedEventsInChunk_;
 };
 
 // Exception thrown when EOF is hit