Update Thrift CPP libraries to work with new generated source, change underlying buffers to use uint8_t* instead of std::string

Summary: Major overhaul to the CPP libraries.

Reviewed By: aditya

Test Plan: Again, keep an eye out for the unit tests commit

Notes: Initial perf tests show that Thrift is not only more robust than Pillar, but its implementation is actually around 10-20% faster. We can do about 10 RPC function calls with small data payloads in under 2ms. THAT IS FAST. THAT IS THRIFTY.




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664714 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile
index 2045dba..f77c493 100644
--- a/lib/cpp/Makefile
+++ b/lib/cpp/Makefile
@@ -1,4 +1,9 @@
-# Makefile for Thrift C++ library.
+# Makefile for Thrift C++ library. Generates a shared object that can be
+# installed to /usr/local/lib
+#
+# TODO(mcslee): Add the ability to compile separate statis modules that can
+# be compiled directly into Thrift applications instead of dynamic runtime
+# loading of the full libs
 # 
 # Author:
 #   Mark Slee <mcslee@facebook.com>
@@ -10,16 +15,17 @@
 LDFL  = -shared -Wall -I. -fPIC -Wl,-soname=libthrift.so
 
 # Source files
-SRCS  = client/TSimpleClient.cc \
-	protocol/TBinaryProtocol.cc \
-	server/TSimpleServer.cc \
+SRCS  = protocol/TBinaryProtocol.cc \
+	transport/TBufferedTransport.cc \
 	transport/TSocket.cc \
-	transport/TServerSocket.cc
+	transport/TServerSocket.cc \
+	server/TSimpleServer.cc
 
 # Linked library
 libthrift:
 	$(LD) -o libthrift.so $(LDFL) $(SRCS)
 
+# Clean it up
 clean:
 	rm -f libthrift.so
 
diff --git a/lib/cpp/TDispatcher.h b/lib/cpp/TDispatcher.h
deleted file mode 100644
index f8ff847..0000000
--- a/lib/cpp/TDispatcher.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#ifndef T_DISPATCHER_H
-#define T_DISPATCHER_H
-
-#include <string>
-
-/**
- * A dispatcher is a generic object that accepts an input buffer and returns
- * a buffer. It can be used in a variety of ways, i.e. as a client that
- * sends data over the network and returns a response, or as a server that
- * reads an input and returns an output.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TDispatcher {
- public:
-  virtual ~TDispatcher() {};
-  virtual std::string dispatch(const std::string& s) = 0;
- protected:
-  TDispatcher() {}
-};
-
-#endif
diff --git a/lib/cpp/TProcessor.h b/lib/cpp/TProcessor.h
new file mode 100644
index 0000000..f01379d
--- /dev/null
+++ b/lib/cpp/TProcessor.h
@@ -0,0 +1,24 @@
+#ifndef T_PROCESSOR_H
+#define T_PROCESSOR_H
+
+#include <string>
+#include "transport/TTransport.h"
+
+/**
+ * A processor is a generic object that acts upon two streams of data, one
+ * an input and the other an output. The definition of this object is loose,
+ * though the typical case is for some sort of server that either generates
+ * responses to an input stream or forwards data from one pipe onto another.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TProcessor {
+ public:
+  virtual ~TProcessor() {}
+  virtual bool process(TTransport* in, TTransport *out) = 0;
+  virtual bool process(TTransport* io) { return process(io, io); }
+ protected:
+  TProcessor() {}
+};
+
+#endif
diff --git a/lib/cpp/Thrift.h b/lib/cpp/Thrift.h
index 04fbaa1..9986e3b 100644
--- a/lib/cpp/Thrift.h
+++ b/lib/cpp/Thrift.h
@@ -1,7 +1,8 @@
 #ifndef THRIFT_H
 #define THRIFT_H
 
-#include <sys/types.h>
+#include <netinet/in.h>
+#include <inttypes.h>
 #include <string>
 #include <map>
 #include <list>
diff --git a/lib/cpp/client/TClient.h b/lib/cpp/client/TClient.h
deleted file mode 100644
index 73dd093..0000000
--- a/lib/cpp/client/TClient.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#ifndef T_CLIENT_H
-#define T_CLIENT_H
-
-#include "TDispatcher.h"
-
-class TClient : public TDispatcher {
- public:
-  virtual ~TClient() {}
-  virtual bool open() = 0;
-  virtual void close() = 0;
- protected:
-  TClient() {}
-};
-
-#endif
-
diff --git a/lib/cpp/client/TSimpleClient.cc b/lib/cpp/client/TSimpleClient.cc
deleted file mode 100644
index 9069c91..0000000
--- a/lib/cpp/client/TSimpleClient.cc
+++ /dev/null
@@ -1,44 +0,0 @@
-#include "TSimpleClient.h"
-using std::string;
-
-TSimpleClient::TSimpleClient(TTransport* transport) :
-  transport_(transport) {}
-
-bool TSimpleClient::open() {
-  return transport_->open();
-}
-
-void TSimpleClient::close() {
-  transport_->close();
-}
-
-std::string TSimpleClient::dispatch(const string& s) {
-  // Write size header
-  int32_t size = s.size();
-  // fprintf(stderr, "Writing size header %d to server\n", size);
-  transport_->write(string((char*)&size, 4));
-
-  // Write data payload
-  // fprintf(stderr, "Writing %d byte payload to server\n", (int)s.size());
-  transport_->write(s);
-
-  // Read response size
-  // fprintf(stderr, "Reading 4-byte response size header\n");
-  string response;
-  transport_->read(response, 4);
-  size = *(int32_t*)response.data();
-
-  // Read response data
-  if (size < 0) {
-    // TODO(mcslee): Handle exception
-    // fprintf(stderr, "Exception case! Response size < 0\n");
-    return "";
-  } else {
-    // fprintf(stderr, "Reading %d byte response payload\n", size);
-    transport_->read(response, size);
-    // TODO(mcslee): Check that we actually read enough data
-    // fprintf(stderr, "Done reading payload, returning.\n");
-    return response;
-  }
-}
-
diff --git a/lib/cpp/client/TSimpleClient.h b/lib/cpp/client/TSimpleClient.h
deleted file mode 100644
index 249afe5..0000000
--- a/lib/cpp/client/TSimpleClient.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#ifndef T_SIMPLE_CLIENT_H
-#define T_SIMPLE_CLIENT_H
-
-#include "client/TClient.h"
-#include "transport/TTransport.h"
-
-class TSimpleClient : public TClient {
- public:
-  TSimpleClient(TTransport* transport);
-  ~TSimpleClient() {}
-
-  bool open();
-  void close();
-  std::string dispatch(const std::string& in);
-
- protected:
-  TTransport* transport_;
-};
-
-#endif
-
diff --git a/lib/cpp/protocol/TBinaryProtocol.cc b/lib/cpp/protocol/TBinaryProtocol.cc
index 4c10bab..1f31c8d 100644
--- a/lib/cpp/protocol/TBinaryProtocol.cc
+++ b/lib/cpp/protocol/TBinaryProtocol.cc
@@ -1,140 +1,246 @@
 #include "protocol/TBinaryProtocol.h"
-using namespace std;
+using std::string;
 
-string TBinaryProtocol::readFunction(TBuf& buf) const {
-  // Let readString increment the buffer position
-  return readString(buf);
+uint32_t TBinaryProtocol::writeStructBegin(TTransport* out,
+                                           const string& name) const {
+  return 0;
 }
 
-string TBinaryProtocol::writeFunction(const string& name,
-                                      const string& args) const{
-  return writeString(name) + args;
+uint32_t TBinaryProtocol::writeStructEnd(TTransport* out) const {
+  return 0;
 }
 
-map<uint32_t, TBuf> TBinaryProtocol::readStruct(TBuf& buf) const {
-  map<uint32_t, TBuf> fieldMap;
+uint32_t TBinaryProtocol::writeFieldBegin(TTransport* out,
+                                          const string& name,
+                                          const TType fieldType,
+                                          const uint16_t fieldId) const {
+  return
+    writeByte(out, (uint8_t)fieldType) +
+    writeU32(out, (uint32_t)fieldId);
+}
+
+uint32_t TBinaryProtocol::writeFieldEnd(TTransport* out) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::writeFieldStop(TTransport* out) const {
+  return
+    writeByte(out, (uint8_t)T_STOP);
+}  
+                               
+uint32_t TBinaryProtocol::writeMapBegin(TTransport* out,
+                                        const TType keyType,
+                                        const TType valType,
+                                        const uint32_t size) const {
+  return
+    writeByte(out, (uint8_t)keyType) +
+    writeByte(out, (uint8_t)valType) +
+    writeU32(out, size);
+}
+
+uint32_t TBinaryProtocol::writeMapEnd(TTransport* out) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::writeListBegin(TTransport* out,
+                                         const TType elemType,
+                                         const uint32_t size) const {
+  return
+    writeByte(out, (uint8_t) elemType) +
+    writeU32(out, size);
+}
+
+uint32_t TBinaryProtocol::writeListEnd(TTransport* out) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::writeSetBegin(TTransport* out,
+                                        const TType elemType,
+                                        const uint32_t size) const {
+  return
+    writeByte(out, (uint8_t)elemType) +
+    writeU32(out, size);
+}
+
+uint32_t TBinaryProtocol::writeSetEnd(TTransport* out) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::writeByte(TTransport* out,
+                                    const uint8_t byte) const {
+  out->write(&byte, 1);
+  return 1;
+}
+
+uint32_t TBinaryProtocol::writeU32(TTransport* out,
+                                   const uint32_t u32) const {
+  uint32_t net = (uint32_t)htonl(u32);
+  out->write((uint8_t*)&net, 4);
+  return 4;
+}
+
+uint32_t TBinaryProtocol::writeI32(TTransport* out,
+                                   const int32_t i32) const {
+  int32_t net = (int32_t)htonl(i32);
+  out->write((uint8_t*)&net, 4);
+  return 4;
+}
+
+uint32_t TBinaryProtocol::writeU64(TTransport* out,
+                                   const uint64_t u64) const {
+  uint64_t net = (uint64_t)htonll(u64);
+  out->write((uint8_t*)&net, 8);
+  return 8;
+}
+
+uint32_t TBinaryProtocol::writeI64(TTransport* out,
+                                   const int64_t i64) const {
+  int64_t net = (int64_t)htonll(i64);
+  out->write((uint8_t*)&net, 8);
+  return 8;
+}
+
+uint32_t TBinaryProtocol::writeString(TTransport* out,
+                                      const string& str) const {
+  uint32_t result = writeU32(out, str.size());
+  out->write((uint8_t*)str.data(), str.size());
+  return result + str.size();
+}
+
+/**
+ * Reading functions
+ */
+
+uint32_t TBinaryProtocol::readStructBegin(TTransport* in,
+                                          string& name) const {
+  name = "";
+  return 0;
+}
+
+uint32_t TBinaryProtocol::readStructEnd(TTransport* in) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::readFieldBegin(TTransport* in,
+                                         string& name,
+                                         TType& fieldType,
+                                         uint16_t& fieldId) const {
+  uint32_t result = 0;
+  uint8_t type;
+  result += readByte(in, type);
+  fieldType = (TType)type;
+  if (fieldType == T_STOP) {
+    fieldId = 0;
+    return result;
+  }
+  uint32_t id;
+  result += readU32(in, id);
+  fieldId = (uint16_t)id;
+  return result;
+}
   
