THRIFT-928. cpp: Include request/response size in processor callbacks
Required updating transport interface.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005129 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index c2f9e2e..e372b97 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -2351,9 +2351,9 @@
indent() << argsname << " args;" << endl <<
indent() << "args.read(iprot);" << endl <<
indent() << "iprot->readMessageEnd();" << endl <<
- indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
+ indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
indent() << "}" << endl <<
endl;
@@ -2460,10 +2460,10 @@
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
indent() << "result.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "bytes = oprot->getTransport()->writeEnd();" << endl <<
indent() << "oprot->getTransport()->flush();" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
indent() << "}" << endl;
// Close function
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 896f5ae..7858166 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -59,7 +59,7 @@
/**
* Called between reading arguments and calling the handler.
*/
- virtual void postRead(void* ctx, const char* fn_name) {}
+ virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {}
/**
* Called between calling the handler and writing the response.
@@ -69,7 +69,7 @@
/**
* Called after writing the response.
*/
- virtual void postWrite(void* ctx, const char* fn_name) {}
+ virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {}
/**
* Called when an async function call completes successfully.
diff --git a/lib/cpp/src/transport/TBufferTransports.cpp b/lib/cpp/src/transport/TBufferTransports.cpp
index 6097130..c76f661 100644
--- a/lib/cpp/src/transport/TBufferTransports.cpp
+++ b/lib/cpp/src/transport/TBufferTransports.cpp
@@ -262,6 +262,10 @@
transport_->flush();
}
+uint32_t TFramedTransport::writeEnd() {
+ return wBase_ - wBuf_.get();
+}
+
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
// Don't try to be clever with shifting buffers.
// If the fast path failed let the protocol use its slow path.
@@ -269,6 +273,10 @@
return NULL;
}
+uint32_t TFramedTransport::readEnd() {
+ // include framing bytes
+ return rBound_ - rBuf_.get() + sizeof(uint32_t);
+}
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
// Correct rBound_ so we can use the fast path in the future.
diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h
index b542fd5..f81a6a0 100644
--- a/lib/cpp/src/transport/TBufferTransports.h
+++ b/lib/cpp/src/transport/TBufferTransports.h
@@ -348,6 +348,10 @@
virtual void flush();
+ uint32_t readEnd();
+
+ uint32_t writeEnd();
+
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
boost::shared_ptr<TTransport> getUnderlyingTransport() {
@@ -612,10 +616,18 @@
uint32_t readAppendToString(std::string& str, uint32_t len);
- void readEnd() {
+ // return number of bytes read
+ uint32_t readEnd() {
+ uint32_t bytes = rBase_ - buffer_;
if (rBase_ == wBase_) {
resetBuffer();
}
+ return bytes;
+ }
+
+ // Return number of bytes written
+ uint32_t writeEnd() {
+ return wBase_ - buffer_;
}
uint32_t available_read() const {
diff --git a/lib/cpp/src/transport/THttpTransport.cpp b/lib/cpp/src/transport/THttpTransport.cpp
index 4010d6b..0934f1b 100644
--- a/lib/cpp/src/transport/THttpTransport.cpp
+++ b/lib/cpp/src/transport/THttpTransport.cpp
@@ -66,13 +66,14 @@
return readBuffer_.read(buf, len);
}
-void THttpTransport::readEnd() {
+uint32_t THttpTransport::readEnd() {
// Read any pending chunked data (footers etc.)
if (chunked_) {
while (!chunkedDone_) {
readChunked();
}
}
+ return 0;
}
uint32_t THttpTransport::readMoreData() {
diff --git a/lib/cpp/src/transport/THttpTransport.h b/lib/cpp/src/transport/THttpTransport.h
index e71dcbd..cd58bcb 100644
--- a/lib/cpp/src/transport/THttpTransport.h
+++ b/lib/cpp/src/transport/THttpTransport.h
@@ -55,7 +55,7 @@
uint32_t read(uint8_t* buf, uint32_t len);
- void readEnd();
+ uint32_t readEnd();
void write(const uint8_t* buf, uint32_t len);
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index f9e20ce..b9c35f0 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -116,10 +116,11 @@
* This can be over-ridden to perform a transport-specific action
* e.g. logging the request to a file
*
+ * @return number of bytes read if available, 0 otherwise.
*/
- virtual void readEnd() {
+ virtual uint32_t readEnd() {
// default behaviour is to do nothing
- return;
+ return 0;
}
/**
@@ -137,10 +138,11 @@
* This can be over-ridden to perform a transport-specific action
* at the end of a request.
*
+ * @return number of bytes written if available, 0 otherwise
*/
- virtual void writeEnd() {
+ virtual uint32_t writeEnd() {
// default behaviour is to do nothing
- return;
+ return 0;
}
/**
@@ -149,7 +151,9 @@
*
* @throws TTransportException if an error occurs
*/
- virtual void flush() {}
+ virtual void flush() {
+ // default behaviour is to do nothing
+ }
/**
* Attempts to return a pointer to \c len bytes, possibly copied into \c buf.
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index a840fa6..72289bc 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -135,16 +135,16 @@
return have;
}
-void TPipedFileReaderTransport::readEnd() {
- TPipedTransport::readEnd();
+uint32_t TPipedFileReaderTransport::readEnd() {
+ return TPipedTransport::readEnd();
}
void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
TPipedTransport::write(buf, len);
}
-void TPipedFileReaderTransport::writeEnd() {
- TPipedTransport::writeEnd();
+uint32_t TPipedFileReaderTransport::writeEnd() {
+ return TPipedTransport::writeEnd();
}
void TPipedFileReaderTransport::flush() {
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index d65c916..8b0c076 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -136,7 +136,7 @@
uint32_t read(uint8_t* buf, uint32_t len);
- void readEnd() {
+ uint32_t readEnd() {
if (pipeOnRead_) {
dstTrans_->write(rBuf_, rPos_);
@@ -148,18 +148,22 @@
// If requests are being pipelined, copy down our read-ahead data,
// then reset our state.
int read_ahead = rLen_ - rPos_;
+ uint32_t bytes = rPos_;
memcpy(rBuf_, rBuf_ + rPos_, read_ahead);
rPos_ = 0;
rLen_ = read_ahead;
+
+ return bytes;
}
void write(const uint8_t* buf, uint32_t len);
- void writeEnd() {
+ uint32_t writeEnd() {
if (pipeOnWrite_) {
dstTrans_->write(wBuf_, wLen_);
dstTrans_->flush();
}
+ return wLen_;
}
void flush();
@@ -237,9 +241,9 @@
void close();
uint32_t read(uint8_t* buf, uint32_t len);
uint32_t readAll(uint8_t* buf, uint32_t len);
- void readEnd();
+ uint32_t readEnd();
void write(const uint8_t* buf, uint32_t len);
- void writeEnd();
+ uint32_t writeEnd();
void flush();
// TFileReaderTransport functions
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 18bdc54..369237e 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -298,13 +298,13 @@
virtual void preRead(void* ctx, const char* fn_name) {
communicate("preRead", ctx, fn_name);
}
- virtual void postRead(void* ctx, const char* fn_name) {
+ virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
communicate("postRead", ctx, fn_name);
}
virtual void preWrite(void* ctx, const char* fn_name) {
communicate("preWrite", ctx, fn_name);
}
- virtual void postWrite(void* ctx, const char* fn_name) {
+ virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
communicate("postWrite", ctx, fn_name);
}
virtual void asyncComplete(void* ctx, const char* fn_name) {