Update Thrift CPP libraries to work with new generated source, change underlying buffers to use uint8_t* instead of std::string

Summary: Major overhaul to the CPP libraries.

Reviewed By: aditya

Test Plan: Again, keep an eye out for the unit tests commit

Notes: Initial perf tests show that Thrift is not only more robust than Pillar, but its implementation is actually around 10-20% faster. We can do about 10 RPC function calls with small data payloads in under 2ms. THAT IS FAST. THAT IS THRIFTY.




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664714 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/transport/TBufferedTransport.cc b/lib/cpp/transport/TBufferedTransport.cc
new file mode 100644
index 0000000..3fccc58
--- /dev/null
+++ b/lib/cpp/transport/TBufferedTransport.cc
@@ -0,0 +1,60 @@
+#include "TBufferedTransport.h"
+using std::string;
+
+uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+    }    
+    // Get more from underlying transport up to buffer size
+    rLen_ = transport_->read(rBuf_, rBufSize_);
+    rPos_ = 0;
+  }
+  
+  // Hand over whatever we have
+  uint32_t give = need;
+  if (rLen_-rPos_ < give) {
+    give = rLen_-rPos_;
+  }
+  memcpy(buf, rBuf_+rPos_, give);
+  rPos_ += give;
+  need -= give;
+  return (len - need);
+}
+
+// Buffers len bytes from buf, writing each full buffer out to transport_.
+void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
+  if (wBufSize_ == 0 && len > 0) {  // nothing can be buffered: pass through
+    transport_->write(buf, len);
+    return;
+  }
+  // Loop so a payload larger than the free space can never overrun wBuf_
+  while (len > 0) {
+    uint32_t copy = (len < wBufSize_ - wLen_) ? len : wBufSize_ - wLen_;
+    memcpy(wBuf_ + wLen_, buf, copy);
+    wLen_ += copy;
+    buf += copy;
+    len -= copy;
+    if (wLen_ == wBufSize_) {  // buffer is full: drain it and start over
+      transport_->write(wBuf_, wBufSize_);
+      wLen_ = 0;
+    }
+  }
+}
+
+void TBufferedTransport::flush()  {
+  // Write out any data waiting in the write buffer
+  if (wLen_ > 0) {
+    transport_->write(wBuf_, wLen_);
+    wLen_ = 0;
+  }
+
+  // Flush the underlying transport
+  transport_->flush();
+}
diff --git a/lib/cpp/transport/TBufferedTransport.h b/lib/cpp/transport/TBufferedTransport.h
new file mode 100644
index 0000000..991b50c
--- /dev/null
+++ b/lib/cpp/transport/TBufferedTransport.h
@@ -0,0 +1,75 @@
+#ifndef T_BUFFERED_TRANSPORT_H
+#define T_BUFFERED_TRANSPORT_H
+
+#include "transport/TTransport.h"
+#include <string>
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransport : public TTransport {
+ public:
+  TBufferedTransport(TTransport* transport) :
+    transport_(transport),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  TBufferedTransport(TTransport* transport, uint32_t sz) :
+    transport_(transport),
+    rBufSize_(sz), rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  TBufferedTransport(TTransport* transport, uint32_t rsz, uint32_t wsz) :
+    transport_(transport),
+    rBufSize_(rsz), rPos_(0), rLen_(0),
+    wBufSize_(wsz), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  ~TBufferedTransport() {
+    delete [] rBuf_;
+    delete [] wBuf_;
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+  
+  void open() {
+    transport_->open();
+  }
+
+  void close() {
+    transport_->close();
+  }
+  
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ protected:
+  TTransport* transport_;
+  uint8_t* rBuf_;
+  uint32_t rBufSize_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+};
+
+#endif
diff --git a/lib/cpp/transport/TNullTransport.h b/lib/cpp/transport/TNullTransport.h
new file mode 100644
index 0000000..9562d9f
--- /dev/null
+++ b/lib/cpp/transport/TNullTransport.h
@@ -0,0 +1,24 @@
+#ifndef T_NULL_TRANSPORT
+#define T_NULL_TRANSPORT
+
+#include "transport/TTransport.h"
+
+/**
+ * The null transport is a dummy transport that doesn't actually do anything.
+ * It is analogous to /dev/null: it always reports itself as open, and it
+ * accepts any write and discards the data. read() is not overridden here,
+ * so reads fall through to the base TTransport implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TNullTransport : public TTransport {
+ public:
+  bool isOpen() { return true; }
+  void open() {}
+  void close() {}
+  // Overrides the raw-buffer write of the base class, which would throw
+  void write(const uint8_t* buf, uint32_t len) {}
+  void write(const std::string& s) {}
+};
+
+#endif
diff --git a/lib/cpp/transport/TServerSocket.cc b/lib/cpp/transport/TServerSocket.cc
index 178de81..1cf4a32 100644
--- a/lib/cpp/transport/TServerSocket.cc
+++ b/lib/cpp/transport/TServerSocket.cc
@@ -1,5 +1,6 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <errno.h>
 
 #include "transport/TSocket.h"
 #include "transport/TServerSocket.h"
@@ -11,11 +12,12 @@
   close();
 }
 
-bool TServerSocket::listen() {
+void TServerSocket::listen() {
   serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
   if (serverSocket_ == -1) {
+    perror("TServerSocket::listen() socket");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not create server socket.");
   }
 
   // Set reusaddress to prevent 2MSL delay on accept
@@ -24,16 +26,16 @@
                        &one, sizeof(one))) {
     perror("TServerSocket::listen() SO_REUSEADDR");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR");
   }
 
   // Turn linger off, don't want to block on calls to close
   struct linger ling = {0, 0};
   if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
                        &ling, sizeof(ling))) {
-    perror("TServerSocket::listen() SO_LINGER");
     close();
-    return false;
+    perror("TServerSocket::listen() SO_LINGER");
+    throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER");
   }
 
   // Bind to a port
@@ -47,24 +49,22 @@
     sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
     perror(errbuf);
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not bind");
   }
 
   // Call listen
   if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
     perror("TServerSocket::listen() LISTEN");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not listen");
   }
 
   // The socket is now listening!
-  return true;
 }
 
-TTransport* TServerSocket::accept() {
+TTransport* TServerSocket::acceptImpl() {
   if (serverSocket_ <= 0) {
-    // TODO(mcslee): Log error with common logging tool
-    return NULL;
+    throw TTransportException(TTX_NOT_OPEN, "TServerSocket not listening");
   }
 
   struct sockaddr_in clientAddress;
@@ -75,7 +75,7 @@
     
   if (clientSocket <= 0) {
     perror("TServerSocket::accept()");
-    return NULL;
+    throw TTransportException(TTX_UNKNOWN, "ERROR: accept() failed");
   }
 
   return new TSocket(clientSocket);
diff --git a/lib/cpp/transport/TServerSocket.h b/lib/cpp/transport/TServerSocket.h
index 8ded4e2..ca30a03 100644
--- a/lib/cpp/transport/TServerSocket.h
+++ b/lib/cpp/transport/TServerSocket.h
@@ -16,11 +16,14 @@
   TServerSocket(int port);
   ~TServerSocket();
 
-  bool listen();
-  TTransport* accept();
+  void listen();
   void close();
 
+ protected:
+  TTransport* acceptImpl();
+
  private:
+
   int port_;
   int serverSocket_;
   int acceptBacklog_;
diff --git a/lib/cpp/transport/TServerTransport.h b/lib/cpp/transport/TServerTransport.h
index 4d063fc..9d71539 100644
--- a/lib/cpp/transport/TServerTransport.h
+++ b/lib/cpp/transport/TServerTransport.h
@@ -1,7 +1,8 @@
 #ifndef T_SERVER_TRANSPORT_H
 #define T_SERVER_TRANSPORT_H
 
-#include "TTransport.h"
+#include "transport/TTransport.h"
+#include "transport/TTransportException.h"
 
 /**
  * Server transport framework. A server needs to have some facility for
@@ -13,12 +14,48 @@
  public:
   virtual ~TServerTransport() {}
 
-  virtual bool listen() = 0;
-  virtual TTransport* accept() = 0;
+  /**
+   * Starts the server transport listening for new connections. Prior to this
+   * call most transports will not return anything when accept is called.
+   *
+   * @throws TTransportException if we were unable to listen
+   */
+  virtual void listen() {}
+
+  /**
+   * Gets a new dynamically allocated transport object and passes it to the
+   * caller. Note that it is the explicit duty of the caller to free the
+   * allocated object. The returned TTransport object must always be in the
+   * opened state. NULL should never be returned, instead an Exception should
+   * always be thrown.
+   *
+   * @return A new TTransport object
+   * @throws TTransportException if there is an error
+   */
+  TTransport* accept() {
+    TTransport* result = acceptImpl();
+    if (result == NULL) {
+      throw TTransportException("accept() may not return NULL");
+    }
+    return result;
+  }
+
+  /**
+   * Closes this transport such that future calls to accept will do nothing.
+   */
   virtual void close() = 0;
 
  protected:
   TServerTransport() {}
+
+  /**
+   * Subclasses should implement this function for accept.
+   *
+   * @return A newly allocated TTransport object
+   * @throw TTransportException If an error occurs
+   */
+  virtual TTransport* acceptImpl() = 0;
+
 };
 
 #endif
diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc
index 1dfe431..3471755 100644
--- a/lib/cpp/transport/TSocket.cc
+++ b/lib/cpp/transport/TSocket.cc
@@ -7,9 +7,18 @@
 #include <errno.h>
 
 #include "transport/TSocket.h"
+#include "transport/TTransportException.h"
 
 using namespace std;
 
+uint32_t g_socket_syscalls = 0;
+
+/**
+ * TSocket implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+
 // Mutex to protect syscalls to netdb
 pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -27,15 +36,20 @@
   close();
 }
 
-bool TSocket::open() {
+bool TSocket::isOpen() {
+  return (socket_ > 0); 
+}
+
+void TSocket::open() {
   // Create socket
   socket_ = socket(AF_INET, SOCK_STREAM, 0);
   if (socket_ == -1) {
-    socket_ = 0;
-    return false;
+    perror("TSocket::open() socket");
+    close();
+    throw TTransportException(TTX_NOT_OPEN, "ERROR: socket() failed");
   }
   
-  // Lookup the host
+  // Lookup the hostname
   struct sockaddr_in addr;
   addr.sin_family = AF_INET;
   addr.sin_port = htons(port_);
@@ -54,7 +68,7 @@
     if (host_entry == NULL) {
       // perror("dns error: failed call to gethostbyname.\n");
       close();
-      return false;
+      throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
     }
     
     addr.sin_port = htons(port_);
@@ -70,10 +84,10 @@
   if (ret < 0) {
     perror("TSocket::open() connect");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "ERROR: connect() failed");
   }
 
-  return true;
+  // Connection was successful
 }
 
 void TSocket::close() {
@@ -84,97 +98,127 @@
   socket_ = 0;
 }
 
-int TSocket::read(string& s, uint32_t len) {
-  char buff[len];
-  s = "";
+uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
+  }
 
-  uint32_t have = 0;
   uint32_t retries = 0;
-
-  while (have < len) {
-  try_again:
-    // Read from the socket
-    int got = recv(socket_, buff+have, len-have, 0);
-
-    // Check for error on read
-    if (got < 0) {
-      perror("TSocket::read()");
-
-      // If temporarily out of resources, sleep a bit and try again
-      if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
-        usleep(50);
-        goto try_again;
-      }
-
-      // If interrupted, try again
-      if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
-        goto try_again;
-      }
-
-      // If we disconnect with no linger time
-      if (errno == ECONNRESET) {
-        return 0;
-      }
-
-      return 0;
+  
+ try_again:
+  // Read from the socket
+  int got = recv(socket_, buf, len, 0);
+  ++g_socket_syscalls;
+  
+  // Check for error on read
+  if (got < 0) {
+    perror("TSocket::read()");
+    
+    // If temporarily out of resources, sleep a bit and try again
+    if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
+      usleep(50);
+      goto try_again;
     }
     
-    // Check for empty read
-    if (got == 0) {
-      return 0;
+    // If interrupted, try again
+    if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+      goto try_again;
     }
     
-    // Update the count
-    have += (uint32_t) got;
+    // If we disconnect with no linger time
+    if (errno == ECONNRESET) {
+      throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+    }
+    
+    // The socket is not connected
+    if (errno == ENOTCONN) {
+      throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+    }
+    
+    // Timed out!
+    if (errno == ETIMEDOUT) {
+      throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
+    }
+    
+    // Any other error; errno was already reported by perror() above
+    throw TTransportException(TTX_UNKNOWN, "ERROR: recv() failed");
+  }
+  
+  // The remote host has closed the socket
+  if (got == 0) {
+    close();
+    return 0;
   }
   
   // Pack data into string
-  s = string(buff, have);
-  return have;
+  return got;
 }
 
-void TSocket::write(const string& s) {
+void TSocket::write(const uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
+  }
+
   uint32_t sent = 0;
     
-  while (sent < s.size()) {
-    int b = send(socket_, s.data() + sent, s.size() - sent, 0);
-    
+  while (sent < len) {
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    int b = send(socket_, buf + sent, len - sent, MSG_NOSIGNAL);
+    ++g_socket_syscalls;
+
     // Fail on a send error
     if (b < 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw an exception
-      // throw_perror("send");
-      return;
+      if (errno == EPIPE) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "EPIPE");
+      }
+
+      if (errno == ECONNRESET) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+      }
+
+      if (errno == ENOTCONN) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+      }
+
+      perror("TSocket::write() send < 0");
+      throw TTransportException(TTX_UNKNOWN, "ERROR: send() failed");
     }
     
     // Fail on blocked send
     if (b == 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw string("couldn't send data.\n");
-      return;
+      throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
     }
-
     sent += b;
   }
 }
 
-bool TSocket::setLinger(bool on, int linger) {
+void TSocket::setLinger(bool on, int linger) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   struct linger ling = {(on ? 1 : 0), linger};
   if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
     close();
     perror("TSocket::setLinger()");
-    return false;
   }
-  return true; 
 }
 
-bool TSocket::setNoDelay(bool noDelay) {
+void TSocket::setNoDelay(bool noDelay) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   // Set socket to NODELAY
   int val = (noDelay ? 1 : 0);
   if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
     close();
     perror("TSocket::setNoDelay()");
-    return false;
   }
-  return true;
 }
diff --git a/lib/cpp/transport/TSocket.h b/lib/cpp/transport/TSocket.h
index 1da74c6..18abfa7 100644
--- a/lib/cpp/transport/TSocket.h
+++ b/lib/cpp/transport/TSocket.h
@@ -6,33 +6,94 @@
 #include "transport/TTransport.h"
 #include "transport/TServerSocket.h"
 
-class TSocketOptions;
-
 /**
  * TCP Socket implementation of the TTransport interface.
  *
  * @author Mark Slee <mcslee@facebook.com>
  */
 class TSocket : public TTransport {
-  friend TTransport* TServerSocket::accept();
+  /**
+   * We allow the TServerSocket acceptImpl() method to access the private
+   * members of a socket so that it can access the TSocket(int socket)
+   * constructor which creates a socket object from the raw UNIX socket
+   * handle.
+   */
+  friend class TServerSocket;
 
  public:
+  /**
+   * Constructs a new socket. Note that this does NOT actually connect the
+   * socket.
+   *
+   * @param host An IP address or hostname to connect to
+   * @param port The port to connect on
+   */
   TSocket(std::string host, int port);
+
+  /**
+   * Destroys the socket object, closing it if necessary.
+   */
   ~TSocket();
 
-  bool open();
-  void close();
-  int  read (std::string &s, uint32_t size);
-  void write(const std::string& s);
+  /**
+   * Whether the socket is alive.
+   *
+   * @return Is the socket alive?
+   */
+  bool isOpen();
 
-  bool setLinger(bool on, int linger);
-  bool setNoDelay(bool noDelay);
+  /**
+   * Creates and opens the UNIX socket.
+   *
+   * @throws TTransportException If the socket could not connect
+   */
+  void open();
+
+  /**
+   * Shuts down communications on the socket.
+   */
+  void close();
+
+  /**
+   * Reads from the underlying socket.
+   */
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  /**
+   * Writes to the underlying socket.
+   */
+  void write(const uint8_t* buf, uint32_t len);
+
+  /**
+   * Controls whether the linger option is set on the socket.
+   *
+   * @param on      Whether SO_LINGER is on
+   * @param linger  If linger is active, the number of seconds to linger for
+   */
+  void setLinger(bool on, int linger);
+
+  /**
+   * Enables or disables Nagle's algorithm on the socket. The option is
+   * only applied if the socket is currently open.
+   *
+   * @param noDelay Whether or not to disable the algorithm.
+   */
+  void setNoDelay(bool noDelay);
 
  private:
+  /**
+   * Constructor to create socket from raw UNIX handle. Never called directly
+   * but used by the TServerSocket class.
+   */
   TSocket(int socket);
-  TSocketOptions *options_;
+
+  /** Host to connect to */
   std::string host_;
+
+  /** Port number to connect on */
   int port_;
+
+  /** Underlying UNIX socket handle */
   int socket_;
 };
 