-  if (buf.len < 4) {
-    return fieldMap;
-  }
-  uint32_t total_size = readU32(buf);
-  if (buf.len < total_size) {
-    // Data looks corrupt, we don't have that much, we will try to read what
-    // we can but be sure not to go over
-    total_size = buf.len;
-  }
-
-  // Field headers are 8 bytes, 4 byte fid + 4 byte length
-  while (total_size > 0 && buf.len > 8) {
-    uint32_t fid  = readU32(buf);
-    uint32_t flen = readU32(buf);
-    if (flen > buf.len) {
-      // flen corrupt, there isn't that much data left
-      break;
-    }
-    fieldMap.insert(make_pair(fid, TBuf(buf.data, flen)));
-    buf.data += flen;
-    buf.len  -= flen;
-    total_size -= 8 + flen;
-  }
-
-  return fieldMap;
+uint32_t TBinaryProtocol::readFieldEnd(TTransport* in) const {
+  return 0;
 }
-
-string TBinaryProtocol::writeStruct(const map<uint32_t,string>& s) const {
-  string result = "";
-  map<uint32_t,string>::const_iterator s_iter;
-  for (s_iter = s.begin(); s_iter != s.end(); ++s_iter) {
-    result += writeU32(s_iter->first);
-    result += writeU32(s_iter->second.size());
-    result += s_iter->second;
-  }
-  return writeU32(result.size()) + result;
-}
-
-string TBinaryProtocol::readString(TBuf& buf) const {
-  uint32_t len = readU32(buf);
-  if (len == 0) {
-    return "";
-  }
-  string result((const char*)(buf.data), len);
-  buf.data += len;
-  buf.len  -= len;
+ 
+uint32_t TBinaryProtocol::readMapBegin(TTransport* in,
+                                       TType& keyType,
+                                       TType& valType,
+                                       uint32_t& size) const {
+  uint8_t k, v;
+  uint32_t result = 0;
+  result += readByte(in, k);
+  keyType = (TType)k;
+  result += readByte(in, v);
+  valType = (TType)v;
+  result += readU32(in, size);
   return result;
 }
 
-uint8_t TBinaryProtocol::readByte(TBuf& buf) const {
-  if (buf.len == 0) {
-    return 0;
-  }
-  uint8_t result = (uint8_t)buf.data[0];
-  buf.data += 1;
-  buf.len  -= 1;
+uint32_t TBinaryProtocol::readMapEnd(TTransport* in) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::readListBegin(TTransport* in,
+                                        TType& elemType,
+                                        uint32_t& size) const {
+  uint8_t e;
+  uint32_t result = 0;
+  result += readByte(in, e);
+  elemType = (TType)e;
+  result += readU32(in, size);
   return result;
 }
 
-uint32_t TBinaryProtocol::readU32(TBuf& buf) const {
-  if (buf.len < 4) {
-    return 0;
-  }
-  uint32_t result = *(uint32_t*)buf.data;
-  buf.data += 4;
-  buf.len  -= 4;
+uint32_t TBinaryProtocol::readListEnd(TTransport* in) const {
+  return 0;
+}
+
+uint32_t TBinaryProtocol::readSetBegin(TTransport* in,
+                                       TType& elemType,
+                                       uint32_t& size) const {
+  uint8_t e;
+  uint32_t result = 0;
+  result += readByte(in, e);
+  elemType = (TType)e;
+  result += readU32(in, size);
   return result;
 }
 
-int32_t TBinaryProtocol::readI32(TBuf& buf) const {
-  if (buf.len < 4) {
-    return 0;
-  }
-  int32_t result = *(int32_t*)buf.data;
-  buf.data += 4;
-  buf.len  -= 4;
-  return result; 
+uint32_t TBinaryProtocol::readSetEnd(TTransport* in) const {
+  return 0;
 }
 
-uint64_t TBinaryProtocol::readU64(TBuf& buf) const {
-  if (buf.len < 8) {
-    return 0;
-  }
-  uint64_t result = *(uint64_t*)buf.data;
-  buf.data += 8;
-  buf.len  -= 8;
-  return result;
+uint32_t TBinaryProtocol::readByte(TTransport* in,
+                                   uint8_t& byte) const {
+  uint8_t b[1];
+  in->readAll(b, 1);
+  byte = *(uint8_t*)b;
+  return 1;
 }
 
-int64_t TBinaryProtocol::readI64(TBuf& buf) const {
-  if (buf.len < 8) {
-    return 0;
-  }
-  int64_t result = *(int64_t*)buf.data;
-  buf.data += 8;
-  buf.len  -= 8;
-  return result;
+uint32_t TBinaryProtocol::readU32(TTransport* in,
+                                  uint32_t& u32) const {
+  uint8_t b[4];
+  in->readAll(b, 4);
+  u32 = *(uint32_t*)b;
+  u32 = (uint32_t)ntohl(u32);
+  return 4;
 }
 
-string TBinaryProtocol::writeString(const string& str) const {
-  uint32_t size = str.size();
-  string result = string((const char*)&size, 4);
-  return result + str;
+uint32_t TBinaryProtocol::readI32(TTransport* in,
+                                  int32_t& i32) const {
+  uint8_t b[4];
+  in->readAll(b, 4);
+  i32 = *(int32_t*)b;
+  i32 = (int32_t)ntohl(i32);
+  return 4;
 }
 
-string TBinaryProtocol::writeByte(const uint8_t byte) const {
-  return string((const char*)&byte, 1);
+uint32_t TBinaryProtocol::readU64(TTransport* in,
+                                  uint64_t& u64) const {
+  uint8_t b[8];
+  in->readAll(b, 8);
+  u64 = *(uint64_t*)b;
+  u64 = (uint64_t)ntohll(u64);
+  return 8;
 }
 
-string TBinaryProtocol::writeU32(const uint32_t u32) const {
-  return string((const char*)&u32, 4);
+uint32_t TBinaryProtocol::readI64(TTransport* in,
+                                  int64_t& i64) const {
+  uint8_t b[8];
+  in->readAll(b, 8);
+  i64 = *(int64_t*)b;
+  i64 = (int64_t)ntohll(i64);
+  return 8;
 }
 
-string TBinaryProtocol::writeI32(int32_t i32) const {
-  return string((const char*)&i32, 4);
-}
-
-string TBinaryProtocol::writeU64(uint64_t u64) const {
-  return string((const char*)&u64, 8);
-}
-
-string TBinaryProtocol::writeI64(int64_t i64) const {
-  return string((const char*)&i64, 8);
+uint32_t TBinaryProtocol::readString(TTransport* in,
+                                     string& str) const {
+  uint32_t size, result;
+  result = readU32(in, size);
+  uint8_t b[size];
+  in->readAll(b, size);
+  str = string((char*)b, size);
+  return result+size;
 }
diff --git a/lib/cpp/protocol/TBinaryProtocol.h b/lib/cpp/protocol/TBinaryProtocol.h
index 976c383..e05bd9f 100644
--- a/lib/cpp/protocol/TBinaryProtocol.h
+++ b/lib/cpp/protocol/TBinaryProtocol.h
@@ -14,29 +14,114 @@
   TBinaryProtocol() {}
   ~TBinaryProtocol() {}
 
-  std::string
-    readFunction(TBuf& buf) const;
-  std::string
-    writeFunction(const std::string& name, const std::string& args) const;
+  /**
+   * Writing functions.
+   */
 
-  std::map<uint32_t, TBuf>
-    readStruct(TBuf& buf) const;
-  std::string
-    writeStruct(const std::map<uint32_t,std::string>& s) const;
+  uint32_t writeStructBegin   (TTransport*    out,
+                               const std::string& name)   const;
 
-  std::string readString  (TBuf& buf) const;
-  uint8_t     readByte    (TBuf& buf) const;
-  uint32_t    readU32     (TBuf& buf) const;
-  int32_t     readI32     (TBuf& buf) const;
-  uint64_t    readU64     (TBuf& buf) const;
-  int64_t     readI64     (TBuf& buf) const;
+  uint32_t writeStructEnd     (TTransport*    out)        const;
 
-  std::string writeString (const std::string& str) const;
-  std::string writeByte   (const uint8_t  byte)    const;
-  std::string writeU32    (const uint32_t u32)     const;
-  std::string writeI32    (const int32_t  i32)     const;
-  std::string writeU64    (const uint64_t u64)     const;
-  std::string writeI64    (const int64_t  i64)     const;
+  uint32_t writeFieldBegin    (TTransport*    out,
+                               const std::string& name,
+                               const TType    fieldType,
+                               const uint16_t fieldId)    const;
+
+  uint32_t writeFieldEnd      (TTransport*    out)        const;
+
+  uint32_t writeFieldStop     (TTransport*    out)        const;
+                                       
+  uint32_t writeMapBegin      (TTransport*    out,
+                               const TType    keyType,
+                               const TType    valType,
+                               const uint32_t size)       const;
+
+  uint32_t writeMapEnd        (TTransport*    out)        const;
+
+  uint32_t writeListBegin     (TTransport*    out,
+                               const TType    elemType,
+                               const uint32_t size)       const;
+
+  uint32_t writeListEnd       (TTransport*    out)        const;
+
+  uint32_t writeSetBegin      (TTransport*    out,
+                               const TType    elemType,
+                               const uint32_t size)       const;
+
+  uint32_t writeSetEnd        (TTransport*    out)        const;
+
+  uint32_t writeByte          (TTransport*    out,
+                               const uint8_t  byte)       const;
+
+  uint32_t writeU32           (TTransport*    out,
+                               const uint32_t u32)        const;
+
+  uint32_t writeI32           (TTransport*    out,
+                               const int32_t  i32)        const;
+
+  uint32_t writeU64           (TTransport*    out,
+                               const uint64_t u64)        const;
+
+  uint32_t writeI64           (TTransport*    out,
+                               const int64_t  i64)        const;
+
+  uint32_t writeString        (TTransport*    out,
+                               const std::string& str)    const;
+
+  /**
+   * Reading functions
+   */
+
+  uint32_t readStructBegin    (TTransport*    in,
+                               std::string& name)         const;
+
+  uint32_t readStructEnd      (TTransport*    in)         const;
+
+  uint32_t readFieldBegin     (TTransport*    in,
+                               std::string&   name,
+                               TType&         fieldType,
+                               uint16_t&      fieldId)    const;
+  
+  uint32_t readFieldEnd       (TTransport*    in)         const;
+ 
+  uint32_t readMapBegin       (TTransport*    in,
+                               TType&         keyType,
+                               TType&         valType,
+                               uint32_t&      size)       const;
+
+  uint32_t readMapEnd         (TTransport*    in)         const;
+
+  uint32_t readListBegin      (TTransport*    in,
+                               TType&         elemType,
+                               uint32_t&      size)       const;
+  
+  uint32_t readListEnd        (TTransport*    in)         const;
+
+  uint32_t readSetBegin       (TTransport*    in,
+                               TType&         elemType,
+                               uint32_t&      size)       const;
+
+  uint32_t readSetEnd         (TTransport*    in)         const;
+
+  uint32_t readByte           (TTransport*    in,
+                               uint8_t&       byte)       const;
+
+  uint32_t readU32            (TTransport*    in,
+                               uint32_t&      u32)        const;
+
+  uint32_t readI32            (TTransport*    in,
+                               int32_t&       i32)        const;
+
+  uint32_t readU64            (TTransport*    in,
+                               uint64_t&      u64)        const;
+
+  uint32_t readI64            (TTransport*    in,
+                               int64_t&       i64)        const;
+
+  uint32_t readString         (TTransport*    in,
+                               std::string&   str)        const;
+
 };
 
 #endif
diff --git a/lib/cpp/protocol/TProtocol.h b/lib/cpp/protocol/TProtocol.h
index 1f2e0c8..fe2d6ef 100644
--- a/lib/cpp/protocol/TProtocol.h
+++ b/lib/cpp/protocol/TProtocol.h
@@ -1,14 +1,42 @@
 #ifndef T_PROTOCOL_H
 #define T_PROTOCOL_H
 
+#include <netinet/in.h>
 #include <sys/types.h>
 #include <string>
 #include <map>
 
+#include "transport/TTransport.h"
+
+#define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32))))
+
+#define htonll(x) ntohll(x)
+
 /** Forward declaration for TProtocol */
 struct TBuf;
 
 /**
+ * Enumerated definition of the types that the Thrift protocol supports.
+ * Take special note of the T_END type which is used specifically to mark
+ * the end of a sequence of fields.
+ */
+enum TType {
+  T_STOP       = 1,
+  T_BYTE       = 2,
+  T_U16        = 3,
+  T_I16        = 4,
+  T_U32        = 5,
+  T_I32        = 6,
+  T_U64        = 7,
+  T_I64        = 8,
+  T_STRING     = 9,
+  T_STRUCT     = 10,
+  T_MAP        = 11,
+  T_SET        = 12,
+  T_LIST       = 13
+};
+
+/**
  * Abstract class for a thrift protocol driver. These are all the methods that
  * a protocol must implement. Essentially, there must be some way of reading
  * and writing all the base types, plus a mechanism for writing out structs
@@ -25,64 +53,211 @@
   virtual ~TProtocol() {}
 
   /**
-   * Function call serialization.
+   * Writing functions.
    */
 
-  virtual std::string
-    readFunction(TBuf& buf) const = 0;
-  virtual std::string
-    writeFunction(const std::string& name, const std::string& args) const = 0;
+  virtual uint32_t writeStructBegin   (TTransport*    out,
+                                       const std::string& name)   const = 0;
+
+  virtual uint32_t writeStructEnd     (TTransport*    out)        const = 0;
+
+  virtual uint32_t writeFieldBegin    (TTransport*    out,
+                                       const std::string& name,
+                                       const TType    fieldType,
+                                       const uint16_t fieldId)    const = 0;
+
+  virtual uint32_t writeFieldEnd      (TTransport*    out)        const = 0;
+
+  virtual uint32_t writeFieldStop     (TTransport*    out)        const = 0;
+                                      
+  virtual uint32_t writeMapBegin      (TTransport*    out,
+                                       const TType    keyType,
+                                       const TType    valType,
+                                       const uint32_t size)       const = 0;
+
+  virtual uint32_t writeMapEnd        (TTransport*    out)        const = 0;
+
+  virtual uint32_t writeListBegin     (TTransport*    out,
+                                       const TType    elemType,
+                                       const uint32_t size)       const = 0;
+
+  virtual uint32_t writeListEnd       (TTransport*    out)        const = 0;
+
+  virtual uint32_t writeSetBegin      (TTransport*    out,
+                                       const TType    elemType,
+                                       const uint32_t size)       const = 0;
+
+  virtual uint32_t writeSetEnd        (TTransport*    out)        const = 0;
+
+  virtual uint32_t writeByte          (TTransport*    out,
+                                       const uint8_t  byte)       const = 0;
+
+  virtual uint32_t writeU32           (TTransport*    out,
+                                       const uint32_t u32)        const = 0;
+
+  virtual uint32_t writeI32           (TTransport*    out,
+                                       const int32_t  i32)        const = 0;
+
+  virtual uint32_t writeU64           (TTransport*    out,
+                                       const uint64_t u64)        const = 0;
+
+  virtual uint32_t writeI64           (TTransport*    out,
+                                       const int64_t  i64)        const = 0;
+
+  virtual uint32_t writeString        (TTransport*    out,
+                                       const std::string& str)    const = 0;
 
   /**
-   * Struct serialization.
+   * Reading functions
    */
 
-  virtual std::map<uint32_t, TBuf>
-    readStruct(TBuf& buf) const = 0;
-  virtual std::string
-    writeStruct(const std::map<uint32_t,std::string>& s) const = 0;
+  virtual uint32_t readStructBegin    (TTransport*    in,
+                                       std::string& name)         const = 0;
+
+  virtual uint32_t readStructEnd      (TTransport*    in)         const = 0;
+
+  virtual uint32_t readFieldBegin     (TTransport*    in,
+                                       std::string&   name,
+                                       TType&         fieldType,
+                                       uint16_t&      fieldId)    const = 0;
+  
+  virtual uint32_t readFieldEnd       (TTransport*    in)         const = 0;
+ 
+  virtual uint32_t readMapBegin       (TTransport*    in,
+                                       TType&         keyType,
+                                       TType&         valType,
+                                       uint32_t&      size)       const = 0;
+
+  virtual uint32_t readMapEnd         (TTransport*    in)         const = 0;
+
+  virtual uint32_t readListBegin      (TTransport*    in,
+                                       TType&         elemType,
+                                       uint32_t&      size)       const = 0;
+
+  virtual uint32_t readListEnd        (TTransport*    in)         const = 0;
+
+  virtual uint32_t readSetBegin       (TTransport*    in,
+                                       TType&         elemType,
+                                       uint32_t&      size)       const = 0;
+
+  virtual uint32_t readSetEnd         (TTransport*    in)         const = 0;
+
+  virtual uint32_t readByte           (TTransport*    in,
+                                       uint8_t&       byte)       const = 0;
+
+  virtual uint32_t readU32            (TTransport*    in,
+                                       uint32_t&      u32)        const = 0;
+
+  virtual uint32_t readI32            (TTransport*    in,
+                                       int32_t&       i32)        const = 0;
+
+  virtual uint32_t readU64            (TTransport*    in,
+                                       uint64_t&      u64)        const = 0;
+
+  virtual uint32_t readI64            (TTransport*    in,
+                                       int64_t&       i64)        const = 0;
+
+  virtual uint32_t readString         (TTransport*    in,
+                                       std::string&   str)        const = 0;
 
   /**
-   * Basic data type deserialization. Note that these read methods do not
-   * take a const reference to the TBuf object. They SHOULD change the TBuf
-   * object so that it reflects the buffer AFTER the basic data type has
-   * been consumed such that data may continue being read serially from the
-   * buffer.
+   * Method to arbitrarily skip over data.
    */
-
-  virtual std::string readString  (TBuf& buf) const = 0;
-  virtual uint8_t     readByte    (TBuf& buf) const = 0;
-  virtual uint32_t    readU32     (TBuf& buf) const = 0;
-  virtual int32_t     readI32     (TBuf& buf) const = 0;
-  virtual uint64_t    readU64     (TBuf& buf) const = 0;
-  virtual int64_t     readI64     (TBuf& buf) const = 0;
-
-  virtual std::string writeString (const std::string& str) const = 0;
-  virtual std::string writeByte   (const uint8_t  byte)    const = 0;
-  virtual std::string writeU32    (const uint32_t u32)     const = 0;
-  virtual std::string writeI32    (const int32_t  i32)     const = 0;
-  virtual std::string writeU64    (const uint64_t u64)     const = 0;
-  virtual std::string writeI64    (const int64_t  i64)     const = 0;
+  uint32_t skip(TTransport* in, TType type) const {
+    switch (type) {
+    case T_BYTE:
+      {
+        uint8_t byte;
+        return readByte(in, byte);
+      }
+    case T_U32:
+      {
+        uint32_t u32;
+        return readU32(in, u32);
+      }
+    case T_I32:
+      {
+        int32_t i32;
+        return readI32(in, i32);
+      }
+    case T_U64:
+      {
+        uint64_t u64;
+        return readU64(in, u64);
+      }
+    case T_I64:
+      {
+        int64_t i64;
+        return readI64(in, i64);
+      }
+    case T_STRING:
+      {
+        std::string str;
+        return readString(in, str);
+      }
+    case T_STRUCT:
+      {
+        uint32_t result = 0;
+        std::string name;
+        uint16_t fid;
+        TType ftype;
+        result += readStructBegin(in, name);
+        while (true) {
+          result += readFieldBegin(in, name, ftype, fid);
+          if (ftype == T_STOP) {
+            break;
+          }
+          result += skip(in, ftype);
+          result += readFieldEnd(in);
+        }
+        result += readStructEnd(in);
+        return result;
+      }
+    case T_MAP:
+      {
+        uint32_t result = 0;
+        TType keyType;
+        TType valType;
+        uint32_t i, size;
+        result += readMapBegin(in, keyType, valType, size);
+        for (i = 0; i < size; i++) {
+          result += skip(in, keyType);
+          result += skip(in, valType);
+        }
+        result += readMapEnd(in);
+        return result;
+      }
+    case T_SET:
+      {
+        uint32_t result = 0;
+        TType elemType;
+        uint32_t i, size;
+        result += readSetBegin(in, elemType, size);
+        for (i = 0; i < size; i++) {
+          result += skip(in, elemType);
+        }
+        result += readSetEnd(in);
+        return result;
+      }
+    case T_LIST:
+      {
+        uint32_t result = 0;
+        TType elemType;
+        uint32_t i, size;
+        result += readListBegin(in, elemType, size);
+        for (i = 0; i < size; i++) {
+          result += skip(in, elemType);
+        }
+        result += readListEnd(in);
+        return result;
+      }
+    default:
+      return 0;
+    }
+  }
 
  protected:
   TProtocol() {}
 };
 
-/**
- * Wrapper around raw data that allows us to track the length of a data
- * buffer. It is the responsibility of a robust TProtocol implementation
- * to ensure that any reads that are done from data do NOT overrun the
- * memory address at data+len. It is also a convention that TBuf objects
- * do NOT own the memory pointed to by data. They are merely wrappers
- * around buffers that have been allocated elsewhere. Therefore, the user
- * should never allocate memory before putting it into a TBuf nor should
- * they free the data pointed to by a TBuf.
- */
-struct TBuf {
-  TBuf(const TBuf& that) : data(that.data), len(that.len) {}
-  TBuf(const uint8_t* d, uint32_t l) : data(d), len(l) {}
-  const uint8_t* data;
-  uint32_t len;
-};
-
 #endif
diff --git a/lib/cpp/server/TServer.h b/lib/cpp/server/TServer.h
index 9c4cc59..0d275ba 100644
--- a/lib/cpp/server/TServer.h
+++ b/lib/cpp/server/TServer.h
@@ -1,7 +1,7 @@
 #ifndef T_SERVER_H
 #define T_SERVER_H
 
-#include "TDispatcher.h"
+#include "TProcessor.h"
 
 class TServerOptions;
 
@@ -16,10 +16,10 @@
   virtual void run() = 0;
 
  protected:
-  TServer(TDispatcher* dispatcher, TServerOptions* options) :
-    dispatcher_(dispatcher), options_(options) {}
+  TServer(TProcessor* processor, TServerOptions* options) :
+    processor_(processor), options_(options) {}
     
-  TDispatcher* dispatcher_;
+  TProcessor* processor_;
   TServerOptions* options_;
 };
 
diff --git a/lib/cpp/server/TSimpleServer.cc b/lib/cpp/server/TSimpleServer.cc
index 16f5006..03069ae 100644
--- a/lib/cpp/server/TSimpleServer.cc
+++ b/lib/cpp/server/TSimpleServer.cc
@@ -1,60 +1,54 @@
 #include "server/TSimpleServer.h"
+#include "transport/TBufferedTransport.h"
+#include "transport/TTransportException.h"
 #include <string>
+#include <iostream>
 using namespace std;
 
+/**
+ * A simple single-threaded application server. Perfect for unit tests!
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
 void TSimpleServer::run() {
-  TTransport* client;
+  TTransport* client = NULL;
 
-  // Start the server listening
-  if (serverTransport_->listen() == false) {
-    // TODO(mcslee): Log error here
-    fprintf(stderr, "TSimpleServer::run(): Call to listen failed\n");
+  try {
+    // Start the server listening
+    serverTransport_->listen();
+  } catch (TTransportException& ttx) {
+    cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl;
     return;
   }
 
   // Fetch client from server
   while (true) {
-    // fprintf(stderr, "Listening for connection\n");
-    if ((client = serverTransport_->accept()) == NULL) {
-      // fprintf(stderr, "Got NULL connection, exiting.\n");
-      break;
-    }
-
-    while (true) {
-      // Read header from client
-      // fprintf(stderr, "Reading 4 byte header from client.\n");
-      string in;
-      if (client->read(in, 4) <= 0) {
-        // fprintf(stderr, "Size header negative. Exception!\n");
-        break;
+    try {
+      client = serverTransport_->accept();
+      if (client != NULL) {
+        // Process for as long as we can keep the processor happy!
+        TBufferedTransport bufferedClient(client);
+        while (processor_->process(&bufferedClient)) {}
       }
-      
-      // Read payload from client
-      int32_t size = *(int32_t*)(in.data());
-      // fprintf(stderr, "Reading %d byte payload from client.\n", size);
-      if (client->read(in, size) < size) {
-        // fprintf(stderr, "Didn't get enough data!!!\n");
-        break;
+    } catch (TTransportException& ttx) {
+      if (client != NULL) {
+        cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
       }
-      
-      // Pass payload to dispatcher
-      // TODO(mcslee): Wrap this in try/catch and return exceptions
-      string out = dispatcher_->dispatch(in);
-      
-      size = out.size();
-      
-      // Write size of response packet
-      client->write(string((char*)&size, 4));
-      
-      // Write response payload
-      client->write(out);
     }
   
-    // Clean up that client
-    // fprintf(stderr, "Closing and cleaning up client\n");
-    client->close();
-    delete client;
+    // Clean up the client
+    if (client != NULL) {
+
+      // Ensure no resource leaks
+      client->close();
+
+      // Ensure no memory leaks
+      delete client;
+
+      // Ensure we don't try to double-free on the next pass
+      client = NULL;
+    }
   }
 
-  // TODO(mcslee): Is this a timeout case or the real thing?
+  // TODO(mcslee): Could this be a timeout case? Or always the real thing?
 }
diff --git a/lib/cpp/server/TSimpleServer.h b/lib/cpp/server/TSimpleServer.h
index 47ab69e..9e0f79f 100644
--- a/lib/cpp/server/TSimpleServer.h
+++ b/lib/cpp/server/TSimpleServer.h
@@ -14,10 +14,10 @@
  */
 class TSimpleServer : public TServer {
  public:
-  TSimpleServer(TDispatcher* dispatcher,
+  TSimpleServer(TProcessor* processor,
                 TServerOptions* options,
                 TServerTransport* serverTransport) :
-    TServer(dispatcher, options), serverTransport_(serverTransport) {}
+    TServer(processor, options), serverTransport_(serverTransport) {}
     
   ~TSimpleServer() {}
 
diff --git a/lib/cpp/transport/TBufferedTransport.cc b/lib/cpp/transport/TBufferedTransport.cc
new file mode 100644
index 0000000..3fccc58
--- /dev/null
+++ b/lib/cpp/transport/TBufferedTransport.cc
@@ -0,0 +1,60 @@
+#include "TBufferedTransport.h"
+using std::string;
+
+uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+    }    
+    // Get more from underlying transport up to buffer size
+    rLen_ = transport_->read(rBuf_, rBufSize_);
+    rPos_ = 0;
+  }
+  
+  // Hand over whatever we have
+  uint32_t give = need;
+  if (rLen_-rPos_ < give) {
+    give = rLen_-rPos_;
+  }
+  memcpy(buf, rBuf_+rPos_, give);
+  rPos_ += give;
+  need -= give;
+  return (len - need);
+}
+
+void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
+  if (len == 0) {
+    return;
+  }
+
+  if (len + wLen_ >= wBufSize_) {
+    uint32_t copy = wBufSize_ - wLen_;
+    memcpy(wBuf_ + wLen_, buf, copy);
+    transport_->write(wBuf_, wBufSize_);
+    
+    wLen_ = len - copy;
+    if (wLen_ > 0) {
+      memcpy(wBuf_, buf+copy, wLen_);
+    }
+  } else {
+    memcpy(wBuf_+wLen_, buf, len);
+    wLen_ += len;
+  }
+}
+
+void TBufferedTransport::flush()  {
+  // Write out any data waiting in the write buffer
+  if (wLen_ > 0) {
+    transport_->write(wBuf_, wLen_);
+    wLen_ = 0;
+  }
+
+  // Flush the underlying transport
+  transport_->flush();
+}
diff --git a/lib/cpp/transport/TBufferedTransport.h b/lib/cpp/transport/TBufferedTransport.h
new file mode 100644
index 0000000..991b50c
--- /dev/null
+++ b/lib/cpp/transport/TBufferedTransport.h
@@ -0,0 +1,75 @@
+#ifndef T_BUFFERED_TRANSPORT_H
+#define T_BUFFERED_TRANSPORT_H
+
+#include "transport/TTransport.h"
+#include <string>
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransport : public TTransport {
+ public:
+  TBufferedTransport(TTransport* transport) :
+    transport_(transport),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  TBufferedTransport(TTransport* transport, uint32_t sz) :
+    transport_(transport),
+    rBufSize_(sz), rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  TBufferedTransport(TTransport* transport, uint32_t rsz, uint32_t wsz) :
+    transport_(transport),
+    rBufSize_(rsz), rPos_(0), rLen_(0),
+    wBufSize_(wsz), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  ~TBufferedTransport() {
+    delete [] rBuf_;
+    delete [] wBuf_;
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+  
+  void open() {
+    transport_->open();
+  }
+
+  void close() {
+    transport_->close();
+  }
+  
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ protected:
+  TTransport* transport_;
+  uint8_t* rBuf_;
+  uint32_t rBufSize_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+};
+
+#endif
diff --git a/lib/cpp/transport/TNullTransport.h b/lib/cpp/transport/TNullTransport.h
new file mode 100644
index 0000000..9562d9f
--- /dev/null
+++ b/lib/cpp/transport/TNullTransport.h
@@ -0,0 +1,24 @@
+#ifndef T_NULL_TRANSPORT
+#define T_NULL_TRANSPORT
+
+#include "transport/TTransport.h"
+
+/**
+ * The null transport is a dummy transport that doesn't actually do anything.
+ * It's sort of an analogy to /dev/null, you can never read anything from it
+ * and it will let you write anything you want to it, though it won't actually
+ * go anywhere.
+ * 
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TNullTransport : public TTransport {
+ public:
+  TNullTransport() {}
+  ~TNullTransport() {}
+
+  bool isOpen() { return true; }
+  void open() { }
+  void write(const std::string& s) {}
+};
+
+#endif
diff --git a/lib/cpp/transport/TServerSocket.cc b/lib/cpp/transport/TServerSocket.cc
index 178de81..1cf4a32 100644
--- a/lib/cpp/transport/TServerSocket.cc
+++ b/lib/cpp/transport/TServerSocket.cc
@@ -1,5 +1,6 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <errno.h>
 
 #include "transport/TSocket.h"
 #include "transport/TServerSocket.h"
@@ -11,11 +12,12 @@
   close();
 }
 
-bool TServerSocket::listen() {
+void TServerSocket::listen() {
   serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
   if (serverSocket_ == -1) {
+    perror("TServerSocket::listen() socket");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not create server socket.");
   }
 
   // Set reusaddress to prevent 2MSL delay on accept
@@ -24,16 +26,16 @@
                        &one, sizeof(one))) {
     perror("TServerSocket::listen() SO_REUSEADDR");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR");
   }
 
   // Turn linger off, don't want to block on calls to close
   struct linger ling = {0, 0};
   if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
                        &ling, sizeof(ling))) {
-    perror("TServerSocket::listen() SO_LINGER");
     close();
-    return false;
+    perror("TServerSocket::listen() SO_LINGER");
+    throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER");
   }
 
   // Bind to a port
@@ -47,24 +49,22 @@
     sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
     perror(errbuf);
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not bind");
   }
 
   // Call listen
   if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
     perror("TServerSocket::listen() LISTEN");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "Could not listen");
   }
 
   // The socket is now listening!
-  return true;
 }
 
-TTransport* TServerSocket::accept() {
+TTransport* TServerSocket::acceptImpl() {
   if (serverSocket_ <= 0) {
-    // TODO(mcslee): Log error with common logging tool
-    return NULL;
+    throw TTransportException(TTX_NOT_OPEN, "TServerSocket not listening");
   }
 
   struct sockaddr_in clientAddress;
@@ -75,7 +75,7 @@
     
   if (clientSocket <= 0) {
     perror("TServerSocket::accept()");
-    return NULL;
+    throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
   }
 
   return new TSocket(clientSocket);
diff --git a/lib/cpp/transport/TServerSocket.h b/lib/cpp/transport/TServerSocket.h
index 8ded4e2..ca30a03 100644
--- a/lib/cpp/transport/TServerSocket.h
+++ b/lib/cpp/transport/TServerSocket.h
@@ -16,11 +16,14 @@
   TServerSocket(int port);
   ~TServerSocket();
 
-  bool listen();
-  TTransport* accept();
+  void listen();
   void close();
 
+ protected:
+  TTransport* acceptImpl();
+
  private:
+
   int port_;
   int serverSocket_;
   int acceptBacklog_;
diff --git a/lib/cpp/transport/TServerTransport.h b/lib/cpp/transport/TServerTransport.h
index 4d063fc..9d71539 100644
--- a/lib/cpp/transport/TServerTransport.h
+++ b/lib/cpp/transport/TServerTransport.h
@@ -1,7 +1,8 @@
 #ifndef T_SERVER_TRANSPORT_H
 #define T_SERVER_TRANSPORT_H
 
-#include "TTransport.h"
+#include "transport/TTransport.h"
+#include "transport/TTransportException.h"
 
 /**
  * Server transport framework. A server needs to have some facility for
@@ -13,12 +14,48 @@
  public:
   virtual ~TServerTransport() {}
 
-  virtual bool listen() = 0;
-  virtual TTransport* accept() = 0;
+  /**
+   * Starts the server transport listening for new connections. Prior to this
+   * call most transports will not return anything when accept is called.
+   *
+   * @throws TTransportException if we were unable to listen
+   */
+  virtual void listen() {}
+
+  /**
+   * Gets a new dynamically allocated transport object and passes it to the
+   * caller. Note that it is the explicit duty of the caller to free the
+   * allocated object. The returned TTransport object must always be in the
+   * opened state. NULL should never be returned, instead an Exception should
+   * always be thrown.
+   *
+   * @return A new TTransport object
+   * @throws TTransportException if there is an error
+   */
+  TTransport* accept() {
+    TTransport* result = acceptImpl();
+    if (result == NULL) {
+      throw TTransportException("accept() may not return NULL");
+    }
+    return result;
+  }
+
+  /**
+   * Closes this transport such that future calls to accept will do nothing.
+   */
   virtual void close() = 0;
 
  protected:
   TServerTransport() {}
+
+  /**
+   * Subclasses should implement this function for accept.
+   *
+   * @return A newly allocated TTransport object
+   * @throw TTransportException If an error occurs
+   */
+  virtual TTransport* acceptImpl() = 0;
+
 };
 
 #endif
diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc
index 1dfe431..3471755 100644
--- a/lib/cpp/transport/TSocket.cc
+++ b/lib/cpp/transport/TSocket.cc
@@ -7,9 +7,18 @@
 #include <errno.h>
 
 #include "transport/TSocket.h"
+#include "transport/TTransportException.h"
 
 using namespace std;
 
+uint32_t g_socket_syscalls = 0;
+
+/**
+ * TSocket implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+
 // Mutex to protect syscalls to netdb
 pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -27,15 +36,20 @@
   close();
 }
 
-bool TSocket::open() {
+bool TSocket::isOpen() {
+  return (socket_ > 0); 
+}
+
+void TSocket::open() {
   // Create socket
   socket_ = socket(AF_INET, SOCK_STREAM, 0);
   if (socket_ == -1) {
-    socket_ = 0;
-    return false;
+    perror("TSocket::open() socket");
+    close();
+    throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
   }
   
-  // Lookup the host
+  // Lookup the hostname
   struct sockaddr_in addr;
   addr.sin_family = AF_INET;
   addr.sin_port = htons(port_);
@@ -54,7 +68,7 @@
     if (host_entry == NULL) {
       // perror("dns error: failed call to gethostbyname.\n");
       close();
-      return false;
+      throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
     }
     
     addr.sin_port = htons(port_);
@@ -70,10 +84,10 @@
   if (ret < 0) {
     perror("TSocket::open() connect");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
   }
 
-  return true;
+  // Connection was successful
 }
 
 void TSocket::close() {
@@ -84,97 +98,127 @@
   socket_ = 0;
 }
 
-int TSocket::read(string& s, uint32_t len) {
-  char buff[len];
-  s = "";
+uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
+  }
 
-  uint32_t have = 0;
   uint32_t retries = 0;
-
-  while (have < len) {
-  try_again:
-    // Read from the socket
-    int got = recv(socket_, buff+have, len-have, 0);
-
-    // Check for error on read
-    if (got < 0) {
-      perror("TSocket::read()");
-
-      // If temporarily out of resources, sleep a bit and try again
-      if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
-        usleep(50);
-        goto try_again;
-      }
-
-      // If interrupted, try again
-      if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
-        goto try_again;
-      }
-
-      // If we disconnect with no linger time
-      if (errno == ECONNRESET) {
-        return 0;
-      }
-
-      return 0;
+  
+ try_again:
+  // Read from the socket
+  int got = recv(socket_, buf, len, 0);
+  ++g_socket_syscalls;
+  
+  // Check for error on read
+  if (got < 0) {
+    perror("TSocket::read()");
+    
+    // If temporarily out of resources, sleep a bit and try again
+    if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
+      usleep(50);
+      goto try_again;
     }
     
-    // Check for empty read
-    if (got == 0) {
-      return 0;
+    // If interrupted, try again
+    if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+      goto try_again;
     }
     
-    // Update the count
-    have += (uint32_t) got;
+    // If we disconnect with no linger time
+    if (errno == ECONNRESET) {
+      throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+    }
+    
+    // This ish isn't open
+    if (errno == ENOTCONN) {
+      throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+    }
+    
+    // Timed out!
+    if (errno == ETIMEDOUT) {
+      throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
+    }
+    
+    // Some other error, whatevz
+    throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
+  }
+  
+  // The remote host has closed the socket
+  if (got == 0) {
+    close();
+    return 0;
   }
   
   // Pack data into string
-  s = string(buff, have);
-  return have;
+  return got;
 }
 
-void TSocket::write(const string& s) {
+void TSocket::write(const uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
+  }
+
   uint32_t sent = 0;
     
-  while (sent < s.size()) {
-    int b = send(socket_, s.data() + sent, s.size() - sent, 0);
-    
+  while (sent < len) {
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    int b = send(socket_, buf + sent, len - sent, MSG_NOSIGNAL);
+    ++g_socket_syscalls;
+
     // Fail on a send error
     if (b < 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw an exception
-      // throw_perror("send");
-      return;
+      if (errno == EPIPE) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "EPIPE");
+      }
+
+      if (errno == ECONNRESET) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+      }
+
+      if (errno == ENOTCONN) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+      }
+
+      perror("TSocket::write() send < 0");
+      throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
     }
     
     // Fail on blocked send
     if (b == 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw string("couldn't send data.\n");
-      return;
+      throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
     }
-
     sent += b;
   }
 }
 
-bool TSocket::setLinger(bool on, int linger) {
+void TSocket::setLinger(bool on, int linger) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   struct linger ling = {(on ? 1 : 0), linger};
   if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
     close();
     perror("TSocket::setLinger()");
-    return false;
   }
-  return true; 
 }
 
-bool TSocket::setNoDelay(bool noDelay) {
+void TSocket::setNoDelay(bool noDelay) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   // Set socket to NODELAY
   int val = (noDelay ? 1 : 0);
   if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
     close();
     perror("TSocket::setNoDelay()");
-    return false;
   }
-  return true;
 }
diff --git a/lib/cpp/transport/TSocket.h b/lib/cpp/transport/TSocket.h
index 1da74c6..18abfa7 100644
--- a/lib/cpp/transport/TSocket.h
+++ b/lib/cpp/transport/TSocket.h
@@ -6,33 +6,94 @@
 #include "transport/TTransport.h"
 #include "transport/TServerSocket.h"
 
-class TSocketOptions;
-
 /**
  * TCP Socket implementation of the TTransport interface.
  *
  * @author Mark Slee <mcslee@facebook.com>
  */
 class TSocket : public TTransport {
-  friend TTransport* TServerSocket::accept();
+  /**
+   * We allow the TServerSocket acceptImpl() method to access the private
+   * members of a socket so that it can access the TSocket(int socket)
+   * constructor which creates a socket object from the raw UNIX socket
+   * handle.
+   */
+  friend class TServerSocket;
 
  public:
+  /**
+   * Constructs a new socket. Note that this does NOT actually connect the
+   * socket.
+   *
+   * @param host An IP address or hostname to connect to
+   * @param port The port to connect on
+   */
   TSocket(std::string host, int port);
+
+  /**
+   * Destroyes the socket object, closing it if necessary.
+   */
   ~TSocket();
 
-  bool open();
-  void close();
-  int  read (std::string &s, uint32_t size);
-  void write(const std::string& s);
+  /**
+   * Whether the socket is alive.
+   *
+   * @return Is the socket alive?
+   */
+  bool isOpen();
 
-  bool setLinger(bool on, int linger);
-  bool setNoDelay(bool noDelay);
+  /**
+   * Creates and opens the UNIX socket.
+   *
+   * @throws TTransportException If the socket could not connect
+   */
+  void open();
+
+  /**
+   * Shuts down communications on the socket.
+   */
+  void close();
+
+  /**
+   * Reads from the underlying socket.
+   */
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  /**
+   * Writes to the underlying socket.
+   */
+  void write(const uint8_t* buf, uint32_t len);
+
+  /**
+   * Controls whether the linger option is set on the socket.
+   *
+   * @param on      Whether SO_LINGER is on
+   * @param linger  If linger is active, the number of seconds to linger for
+   */
+  void setLinger(bool on, int linger);
+
+  /**
+   * Whether to enable/disable Nagle's algorithm.
+   *
+   * @param noDelay Whether or not to disable the algorithm.
+   * @return 
+   */
+  void setNoDelay(bool noDelay);
 
  private:
+  /**
+   * Constructor to create socket from raw UNIX handle. Never called directly
+   * but used by the TServerSocket class.
+   */
   TSocket(int socket);
-  TSocketOptions *options_;
+
+  /** Host to connect to */
   std::string host_;
+
+  /** Port number to connect on */
   int port_;
+
+  /** Underlying UNIX socket handle */
   int socket_;
 };
 
diff --git a/lib/cpp/transport/TTransport.h b/lib/cpp/transport/TTransport.h
index a1f43d4..fcaece7 100644
--- a/lib/cpp/transport/TTransport.h
+++ b/lib/cpp/transport/TTransport.h
@@ -2,24 +2,96 @@
 #define T_TRANSPORT_H
 
 #include <string>
+#include "transport/TTransportException.h"
 
 /**
- * Generic interface for a method of transporting data.
+ * Generic interface for a method of transporting data. A TTransport may be
+ * capable of either reading or writing, but not necessarily both.
  *
  * @author Mark Slee <mcslee@facebook.com>
  */
 class TTransport {
  public:
-  virtual ~TTransport() {};
+  /**
+   * Virtual deconstructor.
+   */
+  virtual ~TTransport() {}
 
-  virtual bool open() = 0;
-  virtual void close() = 0;
+  /**
+   * Whether this transport is open.
+   */
+  virtual bool isOpen() { return false; }
 
-  virtual int  read (std::string& s, uint32_t size) = 0;
-  virtual void write(const std::string& s) = 0;
+  /**
+   * Opens the transport for communications.
+   *
+   * @return bool Whether the transport was successfully opened
+   * @throws TTransportException if opening failed
+   */
+  virtual void open() {
+    throw TTransportException(TTX_NOT_OPEN, "Cannot open base TTransport.");
+  }
+
+  /**
+   * Closes the transport.
+   */
+  virtual void close() {
+    throw TTransportException(TTX_NOT_OPEN, "Cannot close base TTransport.");
+  }
+
+  /**
+   * Attempt to read up to the specified number of bytes into the string.
+   *
+   * @param s     Reference to the location to append the read data
+   * @param len  How many bytes to read
+   * @return How many bytes were actually read
+   * @throws TTransportException If an error occurs
+   */
+  virtual uint32_t read(uint8_t* buf, uint32_t len) {
+    throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot read.");
+  }
+
+  /**
+   * Reads the given amount of data in its entirety no matter what.
+   *
+   * @param s     Reference to location for read data
+   * @param len   How many bytes to read
+   * @return How many bytes read, which must be equal to size
+   * @throws TTransportException If insufficient data was read
+   */
+  virtual uint32_t readAll(uint8_t* buf, uint32_t len) {
+    uint32_t have = 0;
+    
+    while (have < len) {
+      have += read(buf+have, len-have);
+    }
+
+    return have;
+  }
+
+  /**
+   * Writes the string in its entirety to the buffer.
+   *
+   * @param s The string to write out
+   * @throws TTransportException if an error occurs
+   */
+  virtual void write(const uint8_t* buf, uint32_t len) {
+    throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot write.");
+  }
+
+  /**
+   * Flushes any pending data to be written. Typically used with buffered
+   * transport mechanisms.
+   *
+   * @throws TTransportException if an error occurs
+   */
+  virtual void flush() {}
 
  protected:
-  TTransport() {};
+  /**
+   * Simple constructor.
+   */
+  TTransport() {}
 };
 
 #endif
diff --git a/lib/cpp/transport/TTransportException.h b/lib/cpp/transport/TTransportException.h
new file mode 100644
index 0000000..044e16d
--- /dev/null
+++ b/lib/cpp/transport/TTransportException.h
@@ -0,0 +1,63 @@
+#ifndef T_TRANSPORT_EXCEPTION_H
+#define T_TRANSPORT_EXCEPTION_H
+
+#include <string>
+
+/**
+ * Error codes for the various types of exceptions.
+ */
+enum TTransportExceptionType {
+  TTX_UNKNOWN = 0,
+  TTX_NOT_OPEN = 1,
+  TTX_TIMED_OUT = 2,
+};
+
+/**
+ * Class to encapsulate all the possible types of transport errors that may
+ * occur in various transport systems. This provides a sort of generic
+ * wrapper around the shitty UNIX E_ error codes that lets a common code
+ * base of error handling to be used for various types of transports, i.e.
+ * pipes etc.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TTransportException {
+ public:
+  TTransportException() :
+    type_(TTX_UNKNOWN), message_() {}
+
+  TTransportException(TTransportExceptionType type) :
+    type_(type), message_() {}
+
+  TTransportException(std::string message) :
+    type_(TTX_UNKNOWN), message_(message) {}
+
+  TTransportException(TTransportExceptionType type, std::string message) :
+    type_(type), message_(message) {}
+
+  ~TTransportException() {}
+
+  /**
+   * Returns an error code that provides information about the type of error
+   * that has occurred.
+   *
+   * @return Error code
+   */
+  TTransportExceptionType getType() { return type_; }
+ 
+  /**
+   * Returns an informative message about what caused this error.
+   *
+   * @return Error string
+   */
+  const std::string& getMessage() { return message_; }
+
+ protected:
+  /** Error code */
+  TTransportExceptionType type_;
+
+  /** Description */
+  std::string message_;
+};
+
+#endif