Thrift HTTP client optimizations

Summary: When reading a response that uses chunked transfer encoding, don't fetch all of the chunks up front; instead, return one chunk at a time so that the reading code can deserialize the data on the fly.

Reviewed By: http


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665025 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/THttpClient.cpp b/lib/cpp/src/transport/THttpClient.cpp
index b389694..c095d2a 100644
--- a/lib/cpp/src/transport/THttpClient.cpp
+++ b/lib/cpp/src/transport/THttpClient.cpp
@@ -21,10 +21,12 @@
   path_(path),
   readHeaders_(true),
   chunked_(false),
+  chunkedDone_(false),
   chunkSize_(0),
   contentLength_(0),
   httpBuf_(NULL),
-  httpBufPos_(0),
+  httpPos_(0),
+  httpBufLen_(0),
   httpBufSize_(1024) {
   init();
 }
@@ -34,10 +36,12 @@
   path_(path),
   readHeaders_(true),
   chunked_(false),
+  chunkedDone_(false),
   chunkSize_(0),
   contentLength_(0),
   httpBuf_(NULL),
-  httpBufPos_(0),
+  httpPos_(0),
+  httpBufLen_(0),
   httpBufSize_(1024) {
   transport_ = boost::shared_ptr<TTransport>(new TSocket(host, port));
   init();
@@ -48,7 +52,7 @@
   if (httpBuf_ == NULL) {
     throw TTransportException("Out of memory.");
   }
-  httpBuf_[httpBufPos_] = '\0';
+  httpBuf_[httpBufLen_] = '\0';
 }
 
 THttpClient::~THttpClient() {
@@ -68,6 +72,15 @@
   return readBuffer_.read(buf, len);
 }
 