diff --git a/lib/cpp/transport/TTransport.h b/lib/cpp/transport/TTransport.h
index a1f43d4..fcaece7 100644
--- a/lib/cpp/transport/TTransport.h
+++ b/lib/cpp/transport/TTransport.h
@@ -2,24 +2,96 @@
 #define T_TRANSPORT_H
 
 #include <string>
+#include "transport/TTransportException.h"
 
 /**
- * Generic interface for a method of transporting data.
+ * Generic interface for a method of transporting data. A TTransport may be
+ * capable of either reading or writing, but not necessarily both.
  *
  * @author Mark Slee <mcslee@facebook.com>
  */
 class TTransport {
  public:
-  virtual ~TTransport() {};
+  /**
+   * Virtual destructor.
+   */
+  virtual ~TTransport() {}
 
-  virtual bool open() = 0;
-  virtual void close() = 0;
+  /**
+   * Whether this transport is open.
+   */
+  virtual bool isOpen() { return false; }
 
-  virtual int  read (std::string& s, uint32_t size) = 0;
-  virtual void write(const std::string& s) = 0;
+  /**
+   * Opens the transport for communications. Nothing is returned; failure
+   * is reported by throwing.
+   *
+   * @throws TTransportException if opening failed
+   */
+  virtual void open() {
+    throw TTransportException(TTX_NOT_OPEN, "Cannot open base TTransport.");
+  }
+
+  /**
+   * Closes the transport.
+   */
+  virtual void close() {
+    throw TTransportException(TTX_NOT_OPEN, "Cannot close base TTransport.");
+  }
+
+  /**
+   * Attempt to read up to the specified number of bytes into the buffer.
+   *
+   * @param buf  Buffer to read the data into
+   * @param len  How many bytes to read
+   * @return How many bytes were actually read
+   * @throws TTransportException If an error occurs
+   */
+  virtual uint32_t read(uint8_t* buf, uint32_t len) {
+    throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot read.");
+  }
+
+  /**
+   * Reads exactly len bytes, calling read() until they have all arrived. A
+   * read() that returns 0 is treated as end of data so this cannot spin.
+   * @param buf   Buffer to fill, must have room for len bytes
+   * @param len   How many bytes to read
+   * @return How many bytes read, which is always equal to len
+   * @throws TTransportException If insufficient data was read
+   */
+  virtual uint32_t readAll(uint8_t* buf, uint32_t len) {
+    uint32_t have = 0;
+    while (have < len) {
+      uint32_t got = read(buf+have, len-have);
+      if (got == 0) throw TTransportException("No more data to read.");
+      have += got;
+    }
+    return have;
+  }
+
+  /**
+   * Writes len bytes from buf in their entirety to the transport.
+   *
+   * @param buf The data to write out (len bytes)
+   * @throws TTransportException if an error occurs
+   */
+  virtual void write(const uint8_t* buf, uint32_t len) {
+    throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot write.");
+  }
+
+  /**
+   * Flushes any pending data to be written. Typically used with buffered
+   * transport mechanisms.
+   *
+   * @throws TTransportException if an error occurs
+   */
+  virtual void flush() {}
 
  protected:
-  TTransport() {};
+  /**
+   * Simple constructor.
+   */
+  TTransport() {}
 };
 
 #endif
