Thrift HTTP client in C++

Reviewed By: aditya, dweatherford

Test Plan: SMC client


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665022 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 92ccdf1..b205e78 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -13,6 +13,7 @@
                     src/concurrency/TimerManager.cpp \
 	            src/protocol/TBinaryProtocol.cpp \
                     src/transport/TFileTransport.cpp \
+                    src/transport/THttpClient.cpp \
                     src/transport/TSocket.cpp \
                     src/transport/TServerSocket.cpp \
                     src/transport/TTransportUtils.cpp \
@@ -60,6 +61,7 @@
                          src/transport/TFileTransport.h \
                          src/transport/TServerSocket.h \
                          src/transport/TServerTransport.h \
+                         src/transport/THttpClient.h \
                          src/transport/TSocket.h \
                          src/transport/TTransport.h \
                          src/transport/TTransportException.h \
diff --git a/lib/cpp/src/transport/THttpClient.cpp b/lib/cpp/src/transport/THttpClient.cpp
new file mode 100644
index 0000000..7ee83fc
--- /dev/null
+++ b/lib/cpp/src/transport/THttpClient.cpp
@@ -0,0 +1,318 @@
+#include "THttpClient.h"
+#include "TSocket.h"
+
+namespace facebook { namespace thrift { namespace transport { 
+
+using namespace std;
+
+/**
+ * Http client implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+
+// Yeah, yeah, hacky to put these here, I know.
+static const char* CRLF = "\r\n";
+static const int CRLF_LEN = 2;
+
+  THttpClient::THttpClient(boost::shared_ptr<TTransport> transport, string host, string path) :
+  transport_(transport),
+  host_(host),
+  path_(path),
+  readHeaders_(true),
+  chunked_(false),
+  chunkSize_(0),
+  contentLength_(0),
+  httpBuf_(NULL),
+  httpBufPos_(0),
+  httpBufSize_(1024) {
+  init();
+}
+
+THttpClient::THttpClient(string host, int port, string path) :
+  host_(host),
+  path_(path),
+  readHeaders_(true),
+  chunked_(false),
+  chunkSize_(0),
+  contentLength_(0),
+  httpBuf_(NULL),
+  httpBufPos_(0),
+  httpBufSize_(1024) {
+  transport_ = boost::shared_ptr<TTransport>(new TSocket(host, port));
+  init();
+}
+
+void THttpClient::init() {
+  httpBuf_ = (char*)malloc(httpBufSize_+1);
+  if (httpBuf_ == NULL) {
+    throw TTransportException("Out of memory.");
+  }
+}
+
+THttpClient::~THttpClient() {
+  if (httpBuf_ != NULL) {
+    free(httpBuf_);
+  }
+}
+
+uint32_t THttpClient::read(uint8_t* buf, uint32_t len) {
+  if (readBuffer_.available() == 0) {
+    readBuffer_.resetBuffer();
+    uint32_t got = readMoreData();
+    if (got == 0) {
+      return 0;
+    }
+  }
+  return readBuffer_.read(buf, len);
+}
+
+uint32_t THttpClient::readMoreData() {
+  // Get more data!
+  refill();
+
+  if (readHeaders_) {
+    readHeaders();
+  }
+
+  if (chunked_) {
+    return readChunked();
+  } else {
+    char* read;
+    read = readContent((char*)httpBuf_, contentLength_);
+    shift(read);
+    return contentLength_;
+  }
+}
+
+uint32_t THttpClient::readChunked() {
+  uint32_t length = 0;
+  char* nextLine = (char*)httpBuf_;
+  while (true) {
+    char* line = nextLine;
+    nextLine = readLine(nextLine);
+    uint32_t chunkSize = parseChunkSize(line);
+    if (chunkSize == 0) {
+      break;
+    }
+    // Read data content
+    nextLine = readContent(nextLine, chunkSize);
+    length += chunkSize;
+
+    // Read trailing CRLF after content
+    nextLine = readLine(nextLine);
+  }
+
+  // Read footer lines until a blank one appears
+  while (true) {
+    char* line = nextLine;
+    nextLine = readLine(nextLine);
+    if (strlen(line) == 0) {
+      break;
+    }
+  }
+
+  // Shift down whatever we have left in the buf
+  shift(nextLine);
+
+  return length;
+}
+
+uint32_t THttpClient::parseChunkSize(char* line) {
+  char* semi = strchr(line, ';');
+  if (semi != NULL) {
+    *semi = '\0';
+  }
+  int s;
+  int size;
+  s = sscanf(line, "%x", &size);
+  return (uint32_t)size;
+}
+
+char* THttpClient::readContent(char* pos, uint32_t size) {
+  uint32_t need = size;
+
+  while (need > 0) {
+    uint32_t avail = httpBufPos_ - (pos - httpBuf_);
+    if (avail == 0) {
+      refill();
+    }
+    uint32_t give = avail;
+    if (need < give) {
+      give = need;
+    }
+    readBuffer_.write((uint8_t*)pos, give);
+    pos += give;
+    need -= give;
+  }
+  return pos;
+}
+
+char* THttpClient::readLine(char* pos) {
+  while (true) {
+    char* eol = NULL;
+
+    // Note, the data we read could have ended right on the CRLF pair
+    if (pos != NULL) {
+      eol = strstr(pos, CRLF);
+    }   
+
+    // No CRLF yet?
+    if (eol == NULL) {
+      // Shift whatever we have now to front
+      pos = shift(pos);
+      // Refill the buffer
+      refill();
+    } else {
+      // Return pointer to next line
+      *eol = '\0';
+      return eol + CRLF_LEN;
+    }
+  }
+
+}
+
+char* THttpClient::shift(char* pos) {
+  if (pos != NULL && httpBufPos_ > (pos - httpBuf_)) {
+    // Shift down remaining data and read more
+    uint32_t length = httpBufPos_ - (pos - httpBuf_);
+    memmove(httpBuf_, pos, length);
+    httpBufPos_ = length;       
+  } else {
+    httpBufPos_ = 0;
+  }
+  httpBuf_[httpBufPos_] = '\0';
+  return httpBuf_;
+}
+
+void THttpClient::refill() {
+  uint32_t avail = httpBufSize_ - httpBufPos_;
+  if (avail <= (httpBufSize_ / 4)) {
+    httpBufSize_ *= 2;
+    httpBuf_ = (char*)realloc(httpBuf_, httpBufSize_);
+    if (httpBuf_ == NULL) {
+      throw TTransportException("Out of memory.");
+    }
+  }
+      
+  // Read more data
+  uint32_t got = transport_->read((uint8_t*)(httpBuf_+httpBufPos_), httpBufSize_-httpBufPos_);
+  httpBufPos_ += got;
+  httpBuf_[httpBufPos_] = '\0';
+      
+  if (got == 0) {
+    throw TTransportException("Could not finish reading HTTP headers");
+  }
+}
+
+void THttpClient::readHeaders() {
+  // Initialize headers state variables
+  contentLength_ = 0;
+  chunked_ = false;
+  chunkSize_ = 0;
+
+  // Control state flow
+  bool statusLine = true;
+  bool finished = false;
+
+  // Initialize local pos vars
+  char* nextLine = (char*)httpBuf_;
+
+  // Loop until headers are finished
+  while (true) {
+    char* line = nextLine;
+    nextLine = readLine(nextLine);
+
+    if (strlen(line) == 0) {
+      if (finished) {
+        readHeaders_ = false;
+        shift(nextLine);
+        return;
+      } else {
+        // Must have been an HTTP 100, keep going for another status line
+        statusLine = true;
+      }
+    } else {
+      if (statusLine) {
+        statusLine = false;
+        finished = parseStatusLine(line);
+      } else {
+        parseHeader(line);
+      }
+    }
+  }  
+}
+
+bool THttpClient::parseStatusLine(char* status) {
+  char* http = status;
+
+  char* code = strchr(http, ' ');
+  *code = '\0';
+
+  while (*(code++) == ' ');
+
+  char* msg = strchr(code, ' ');
+  *msg = '\0';
+
+  if (strcmp(code, "200") == 0) {
+    // HTTP 200 = OK, we got the response
+    return true;
+  } else if (strcmp(code, "100") == 0) {
+    // HTTP 100 = continue, just keep reading
+    return false;
+  } else {
+    throw TTransportException(status);
+  }
+}
+
+void THttpClient::parseHeader(char* header) {
+  char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  uint32_t sz = colon - header;
+  char* value = colon+1;
+
+  if (strncmp(header, "Transfer-Encoding", sz) == 0) {
+    if (strstr(value, "chunked") != NULL) {
+      chunked_ = true;
+    }
+  } else if (strncmp(header, "Content-Length", sz) == 0) {
+    chunked_ = false;
+    contentLength_ = atoi(value);
+  }
+}
+
+void THttpClient::write(const uint8_t* buf, uint32_t len) {
+  writeBuffer_.write(buf, len);
+}
+
+void THttpClient::flush() {
+  // Fetch the contents of the write buffer
+  uint8_t* buf;
+  uint32_t len;
+  writeBuffer_.getBuffer(&buf, &len);
+
+  // Construct the HTTP header
+  std::ostringstream h;
+  h <<
+    "POST " << path_ << " HTTP/1.1" << CRLF <<
+    "Host: " << host_ << CRLF <<
+    "Content-Type: application/x-thrift" << CRLF <<
+    "Content-Length: " << len << CRLF <<
+    "Accept: application/x-thrift" << CRLF <<
+    "User-Agent: C++/THttpClient" << CRLF <<
+    CRLF;
+  string header = h.str();
+
+  // Write the header, then the data, then flush
+  transport_->write((const uint8_t*)header.c_str(), header.size());
+  transport_->write(buf, len);
+  transport_->flush();
+
+  // Reset the buffer and header variables
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/THttpClient.h b/lib/cpp/src/transport/THttpClient.h
new file mode 100644
index 0000000..e7f88c9
--- /dev/null
+++ b/lib/cpp/src/transport/THttpClient.h
@@ -0,0 +1,88 @@
+#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1
+
+#include <transport/TTransportUtils.h>
+
+namespace facebook { namespace thrift { namespace transport { 
+
+/**
+ * HTTP client implementation of the thrift transport. This was irritating
+ * to write, but the alternatives in C++ land are daunting. Linking CURL
+ * requires 23 dynamic libraries last time I checked (WTF?!?). All we have
+ * here is a VERY basic HTTP/1.1 client which supports HTTP 100 Continue,
+ * chunked transfer encoding, keepalive, etc. Tested against Apache.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class THttpClient : public TTransport {
+ public:
+  THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path="");
+
+  THttpClient(std::string host, int port, std::string path="");
+
+  virtual ~THttpClient();
+
+  void open() {
+    transport_->open();
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+  
+  bool peek() {    
+    return transport_->peek();
+  }
+
+  void close() {
+    transport_->close();
+  }
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+  
+  void flush();
+
+ private:
+  void init();
+
+ protected:
+
+  boost::shared_ptr<TTransport> transport_;
+
+  TMemoryBuffer writeBuffer_;
+  TMemoryBuffer readBuffer_;
+
+  std::string host_;
+  std::string path_;
+
+  bool readHeaders_;
+  bool chunked_;
+  uint32_t chunkSize_;
+  uint32_t contentLength_;
+
+  char* httpBuf_;
+  uint32_t httpBufPos_;
+  uint32_t httpBufSize_;
+
+  uint32_t readMoreData();
+  char* readLine(char* line);
+
+  void readHeaders();
+  void parseHeader(char* header);
+  bool parseStatusLine(char* status);
+
+  uint32_t readChunked();
+  uint32_t parseChunkSize(char* line);
+
+  char* readContent(char* pos, uint32_t size);
+
+  void refill();
+  char* shift(char* pos);
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index 3decb2c..9fbdb20 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -40,11 +40,10 @@
    */
   TSocket(std::string host, int port);
 
-
   /**
    * Destroyes the socket object, closing it if necessary.
    */
-  ~TSocket();
+  virtual ~TSocket();
 
   /**
    * Whether the socket is alive.
@@ -125,8 +124,7 @@
    */
   void setSendTimeout(int ms);
 
-
- private:
+ protected:
   /**
    * Constructor to create socket from raw UNIX handle. Never called directly
    * but used by the TServerSocket class.
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
index 54b1f3f..adcb013 100644
--- a/lib/cpp/src/transport/TTransportUtils.cpp
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -180,6 +180,9 @@
 uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
   // Check avaible data for reading
   uint32_t avail = wPos_ - rPos_;
+  if (avail == 0) {
+    return 0;
+  }
 
   // Device how much to give
   uint32_t give = len;
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
index 06547c7..545a0a1 100644
--- a/lib/cpp/src/transport/TTransportUtils.h
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -312,6 +312,10 @@
 
   void write(const uint8_t* buf, uint32_t len);
 
+  uint32_t available() {
+    return wPos_ - rPos_;
+  }
+
  private:
   // Data buffer
   uint8_t* buffer_;