-- additions to ThriftLogfile
Summary:
-- fixed peek() in TBufferedRouterTransport.cpp
-- Added processChunk() to ThriftLogfile
Reviewed By: Slee
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664924 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cpp b/lib/cpp/src/transport/TBufferedRouterTransport.cpp
index 7c09953..ad6a28f 100644
--- a/lib/cpp/src/transport/TBufferedRouterTransport.cpp
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.cpp
@@ -27,14 +27,17 @@
rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
}
+
// Hand over whatever we have
uint32_t give = need;
if (rLen_-rPos_ < give) {
give = rLen_-rPos_;
}
- memcpy(buf, rBuf_+rPos_, give);
- rPos_ += give;
- need -= give;
+ if (give > 0) {
+ memcpy(buf, rBuf_+rPos_, give);
+ rPos_ += give;
+ need -= give;
+ }
return (len - need);
}
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
index 3a5e394..b2e4d4f 100644
--- a/lib/cpp/src/transport/TBufferedRouterTransport.h
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.h
@@ -50,6 +50,21 @@
return trans_->isOpen();
}
+ bool peek() {
+ if (rPos_ >= rLen_) {
+ // Double the size of the underlying buffer if it is full
+ if (rLen_ == rBufSize_) {
+ rBufSize_ *=2;
+ rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
+ }
+
+ // try to fill up the buffer
+ rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
+ }
+ return (rLen_ > rPos_);
+ }
+
+
void open() {
trans_->open();
}
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index cfdb3b9..05dd25d 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -306,10 +306,10 @@
T_DEBUG("padding is larger than chunk size, skipping event");
continue;
}
- // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
uint8_t zeros[padding];
bzero(zeros, padding);
- T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
+ // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
+ // padding, offset_, chunk2);
if(-1 == ::write(fd_, zeros, padding)) {
perror("TFileTransport: error while padding zeros");
throw TTransportException("TFileTransport: error while padding zeros");
@@ -399,9 +399,12 @@
// read as much of the current event as possible
int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
if (remaining <= (int32_t)len) {
- memcpy(buf,
- currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
- remaining);
+ // copy over anything thats remaining
+ if (remaining > 0) {
+ memcpy(buf,
+ currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
+ remaining);
+ }
delete(currentEvent_);
currentEvent_ = 0;
return remaining;
@@ -426,12 +429,12 @@
// advance the offset pointer
offset_ += readState_.bufferLen_;
readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
- if (readState_.bufferLen_) {
- T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
- }
+ // if (readState_.bufferLen_) {
+ // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+ // }
readState_.bufferPtr_ = 0;
readState_.lastDispatchPtr_ = 0;
-
+
// read error
if (readState_.bufferLen_ == -1) {
readState_.resetAllValues();
@@ -481,7 +484,7 @@
if (readState_.eventSizeBuffPos_ == 4) {
// 0 length event indicates padding
if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
- T_DEBUG_L(1, "Got padding");
+ // T_DEBUG_L(1, "Got padding");
readState_.resetState(readState_.lastDispatchPtr_);
continue;
}
@@ -493,8 +496,6 @@
readState_.event_ = new eventInfo();
readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
- T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
-
// TODO
// make sure event is valid, an error is triggered if:
// 1. Event size is larger than user-speficied max-event size
@@ -541,7 +542,7 @@
readState_.resetState(readState_.bufferPtr_);
// exit criteria
- T_DEBUG_L(0, "Finished one event");
+ // T_DEBUG_L(0, "Finished one event");
return true;
}
}
@@ -605,6 +606,10 @@
return (f_info.st_size)/chunkSize_;
}
+uint32_t TFileTransport::getCurChunk() {
+ return offset_/chunkSize_;
+}
+
// Utility Functions
void TFileTransport::openLogFile() {
mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
@@ -620,7 +625,6 @@
// opening the file in append mode causes offset_t to be at the end
offset_ = lseek(fd_, 0, SEEK_CUR);
- T_DEBUG_L(1, "initial offset: %lu", offset_);
}
uint32_t TFileTransport::getCurrentTime() {
@@ -689,4 +693,27 @@
}
+void TFileProcessor::processChunk() {
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+
+ uint32_t curChunk = inputTransport_->getCurChunk();
+
+ while(1) {
+ // bad form to use exceptions for flow control but there is really
+ // no other way around it
+ try {
+ processor_->process(iop.first, iop.second);
+ if (curChunk != inputTransport_->getCurChunk()) {
+ break;
+ }
+ } catch (TEOFException& teof) {
+ break;
+ } catch (TException te) {
+ cerr << te.what() << endl;
+ break;
+ }
+ }
+}
+
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 8192332..fb4a1c2 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -63,7 +63,7 @@
readState() {
event_ = 0;
- resetAllValues();
+ resetAllValues();
}
~readState() {
@@ -104,6 +104,7 @@
void seekToChunk(int chunk);
void seekToEnd();
uint32_t getNumChunks();
+ uint32_t getCurChunk();
// for changing the output file
void resetOutputFile(int fd, string filename, long long offset);
@@ -323,6 +324,12 @@
*/
void process(uint32_t numEvents, bool tail);
+ /**
+ * process events until the end of the chunk
+ *
+ */
+ void processChunk();
+
private:
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocolFactory> protocolFactory_;