-- additions to ThriftLogfile

Summary:
-- fixed peek() in TBufferedRouterTransport.cpp
-- Added processChunk() to ThriftLogfile

Reviewed By: Slee


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664924 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cpp b/lib/cpp/src/transport/TBufferedRouterTransport.cpp
index 7c09953..ad6a28f 100644
--- a/lib/cpp/src/transport/TBufferedRouterTransport.cpp
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.cpp
@@ -27,14 +27,17 @@
     rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
   }
 
+
   // Hand over whatever we have
   uint32_t give = need;
   if (rLen_-rPos_ < give) {
     give = rLen_-rPos_;
   }
-  memcpy(buf, rBuf_+rPos_, give);
-  rPos_ += give;
-  need -= give;
+  if (give > 0) {
+    memcpy(buf, rBuf_+rPos_, give);
+    rPos_ += give;
+    need -= give;
+  }
 
   return (len - need);
 }
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
index 3a5e394..b2e4d4f 100644
--- a/lib/cpp/src/transport/TBufferedRouterTransport.h
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.h
@@ -50,6 +50,21 @@
     return trans_->isOpen();
   }
   
+  bool peek() {    
+    if (rPos_ >= rLen_) {
+      // Double the size of the underlying buffer if it is full
+      if (rLen_ == rBufSize_) {
+        rBufSize_ *=2;
+        rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+      }
+    
+      // try to fill up the buffer
+      rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+    }
+    return (rLen_ > rPos_);
+  }
+
+
   void open() {
     trans_->open();
   }
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index cfdb3b9..05dd25d 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -306,10 +306,10 @@
           T_DEBUG("padding is larger than chunk size, skipping event");
           continue;
         }
-        //        T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
         uint8_t zeros[padding];
         bzero(zeros, padding);
-        T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
+        //        T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", 
+        //        padding, offset_, chunk2);
         if(-1 == ::write(fd_, zeros, padding)) {
           perror("TFileTransport: error while padding zeros");
           throw TTransportException("TFileTransport: error while padding zeros");
@@ -399,9 +399,12 @@
   // read as much of the current event as possible
   int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
   if (remaining <= (int32_t)len) {
-    memcpy(buf, 
-           currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, 
-           remaining);
+    // copy over anything thats remaining
+    if (remaining > 0) {
+      memcpy(buf, 
+             currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, 
+             remaining);
+    }
     delete(currentEvent_);
     currentEvent_ = 0;
     return remaining;
@@ -426,12 +429,12 @@
       // advance the offset pointer
       offset_ += readState_.bufferLen_;
       readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
-      if (readState_.bufferLen_) {
-        T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
-      }
+      //       if (readState_.bufferLen_) {
+      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+      //       }
       readState_.bufferPtr_ = 0;
       readState_.lastDispatchPtr_ = 0;
-  
+
       // read error
       if (readState_.bufferLen_ == -1) {
         readState_.resetAllValues();
@@ -481,7 +484,7 @@
         if (readState_.eventSizeBuffPos_ == 4) {
           // 0 length event indicates padding
           if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
-            T_DEBUG_L(1, "Got padding");
+            //            T_DEBUG_L(1, "Got padding");
             readState_.resetState(readState_.lastDispatchPtr_);
             continue;
           }
@@ -493,8 +496,6 @@
           readState_.event_ = new eventInfo();
           readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
 
-          T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
-
           // TODO
           // make sure event is valid, an error is triggered if:
           // 1. Event size is larger than user-speficied max-event size
@@ -541,7 +542,7 @@
           readState_.resetState(readState_.bufferPtr_);
           
           // exit criteria
-          T_DEBUG_L(0, "Finished one event");
+          //          T_DEBUG_L(0, "Finished one event");
           return true;
         }
       }
@@ -605,6 +606,10 @@
   return (f_info.st_size)/chunkSize_;
 }
 
+uint32_t TFileTransport::getCurChunk() {
+  return offset_/chunkSize_;
+}
+
 // Utility Functions
 void TFileTransport::openLogFile() {
   mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
@@ -620,7 +625,6 @@
 
   // opening the file in append mode causes offset_t to be at the end
   offset_ = lseek(fd_, 0, SEEK_CUR);
-  T_DEBUG_L(1, "initial offset: %lu", offset_);
 }
 
 uint32_t TFileTransport::getCurrentTime() {
@@ -689,4 +693,27 @@
 
 }
 
+void TFileProcessor::processChunk() {
+  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+
+  uint32_t curChunk = inputTransport_->getCurChunk();
+
+  while(1) {
+    // bad form to use exceptions for flow control but there is really
+    // no other way around it
+    try {
+      processor_->process(iop.first, iop.second);
+      if (curChunk != inputTransport_->getCurChunk()) {
+        break;
+      }
+    } catch (TEOFException& teof) {
+      break;
+    } catch (TException te) {
+      cerr << te.what() << endl;
+      break;
+    }
+  }
+}
+
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 8192332..fb4a1c2 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -63,7 +63,7 @@
 
   readState() {
     event_ = 0;
-    resetAllValues();
+   resetAllValues();
   }
 
   ~readState() {
@@ -104,6 +104,7 @@
   void seekToChunk(int chunk);
   void seekToEnd();
   uint32_t getNumChunks();
+  uint32_t getCurChunk();
 
   // for changing the output file
   void resetOutputFile(int fd, string filename, long long offset);
@@ -323,6 +324,12 @@
    */
   void process(uint32_t numEvents, bool tail);
 
+  /**
+   * process events until the end of the chunk
+   *
+   */
+  void processChunk();
+  
  private:
   shared_ptr<TProcessor> processor_;
   shared_ptr<TProtocolFactory> protocolFactory_;