+void THttpClient::readEnd() {
+  // Read any pending chunked data (footers etc.)
+  if (chunked_) {
+    while (!chunkedDone_) {
+      readChunked();
+    }
+  }
+}
+
 uint32_t THttpClient::readMoreData() {
   // Get more data!
   refill();
@@ -79,42 +92,35 @@
   if (chunked_) {
     return readChunked();
   } else {
-    char* read;
-    read = readContent(httpBuf_, contentLength_);
-    shift(read);
-    return contentLength_;
+    return readContent(contentLength_);
   }
 }
 
 uint32_t THttpClient::readChunked() {
   uint32_t length = 0;
-  char* nextLine = httpBuf_;
-  while (true) {
-    char* line = readLine(nextLine, &nextLine);
-    uint32_t chunkSize = parseChunkSize(line);
-    if (chunkSize == 0) {
-      break;
-    }
+
+  char* line = readLine();
+  uint32_t chunkSize = parseChunkSize(line);
+  if (chunkSize == 0) {
+    readChunkedFooters();
+  } else {
     // Read data content
-    nextLine = readContent(nextLine, chunkSize);
-    length += chunkSize;
-
+    length += readContent(chunkSize);
     // Read trailing CRLF after content
-    readLine(nextLine, &nextLine);
+    readLine();
   }
+  return length;
+}
 
-  // Read footer lines until a blank one appears
+void THttpClient::readChunkedFooters() {
+  // End of data, read footer lines until a blank one appears
   while (true) {
-    char* line = readLine(nextLine, &nextLine);
+    char* line = readLine();
     if (strlen(line) == 0) {
+      chunkedDone_ = true;
       break;
     }
   }
-
-  // Shift down whatever we have left in the buf
-  shift(nextLine);
-
-  return length;
 }
 
 uint32_t THttpClient::parseChunkSize(char* line) {
@@ -127,69 +133,67 @@
   return (uint32_t)size;
 }
 
-char* THttpClient::readContent(char* pos, uint32_t size) {
+uint32_t THttpClient::readContent(uint32_t size) {
   uint32_t need = size;
-
   while (need > 0) {
-    uint32_t avail = httpBufPos_ - (pos - httpBuf_);
+    uint32_t avail = httpBufLen_ - httpPos_;
     if (avail == 0) {
       // We have given all the data, reset position to head of the buffer
-      pos = shift(pos);
-      pos = refill();
+      httpPos_ = 0;
+      httpBufLen_ = 0;
+      refill();
       
       // Now have available however much we read
-      avail = httpBufPos_;
+      avail = httpBufLen_;
     }
     uint32_t give = avail;
     if (need < give) {
       give = need;
     }
-    readBuffer_.write((uint8_t*)pos, give);
-    pos += give;
+    readBuffer_.write((uint8_t*)(httpBuf_+httpPos_), give);
+    httpPos_ += give;
     need -= give;
   }
-  return pos;
+  return size;
 }
   
-char* THttpClient::readLine(char* pos, char** next) {
+char* THttpClient::readLine() {
   while (true) {
     char* eol = NULL;
 
-    // Note, the data we read could have ended right on the CRLF pair
-    if (pos != NULL) {
-      eol = strstr(pos, CRLF);
-    }   
+    eol = strstr(httpBuf_+httpPos_, CRLF);
 
     // No CRLF yet?
     if (eol == NULL) {
       // Shift whatever we have now to front and refill
-      pos = shift(pos);
-      pos = refill();
+      shift();
+      refill();
     } else {
       // Return pointer to next line
       *eol = '\0';
-      *next = eol + CRLF_LEN;
-      return pos;
+      char* line = httpBuf_+httpPos_;
+      httpPos_ = (eol-httpBuf_) + CRLF_LEN;
+      return line;
     }
   }
 
 }
 
-char* THttpClient::shift(char* pos) {
-  if (pos != NULL && httpBufPos_ > (pos - httpBuf_)) {
+void THttpClient::shift() {
+  if (httpBufLen_ > httpPos_) {
     // Shift down remaining data and read more
-    uint32_t length = httpBufPos_ - (pos - httpBuf_);
-    memmove(httpBuf_, pos, length);
-    httpBufPos_ = length;       
+    uint32_t length = httpBufLen_ - httpPos_;
+    memmove(httpBuf_, httpBuf_+httpPos_, length);
+    httpBufLen_ = length;
   } else {
-    httpBufPos_ = 0;
+    httpBufLen_ = 0;
   }
-  httpBuf_[httpBufPos_] = '\0';
-  return httpBuf_;
+  httpPos_ = 0;
+  httpBuf_[httpBufLen_] = '\0';
 }
 
-char* THttpClient::refill() {
-  uint32_t avail = httpBufSize_ - httpBufPos_;
+void THttpClient::refill() {
+  uint32_t avail = httpBufSize_ - httpBufLen_;
   if (avail <= (httpBufSize_ / 4)) {
     httpBufSize_ *= 2;
     httpBuf_ = (char*)realloc(httpBuf_, httpBufSize_+1);
@@ -199,38 +203,33 @@
   }
       
   // Read more data
-  uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufPos_), httpBufSize_-httpBufPos_);
-  httpBufPos_ += got;
-  httpBuf_[httpBufPos_] = '\0';
+  uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufLen_), httpBufSize_-httpBufLen_);
+  httpBufLen_ += got;
+  httpBuf_[httpBufLen_] = '\0';
  
   if (got == 0) {
     throw TTransportException("Could not refill buffer");
   }
-
-  return httpBuf_;
 }
 
 void THttpClient::readHeaders() {
   // Initialize headers state variables
   contentLength_ = 0;
   chunked_ = false;
+  chunkedDone_ = false;
   chunkSize_ = 0;
 
   // Control state flow
   bool statusLine = true;
   bool finished = false;
 
-  // Initialize local pos vars
-  char* nextLine = (char*)httpBuf_;
-
   // Loop until headers are finished
   while (true) {
-    char* line = readLine(nextLine, &nextLine);
+    char* line = readLine();
 
     if (strlen(line) == 0) {
       if (finished) {
         readHeaders_ = false;
-        shift(nextLine);
         return;
       } else {
         // Must have been an HTTP 100, keep going for another status line
@@ -251,11 +250,17 @@
   char* http = status;
 
   char* code = strchr(http, ' ');
+  if (code == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+  
   *code = '\0';
-
   while (*(code++) == ' ');
 
   char* msg = strchr(code, ' ');
+  if (msg == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
   *msg = '\0';
 
   if (strcmp(code, "200") == 0) {
@@ -265,7 +270,7 @@
     // HTTP 100 = continue, just keep reading
     return false;
   } else {
-    throw TTransportException(status);
+    throw TTransportException(string("Bad Status: ") + status);
   }
 }
 
diff --git a/lib/cpp/src/transport/THttpClient.h b/lib/cpp/src/transport/THttpClient.h
index 81fff39..c34494e 100644
--- a/lib/cpp/src/transport/THttpClient.h
+++ b/lib/cpp/src/transport/THttpClient.h
@@ -40,6 +40,8 @@
 
   uint32_t read(uint8_t* buf, uint32_t len);
 
+  void readEnd();
+
   void write(const uint8_t* buf, uint32_t len);
   
   void flush();
@@ -59,27 +61,30 @@
 
   bool readHeaders_;
   bool chunked_;
+  bool chunkedDone_;
   uint32_t chunkSize_;
   uint32_t contentLength_;
 
   char* httpBuf_;
-  uint32_t httpBufPos_;
+  uint32_t httpPos_;
+  uint32_t httpBufLen_;
   uint32_t httpBufSize_;
 
   uint32_t readMoreData();
-  char* readLine(char* line, char** next);
+  char* readLine();
 
   void readHeaders();
   void parseHeader(char* header);
   bool parseStatusLine(char* status);
 
   uint32_t readChunked();
+  void readChunkedFooters();
   uint32_t parseChunkSize(char* line);
 
-  char* readContent(char* pos, uint32_t size);
+  uint32_t readContent(uint32_t size);
 
-  char* refill();
-  char* shift(char* pos);
+  void refill();
+  void shift();
 
 };