diff --git a/lib/cpp/transport/TTransportException.h b/lib/cpp/transport/TTransportException.h
new file mode 100644
index 0000000..044e16d
--- /dev/null
+++ b/lib/cpp/transport/TTransportException.h
@@ -0,0 +1,63 @@
+#ifndef T_TRANSPORT_EXCEPTION_H
+#define T_TRANSPORT_EXCEPTION_H
+
+#include <string>
+
+/**
+ * Error codes for the various types of exceptions.
+ */
+enum TTransportExceptionType {
+  TTX_UNKNOWN = 0,
+  TTX_NOT_OPEN = 1,
+  TTX_TIMED_OUT = 2,
+};
+
+/**
+ * Class to encapsulate all the possible types of transport errors that may
+ * occur in various transport systems. This provides a sort of generic
+ * wrapper around the raw UNIX E* errno codes that lets a common code
+ * base of error handling be used for various types of transports, e.g.
+ * pipes etc.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TTransportException {
+ public:
+  TTransportException() :
+    type_(TTX_UNKNOWN), message_() {}
+
+  TTransportException(TTransportExceptionType type) :
+    type_(type), message_() {}
+
+  TTransportException(std::string message) :
+    type_(TTX_UNKNOWN), message_(message) {}
+
+  TTransportException(TTransportExceptionType type, std::string message) :
+    type_(type), message_(message) {}
+
+  ~TTransportException() {}
+
+  /**
+   * Returns an error code that provides information about the type of error
+   * that has occurred.
+   *
+   * @return Error code
+   */
+  TTransportExceptionType getType() { return type_; }
+ 
+  /**
+   * Returns an informative message about what caused this error.
+   *
+   * @return Error string
+   */
+  const std::string& getMessage() { return message_; }
+
+ protected:
+  /** Error code */
+  TTransportExceptionType type_;
+
+  /** Description */
+  std::string message_;
+};
+
+#endif