Update Thrift CPP libraries to work with new generated source, change underlying buffers to use uint8_t* instead of std::string
Summary: Major overhaul to the CPP libraries.
Reviewed By: aditya
Test Plan: Again, keep an eye out for the unit tests commit
Notes: Initial perf tests show that Thrift is not only more robust than Pillar, but its implementation is actually around 10-20% faster. We can do about 10 RPC function calls with small data payloads in under 2ms. THAT IS FAST. THAT IS THRIFTY.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664714 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile
index 2045dba..f77c493 100644
--- a/lib/cpp/Makefile
+++ b/lib/cpp/Makefile
@@ -1,4 +1,9 @@
-# Makefile for Thrift C++ library.
+# Makefile for Thrift C++ library. Generates a shared object that can be
+# installed to /usr/local/lib
+#
+# TODO(mcslee): Add the ability to compile separate static modules that can
+# be compiled directly into Thrift applications instead of dynamic runtime
+# loading of the full libs
#
# Author:
# Mark Slee <mcslee@facebook.com>
@@ -10,16 +15,17 @@
LDFL = -shared -Wall -I. -fPIC -Wl,-soname=libthrift.so
# Source files
-SRCS = client/TSimpleClient.cc \
- protocol/TBinaryProtocol.cc \
- server/TSimpleServer.cc \
+SRCS = protocol/TBinaryProtocol.cc \
+ transport/TBufferedTransport.cc \
transport/TSocket.cc \
- transport/TServerSocket.cc
+ transport/TServerSocket.cc \
+ server/TSimpleServer.cc
# Linked library
libthrift:
$(LD) -o libthrift.so $(LDFL) $(SRCS)
+# Clean it up
clean:
rm -f libthrift.so
diff --git a/lib/cpp/TDispatcher.h b/lib/cpp/TDispatcher.h
deleted file mode 100644
index f8ff847..0000000
--- a/lib/cpp/TDispatcher.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#ifndef T_DISPATCHER_H
-#define T_DISPATCHER_H
-
-#include <string>
-
-/**
- * A dispatcher is a generic object that accepts an input buffer and returns
- * a buffer. It can be used in a variety of ways, i.e. as a client that
- * sends data over the network and returns a response, or as a server that
- * reads an input and returns an output.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TDispatcher {
- public:
- virtual ~TDispatcher() {};
- virtual std::string dispatch(const std::string& s) = 0;
- protected:
- TDispatcher() {}
-};
-
-#endif
diff --git a/lib/cpp/TProcessor.h b/lib/cpp/TProcessor.h
new file mode 100644
index 0000000..f01379d
--- /dev/null
+++ b/lib/cpp/TProcessor.h
@@ -0,0 +1,24 @@
+#ifndef T_PROCESSOR_H
+#define T_PROCESSOR_H
+
+#include <string>
+#include "transport/TTransport.h"
+
+/**
+ * A processor is a generic object that acts upon two streams of data, one
+ * an input and the other an output. The definition of this object is loose,
+ * though the typical case is for some sort of server that either generates
+ * responses to an input stream or forwards data from one pipe onto another.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TProcessor {
+ public:
+ virtual ~TProcessor() {}
+ virtual bool process(TTransport* in, TTransport *out) = 0;
+ virtual bool process(TTransport* io) { return process(io, io); }
+ protected:
+ TProcessor() {}
+};
+
+#endif
diff --git a/lib/cpp/Thrift.h b/lib/cpp/Thrift.h
index 04fbaa1..9986e3b 100644
--- a/lib/cpp/Thrift.h
+++ b/lib/cpp/Thrift.h
@@ -1,7 +1,8 @@
#ifndef THRIFT_H
#define THRIFT_H
-#include <sys/types.h>
+#include <netinet/in.h>
+#include <inttypes.h>
#include <string>
#include <map>
#include <list>
diff --git a/lib/cpp/client/TClient.h b/lib/cpp/client/TClient.h
deleted file mode 100644
index 73dd093..0000000
--- a/lib/cpp/client/TClient.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#ifndef T_CLIENT_H
-#define T_CLIENT_H
-
-#include "TDispatcher.h"
-
-class TClient : public TDispatcher {
- public:
- virtual ~TClient() {}
- virtual bool open() = 0;
- virtual void close() = 0;
- protected:
- TClient() {}
-};
-
-#endif
-
diff --git a/lib/cpp/client/TSimpleClient.cc b/lib/cpp/client/TSimpleClient.cc
deleted file mode 100644
index 9069c91..0000000
--- a/lib/cpp/client/TSimpleClient.cc
+++ /dev/null
@@ -1,44 +0,0 @@
-#include "TSimpleClient.h"
-using std::string;
-
-TSimpleClient::TSimpleClient(TTransport* transport) :
- transport_(transport) {}
-
-bool TSimpleClient::open() {
- return transport_->open();
-}
-
-void TSimpleClient::close() {
- transport_->close();
-}
-
-std::string TSimpleClient::dispatch(const string& s) {
- // Write size header
- int32_t size = s.size();
- // fprintf(stderr, "Writing size header %d to server\n", size);
- transport_->write(string((char*)&size, 4));
-
- // Write data payload
- // fprintf(stderr, "Writing %d byte payload to server\n", (int)s.size());
- transport_->write(s);
-
- // Read response size
- // fprintf(stderr, "Reading 4-byte response size header\n");
- string response;
- transport_->read(response, 4);
- size = *(int32_t*)response.data();
-
- // Read response data
- if (size < 0) {
- // TODO(mcslee): Handle exception
- // fprintf(stderr, "Exception case! Response size < 0\n");
- return "";
- } else {
- // fprintf(stderr, "Reading %d byte response payload\n", size);
- transport_->read(response, size);
- // TODO(mcslee): Check that we actually read enough data
- // fprintf(stderr, "Done reading payload, returning.\n");
- return response;
- }
-}
-
diff --git a/lib/cpp/client/TSimpleClient.h b/lib/cpp/client/TSimpleClient.h
deleted file mode 100644
index 249afe5..0000000
--- a/lib/cpp/client/TSimpleClient.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#ifndef T_SIMPLE_CLIENT_H
-#define T_SIMPLE_CLIENT_H
-
-#include "client/TClient.h"
-#include "transport/TTransport.h"
-
-class TSimpleClient : public TClient {
- public:
- TSimpleClient(TTransport* transport);
- ~TSimpleClient() {}
-
- bool open();
- void close();
- std::string dispatch(const std::string& in);
-
- protected:
- TTransport* transport_;
-};
-
-#endif
-
diff --git a/lib/cpp/protocol/TBinaryProtocol.cc b/lib/cpp/protocol/TBinaryProtocol.cc
index 4c10bab..1f31c8d 100644
--- a/lib/cpp/protocol/TBinaryProtocol.cc
+++ b/lib/cpp/protocol/TBinaryProtocol.cc
@@ -1,140 +1,246 @@
#include "protocol/TBinaryProtocol.h"
-using namespace std;
+using std::string;
-string TBinaryProtocol::readFunction(TBuf& buf) const {
- // Let readString increment the buffer position
- return readString(buf);
+uint32_t TBinaryProtocol::writeStructBegin(TTransport* out,
+ const string& name) const {
+ return 0;
}
-string TBinaryProtocol::writeFunction(const string& name,
- const string& args) const{
- return writeString(name) + args;
+uint32_t TBinaryProtocol::writeStructEnd(TTransport* out) const {
+ return 0;
}
-map<uint32_t, TBuf> TBinaryProtocol::readStruct(TBuf& buf) const {
- map<uint32_t, TBuf> fieldMap;
+uint32_t TBinaryProtocol::writeFieldBegin(TTransport* out,
+ const string& name,
+ const TType fieldType,
+ const uint16_t fieldId) const {
+ return
+ writeByte(out, (uint8_t)fieldType) +
+ writeU32(out, (uint32_t)fieldId);
+}
+
+uint32_t TBinaryProtocol::writeFieldEnd(TTransport* out) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::writeFieldStop(TTransport* out) const {
+ return
+ writeByte(out, (uint8_t)T_STOP);
+}
+
+uint32_t TBinaryProtocol::writeMapBegin(TTransport* out,
+ const TType keyType,
+ const TType valType,
+ const uint32_t size) const {
+ return
+ writeByte(out, (uint8_t)keyType) +
+ writeByte(out, (uint8_t)valType) +
+ writeU32(out, size);
+}
+
+uint32_t TBinaryProtocol::writeMapEnd(TTransport* out) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::writeListBegin(TTransport* out,
+ const TType elemType,
+ const uint32_t size) const {
+ return
+ writeByte(out, (uint8_t) elemType) +
+ writeU32(out, size);
+}
+
+uint32_t TBinaryProtocol::writeListEnd(TTransport* out) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::writeSetBegin(TTransport* out,
+ const TType elemType,
+ const uint32_t size) const {
+ return
+ writeByte(out, (uint8_t)elemType) +
+ writeU32(out, size);
+}
+
+uint32_t TBinaryProtocol::writeSetEnd(TTransport* out) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::writeByte(TTransport* out,
+ const uint8_t byte) const {
+ out->write(&byte, 1);
+ return 1;
+}
+
+uint32_t TBinaryProtocol::writeU32(TTransport* out,
+ const uint32_t u32) const {
+ uint32_t net = (uint32_t)htonl(u32);
+ out->write((uint8_t*)&net, 4);
+ return 4;
+}
+
+uint32_t TBinaryProtocol::writeI32(TTransport* out,
+ const int32_t i32) const {
+ int32_t net = (int32_t)htonl(i32);
+ out->write((uint8_t*)&net, 4);
+ return 4;
+}
+
+uint32_t TBinaryProtocol::writeU64(TTransport* out,
+ const uint64_t u64) const {
+ uint64_t net = (uint64_t)htonll(u64);
+ out->write((uint8_t*)&net, 8);
+ return 8;
+}
+
+uint32_t TBinaryProtocol::writeI64(TTransport* out,
+ const int64_t i64) const {
+ int64_t net = (int64_t)htonll(i64);
+ out->write((uint8_t*)&net, 8);
+ return 8;
+}
+
+uint32_t TBinaryProtocol::writeString(TTransport* out,
+ const string& str) const {
+ uint32_t result = writeU32(out, str.size());
+ out->write((uint8_t*)str.data(), str.size());
+ return result + str.size();
+}
+
+/**
+ * Reading functions
+ */
+
+uint32_t TBinaryProtocol::readStructBegin(TTransport* in,
+ string& name) const {
+ name = "";
+ return 0;
+}
+
+uint32_t TBinaryProtocol::readStructEnd(TTransport* in) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::readFieldBegin(TTransport* in,
+ string& name,
+ TType& fieldType,
+ uint16_t& fieldId) const {
+ uint32_t result = 0;
+ uint8_t type;
+ result += readByte(in, type);
+ fieldType = (TType)type;
+ if (fieldType == T_STOP) {
+ fieldId = 0;
+ return result;
+ }
+ uint32_t id;
+ result += readU32(in, id);
+ fieldId = (uint16_t)id;
+ return result;
+}
- if (buf.len < 4) {
- return fieldMap;
- }
- uint32_t total_size = readU32(buf);
- if (buf.len < total_size) {
- // Data looks corrupt, we don't have that much, we will try to read what
- // we can but be sure not to go over
- total_size = buf.len;
- }
-
- // Field headers are 8 bytes, 4 byte fid + 4 byte length
- while (total_size > 0 && buf.len > 8) {
- uint32_t fid = readU32(buf);
- uint32_t flen = readU32(buf);
- if (flen > buf.len) {
- // flen corrupt, there isn't that much data left
- break;
- }
- fieldMap.insert(make_pair(fid, TBuf(buf.data, flen)));
- buf.data += flen;
- buf.len -= flen;
- total_size -= 8 + flen;
- }
-
- return fieldMap;
+uint32_t TBinaryProtocol::readFieldEnd(TTransport* in) const {
+ return 0;
}
-
-string TBinaryProtocol::writeStruct(const map<uint32_t,string>& s) const {
- string result = "";
- map<uint32_t,string>::const_iterator s_iter;
- for (s_iter = s.begin(); s_iter != s.end(); ++s_iter) {
- result += writeU32(s_iter->first);
- result += writeU32(s_iter->second.size());
- result += s_iter->second;
- }
- return writeU32(result.size()) + result;
-}
-
-string TBinaryProtocol::readString(TBuf& buf) const {
- uint32_t len = readU32(buf);
- if (len == 0) {
- return "";
- }
- string result((const char*)(buf.data), len);
- buf.data += len;
- buf.len -= len;
+
+uint32_t TBinaryProtocol::readMapBegin(TTransport* in,
+ TType& keyType,
+ TType& valType,
+ uint32_t& size) const {
+ uint8_t k, v;
+ uint32_t result = 0;
+ result += readByte(in, k);
+ keyType = (TType)k;
+ result += readByte(in, v);
+ valType = (TType)v;
+ result += readU32(in, size);
return result;
}
-uint8_t TBinaryProtocol::readByte(TBuf& buf) const {
- if (buf.len == 0) {
- return 0;
- }
- uint8_t result = (uint8_t)buf.data[0];
- buf.data += 1;
- buf.len -= 1;
+uint32_t TBinaryProtocol::readMapEnd(TTransport* in) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::readListBegin(TTransport* in,
+ TType& elemType,
+ uint32_t& size) const {
+ uint8_t e;
+ uint32_t result = 0;
+ result += readByte(in, e);
+ elemType = (TType)e;
+ result += readU32(in, size);
return result;
}
-uint32_t TBinaryProtocol::readU32(TBuf& buf) const {
- if (buf.len < 4) {
- return 0;
- }
- uint32_t result = *(uint32_t*)buf.data;
- buf.data += 4;
- buf.len -= 4;
+uint32_t TBinaryProtocol::readListEnd(TTransport* in) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::readSetBegin(TTransport* in,
+ TType& elemType,
+ uint32_t& size) const {
+ uint8_t e;
+ uint32_t result = 0;
+ result += readByte(in, e);
+ elemType = (TType)e;
+ result += readU32(in, size);
return result;
}
-int32_t TBinaryProtocol::readI32(TBuf& buf) const {
- if (buf.len < 4) {
- return 0;
- }
- int32_t result = *(int32_t*)buf.data;
- buf.data += 4;
- buf.len -= 4;
- return result;
+uint32_t TBinaryProtocol::readSetEnd(TTransport* in) const {
+ return 0;
}
-uint64_t TBinaryProtocol::readU64(TBuf& buf) const {
- if (buf.len < 8) {
- return 0;
- }
- uint64_t result = *(uint64_t*)buf.data;
- buf.data += 8;
- buf.len -= 8;
- return result;
+uint32_t TBinaryProtocol::readByte(TTransport* in,
+ uint8_t& byte) const {
+ uint8_t b[1];
+ in->readAll(b, 1);
+ byte = *(uint8_t*)b;
+ return 1;
}
-int64_t TBinaryProtocol::readI64(TBuf& buf) const {
- if (buf.len < 8) {
- return 0;
- }
- int64_t result = *(int64_t*)buf.data;
- buf.data += 8;
- buf.len -= 8;
- return result;
+uint32_t TBinaryProtocol::readU32(TTransport* in,
+ uint32_t& u32) const {
+ uint8_t b[4];
+ in->readAll(b, 4);
+ u32 = *(uint32_t*)b;
+ u32 = (uint32_t)ntohl(u32);
+ return 4;
}
-string TBinaryProtocol::writeString(const string& str) const {
- uint32_t size = str.size();
- string result = string((const char*)&size, 4);
- return result + str;
+uint32_t TBinaryProtocol::readI32(TTransport* in,
+ int32_t& i32) const {
+ uint8_t b[4];
+ in->readAll(b, 4);
+ i32 = *(int32_t*)b;
+ i32 = (int32_t)ntohl(i32);
+ return 4;
}
-string TBinaryProtocol::writeByte(const uint8_t byte) const {
- return string((const char*)&byte, 1);
+uint32_t TBinaryProtocol::readU64(TTransport* in,
+ uint64_t& u64) const {
+ uint8_t b[8];
+ in->readAll(b, 8);
+ u64 = *(uint64_t*)b;
+ u64 = (uint64_t)ntohll(u64);
+ return 8;
}
-string TBinaryProtocol::writeU32(const uint32_t u32) const {
- return string((const char*)&u32, 4);
+uint32_t TBinaryProtocol::readI64(TTransport* in,
+ int64_t& i64) const {
+ uint8_t b[8];
+ in->readAll(b, 8);
+ i64 = *(int64_t*)b;
+ i64 = (int64_t)ntohll(i64);
+ return 8;
}
-string TBinaryProtocol::writeI32(int32_t i32) const {
- return string((const char*)&i32, 4);
-}
-
-string TBinaryProtocol::writeU64(uint64_t u64) const {
- return string((const char*)&u64, 8);
-}
-
-string TBinaryProtocol::writeI64(int64_t i64) const {
- return string((const char*)&i64, 8);
+uint32_t TBinaryProtocol::readString(TTransport* in,
+ string& str) const {
+ uint32_t size, result;
+ result = readU32(in, size);
+ uint8_t b[size];
+ in->readAll(b, size);
+ str = string((char*)b, size);
+ return result+size;
}
diff --git a/lib/cpp/protocol/TBinaryProtocol.h b/lib/cpp/protocol/TBinaryProtocol.h
index 976c383..e05bd9f 100644
--- a/lib/cpp/protocol/TBinaryProtocol.h
+++ b/lib/cpp/protocol/TBinaryProtocol.h
@@ -14,29 +14,114 @@
TBinaryProtocol() {}
~TBinaryProtocol() {}
- std::string
- readFunction(TBuf& buf) const;
- std::string
- writeFunction(const std::string& name, const std::string& args) const;
+ /**
+ * Writing functions.
+ */
- std::map<uint32_t, TBuf>
- readStruct(TBuf& buf) const;
- std::string
- writeStruct(const std::map<uint32_t,std::string>& s) const;
+ uint32_t writeStructBegin (TTransport* out,
+ const std::string& name) const;
- std::string readString (TBuf& buf) const;
- uint8_t readByte (TBuf& buf) const;
- uint32_t readU32 (TBuf& buf) const;
- int32_t readI32 (TBuf& buf) const;
- uint64_t readU64 (TBuf& buf) const;
- int64_t readI64 (TBuf& buf) const;
+ uint32_t writeStructEnd (TTransport* out) const;
- std::string writeString (const std::string& str) const;
- std::string writeByte (const uint8_t byte) const;
- std::string writeU32 (const uint32_t u32) const;
- std::string writeI32 (const int32_t i32) const;
- std::string writeU64 (const uint64_t u64) const;
- std::string writeI64 (const int64_t i64) const;
+ uint32_t writeFieldBegin (TTransport* out,
+ const std::string& name,
+ const TType fieldType,
+ const uint16_t fieldId) const;
+
+ uint32_t writeFieldEnd (TTransport* out) const;
+
+ uint32_t writeFieldStop (TTransport* out) const;
+
+ uint32_t writeMapBegin (TTransport* out,
+ const TType keyType,
+ const TType valType,
+ const uint32_t size) const;
+
+ uint32_t writeMapEnd (TTransport* out) const;
+
+ uint32_t writeListBegin (TTransport* out,
+ const TType elemType,
+ const uint32_t size) const;
+
+ uint32_t writeListEnd (TTransport* out) const;
+
+ uint32_t writeSetBegin (TTransport* out,
+ const TType elemType,
+ const uint32_t size) const;
+
+ uint32_t writeSetEnd (TTransport* out) const;
+
+ uint32_t writeByte (TTransport* out,
+ const uint8_t byte) const;
+
+ uint32_t writeU32 (TTransport* out,
+ const uint32_t u32) const;
+
+ uint32_t writeI32 (TTransport* out,
+ const int32_t i32) const;
+
+ uint32_t writeU64 (TTransport* out,
+ const uint64_t u64) const;
+
+ uint32_t writeI64 (TTransport* out,
+ const int64_t i64) const;
+
+ uint32_t writeString (TTransport* out,
+ const std::string& str) const;
+
+ /**
+ * Reading functions
+ */
+
+ uint32_t readStructBegin (TTransport* in,
+ std::string& name) const;
+
+ uint32_t readStructEnd (TTransport* in) const;
+
+ uint32_t readFieldBegin (TTransport* in,
+ std::string& name,
+ TType& fieldType,
+ uint16_t& fieldId) const;
+
+ uint32_t readFieldEnd (TTransport* in) const;
+
+ uint32_t readMapBegin (TTransport* in,
+ TType& keyType,
+ TType& valType,
+ uint32_t& size) const;
+
+ uint32_t readMapEnd (TTransport* in) const;
+
+ uint32_t readListBegin (TTransport* in,
+ TType& elemType,
+ uint32_t& size) const;
+
+ uint32_t readListEnd (TTransport* in) const;
+
+ uint32_t readSetBegin (TTransport* in,
+ TType& elemType,
+ uint32_t& size) const;
+
+ uint32_t readSetEnd (TTransport* in) const;
+
+ uint32_t readByte (TTransport* in,
+ uint8_t& byte) const;
+
+ uint32_t readU32 (TTransport* in,
+ uint32_t& u32) const;
+
+ uint32_t readI32 (TTransport* in,
+ int32_t& i32) const;
+
+ uint32_t readU64 (TTransport* in,
+ uint64_t& u64) const;
+
+ uint32_t readI64 (TTransport* in,
+ int64_t& i64) const;
+
+ uint32_t readString (TTransport* in,
+ std::string& str) const;
+
};
#endif
diff --git a/lib/cpp/protocol/TProtocol.h b/lib/cpp/protocol/TProtocol.h
index 1f2e0c8..fe2d6ef 100644
--- a/lib/cpp/protocol/TProtocol.h
+++ b/lib/cpp/protocol/TProtocol.h
@@ -1,14 +1,42 @@
#ifndef T_PROTOCOL_H
#define T_PROTOCOL_H
+#include <netinet/in.h>
#include <sys/types.h>
#include <string>
#include <map>
+#include "transport/TTransport.h"
+
+#define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32))))
+
+#define htonll(x) ntohll(x)
+
/** Forward declaration for TProtocol */
struct TBuf;
/**
+ * Enumerated definition of the types that the Thrift protocol supports.
+ * Take special note of the T_STOP type, which is used specifically to mark
+ * the end of a sequence of fields.
+ */
+enum TType {
+ T_STOP = 1,
+ T_BYTE = 2,
+ T_U16 = 3,
+ T_I16 = 4,
+ T_U32 = 5,
+ T_I32 = 6,
+ T_U64 = 7,
+ T_I64 = 8,
+ T_STRING = 9,
+ T_STRUCT = 10,
+ T_MAP = 11,
+ T_SET = 12,
+ T_LIST = 13
+};
+
+/**
* Abstract class for a thrift protocol driver. These are all the methods that
* a protocol must implement. Essentially, there must be some way of reading
* and writing all the base types, plus a mechanism for writing out structs
@@ -25,64 +53,211 @@
virtual ~TProtocol() {}
/**
- * Function call serialization.
+ * Writing functions.
*/
- virtual std::string
- readFunction(TBuf& buf) const = 0;
- virtual std::string
- writeFunction(const std::string& name, const std::string& args) const = 0;
+ virtual uint32_t writeStructBegin (TTransport* out,
+ const std::string& name) const = 0;
+
+ virtual uint32_t writeStructEnd (TTransport* out) const = 0;
+
+ virtual uint32_t writeFieldBegin (TTransport* out,
+ const std::string& name,
+ const TType fieldType,
+ const uint16_t fieldId) const = 0;
+
+ virtual uint32_t writeFieldEnd (TTransport* out) const = 0;
+
+ virtual uint32_t writeFieldStop (TTransport* out) const = 0;
+
+ virtual uint32_t writeMapBegin (TTransport* out,
+ const TType keyType,
+ const TType valType,
+ const uint32_t size) const = 0;
+
+ virtual uint32_t writeMapEnd (TTransport* out) const = 0;
+
+ virtual uint32_t writeListBegin (TTransport* out,
+ const TType elemType,
+ const uint32_t size) const = 0;
+
+ virtual uint32_t writeListEnd (TTransport* out) const = 0;
+
+ virtual uint32_t writeSetBegin (TTransport* out,
+ const TType elemType,
+ const uint32_t size) const = 0;
+
+ virtual uint32_t writeSetEnd (TTransport* out) const = 0;
+
+ virtual uint32_t writeByte (TTransport* out,
+ const uint8_t byte) const = 0;
+
+ virtual uint32_t writeU32 (TTransport* out,
+ const uint32_t u32) const = 0;
+
+ virtual uint32_t writeI32 (TTransport* out,
+ const int32_t i32) const = 0;
+
+ virtual uint32_t writeU64 (TTransport* out,
+ const uint64_t u64) const = 0;
+
+ virtual uint32_t writeI64 (TTransport* out,
+ const int64_t i64) const = 0;
+
+ virtual uint32_t writeString (TTransport* out,
+ const std::string& str) const = 0;
/**
- * Struct serialization.
+ * Reading functions
*/
- virtual std::map<uint32_t, TBuf>
- readStruct(TBuf& buf) const = 0;
- virtual std::string
- writeStruct(const std::map<uint32_t,std::string>& s) const = 0;
+ virtual uint32_t readStructBegin (TTransport* in,
+ std::string& name) const = 0;
+
+ virtual uint32_t readStructEnd (TTransport* in) const = 0;
+
+ virtual uint32_t readFieldBegin (TTransport* in,
+ std::string& name,
+ TType& fieldType,
+ uint16_t& fieldId) const = 0;
+
+ virtual uint32_t readFieldEnd (TTransport* in) const = 0;
+
+ virtual uint32_t readMapBegin (TTransport* in,
+ TType& keyType,
+ TType& valType,
+ uint32_t& size) const = 0;
+
+ virtual uint32_t readMapEnd (TTransport* in) const = 0;
+
+ virtual uint32_t readListBegin (TTransport* in,
+ TType& elemType,
+ uint32_t& size) const = 0;
+
+ virtual uint32_t readListEnd (TTransport* in) const = 0;
+
+ virtual uint32_t readSetBegin (TTransport* in,
+ TType& elemType,
+ uint32_t& size) const = 0;
+
+ virtual uint32_t readSetEnd (TTransport* in) const = 0;
+
+ virtual uint32_t readByte (TTransport* in,
+ uint8_t& byte) const = 0;
+
+ virtual uint32_t readU32 (TTransport* in,
+ uint32_t& u32) const = 0;
+
+ virtual uint32_t readI32 (TTransport* in,
+ int32_t& i32) const = 0;
+
+ virtual uint32_t readU64 (TTransport* in,
+ uint64_t& u64) const = 0;
+
+ virtual uint32_t readI64 (TTransport* in,
+ int64_t& i64) const = 0;
+
+ virtual uint32_t readString (TTransport* in,
+ std::string& str) const = 0;
/**
- * Basic data type deserialization. Note that these read methods do not
- * take a const reference to the TBuf object. They SHOULD change the TBuf
- * object so that it reflects the buffer AFTER the basic data type has
- * been consumed such that data may continue being read serially from the
- * buffer.
+ * Method to arbitrarily skip over data.
*/
-
- virtual std::string readString (TBuf& buf) const = 0;
- virtual uint8_t readByte (TBuf& buf) const = 0;
- virtual uint32_t readU32 (TBuf& buf) const = 0;
- virtual int32_t readI32 (TBuf& buf) const = 0;
- virtual uint64_t readU64 (TBuf& buf) const = 0;
- virtual int64_t readI64 (TBuf& buf) const = 0;
-
- virtual std::string writeString (const std::string& str) const = 0;
- virtual std::string writeByte (const uint8_t byte) const = 0;
- virtual std::string writeU32 (const uint32_t u32) const = 0;
- virtual std::string writeI32 (const int32_t i32) const = 0;
- virtual std::string writeU64 (const uint64_t u64) const = 0;
- virtual std::string writeI64 (const int64_t i64) const = 0;
+ uint32_t skip(TTransport* in, TType type) const {
+ switch (type) {
+ case T_BYTE:
+ {
+ uint8_t byte;
+ return readByte(in, byte);
+ }
+ case T_U32:
+ {
+ uint32_t u32;
+ return readU32(in, u32);
+ }
+ case T_I32:
+ {
+ int32_t i32;
+ return readI32(in, i32);
+ }
+ case T_U64:
+ {
+ uint64_t u64;
+ return readU64(in, u64);
+ }
+ case T_I64:
+ {
+ int64_t i64;
+ return readI64(in, i64);
+ }
+ case T_STRING:
+ {
+ std::string str;
+ return readString(in, str);
+ }
+ case T_STRUCT:
+ {
+ uint32_t result = 0;
+ std::string name;
+ uint16_t fid;
+ TType ftype;
+ result += readStructBegin(in, name);
+ while (true) {
+ result += readFieldBegin(in, name, ftype, fid);
+ if (ftype == T_STOP) {
+ break;
+ }
+ result += skip(in, ftype);
+ result += readFieldEnd(in);
+ }
+ result += readStructEnd(in);
+ return result;
+ }
+ case T_MAP:
+ {
+ uint32_t result = 0;
+ TType keyType;
+ TType valType;
+ uint32_t i, size;
+ result += readMapBegin(in, keyType, valType, size);
+ for (i = 0; i < size; i++) {
+ result += skip(in, keyType);
+ result += skip(in, valType);
+ }
+ result += readMapEnd(in);
+ return result;
+ }
+ case T_SET:
+ {
+ uint32_t result = 0;
+ TType elemType;
+ uint32_t i, size;
+ result += readSetBegin(in, elemType, size);
+ for (i = 0; i < size; i++) {
+ result += skip(in, elemType);
+ }
+ result += readSetEnd(in);
+ return result;
+ }
+ case T_LIST:
+ {
+ uint32_t result = 0;
+ TType elemType;
+ uint32_t i, size;
+ result += readListBegin(in, elemType, size);
+ for (i = 0; i < size; i++) {
+ result += skip(in, elemType);
+ }
+ result += readListEnd(in);
+ return result;
+ }
+ default:
+ return 0;
+ }
+ }
protected:
TProtocol() {}
};
-/**
- * Wrapper around raw data that allows us to track the length of a data
- * buffer. It is the responsibility of a robust TProtocol implementation
- * to ensure that any reads that are done from data do NOT overrun the
- * memory address at data+len. It is also a convention that TBuf objects
- * do NOT own the memory pointed to by data. They are merely wrappers
- * around buffers that have been allocated elsewhere. Therefore, the user
- * should never allocate memory before putting it into a TBuf nor should
- * they free the data pointed to by a TBuf.
- */
-struct TBuf {
- TBuf(const TBuf& that) : data(that.data), len(that.len) {}
- TBuf(const uint8_t* d, uint32_t l) : data(d), len(l) {}
- const uint8_t* data;
- uint32_t len;
-};
-
#endif
diff --git a/lib/cpp/server/TServer.h b/lib/cpp/server/TServer.h
index 9c4cc59..0d275ba 100644
--- a/lib/cpp/server/TServer.h
+++ b/lib/cpp/server/TServer.h
@@ -1,7 +1,7 @@
#ifndef T_SERVER_H
#define T_SERVER_H
-#include "TDispatcher.h"
+#include "TProcessor.h"
class TServerOptions;
@@ -16,10 +16,10 @@
virtual void run() = 0;
protected:
- TServer(TDispatcher* dispatcher, TServerOptions* options) :
- dispatcher_(dispatcher), options_(options) {}
+ TServer(TProcessor* processor, TServerOptions* options) :
+ processor_(processor), options_(options) {}
- TDispatcher* dispatcher_;
+ TProcessor* processor_;
TServerOptions* options_;
};
diff --git a/lib/cpp/server/TSimpleServer.cc b/lib/cpp/server/TSimpleServer.cc
index 16f5006..03069ae 100644
--- a/lib/cpp/server/TSimpleServer.cc
+++ b/lib/cpp/server/TSimpleServer.cc
@@ -1,60 +1,54 @@
#include "server/TSimpleServer.h"
+#include "transport/TBufferedTransport.h"
+#include "transport/TTransportException.h"
#include <string>
+#include <iostream>
using namespace std;
+/**
+ * A simple single-threaded application server. Perfect for unit tests!
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
void TSimpleServer::run() {
- TTransport* client;
+ TTransport* client = NULL;
- // Start the server listening
- if (serverTransport_->listen() == false) {
- // TODO(mcslee): Log error here
- fprintf(stderr, "TSimpleServer::run(): Call to listen failed\n");
+ try {
+ // Start the server listening
+ serverTransport_->listen();
+ } catch (TTransportException& ttx) {
+ cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl;
return;
}
// Fetch client from server
while (true) {
- // fprintf(stderr, "Listening for connection\n");
- if ((client = serverTransport_->accept()) == NULL) {
- // fprintf(stderr, "Got NULL connection, exiting.\n");
- break;
- }
-
- while (true) {
- // Read header from client
- // fprintf(stderr, "Reading 4 byte header from client.\n");
- string in;
- if (client->read(in, 4) <= 0) {
- // fprintf(stderr, "Size header negative. Exception!\n");
- break;
+ try {
+ client = serverTransport_->accept();
+ if (client != NULL) {
+ // Process for as long as we can keep the processor happy!
+ TBufferedTransport bufferedClient(client);
+ while (processor_->process(&bufferedClient)) {}
}
-
- // Read payload from client
- int32_t size = *(int32_t*)(in.data());
- // fprintf(stderr, "Reading %d byte payload from client.\n", size);
- if (client->read(in, size) < size) {
- // fprintf(stderr, "Didn't get enough data!!!\n");
- break;
+ } catch (TTransportException& ttx) {
+ if (client != NULL) {
+ cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
}
-
- // Pass payload to dispatcher
- // TODO(mcslee): Wrap this in try/catch and return exceptions
- string out = dispatcher_->dispatch(in);
-
- size = out.size();
-
- // Write size of response packet
- client->write(string((char*)&size, 4));
-
- // Write response payload
- client->write(out);
}
- // Clean up that client
- // fprintf(stderr, "Closing and cleaning up client\n");
- client->close();
- delete client;
+ // Clean up the client
+ if (client != NULL) {
+
+ // Ensure no resource leaks
+ client->close();
+
+ // Ensure no memory leaks
+ delete client;
+
+ // Ensure we don't try to double-free on the next pass
+ client = NULL;
+ }
}
- // TODO(mcslee): Is this a timeout case or the real thing?
+ // TODO(mcslee): Could this be a timeout case? Or always the real thing?
}
diff --git a/lib/cpp/server/TSimpleServer.h b/lib/cpp/server/TSimpleServer.h
index 47ab69e..9e0f79f 100644
--- a/lib/cpp/server/TSimpleServer.h
+++ b/lib/cpp/server/TSimpleServer.h
@@ -14,10 +14,10 @@
*/
class TSimpleServer : public TServer {
public:
- TSimpleServer(TDispatcher* dispatcher,
+ TSimpleServer(TProcessor* processor,
TServerOptions* options,
TServerTransport* serverTransport) :
- TServer(dispatcher, options), serverTransport_(serverTransport) {}
+ TServer(processor, options), serverTransport_(serverTransport) {}
~TSimpleServer() {}
diff --git a/lib/cpp/transport/TBufferedTransport.cc b/lib/cpp/transport/TBufferedTransport.cc
new file mode 100644
index 0000000..3fccc58
--- /dev/null
+++ b/lib/cpp/transport/TBufferedTransport.cc
@@ -0,0 +1,60 @@
+#include "TBufferedTransport.h"
+using std::string;
+
+uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
+ uint32_t need = len;
+
+ // We don't have enough data yet
+ if (rLen_-rPos_ < need) {
+ // Copy out whatever we have
+ if (rLen_ > 0) {
+ memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+ need -= rLen_-rPos_;
+ buf += rLen_-rPos_;
+ }
+ // Get more from underlying transport up to buffer size
+ rLen_ = transport_->read(rBuf_, rBufSize_);
+ rPos_ = 0;
+ }
+
+ // Hand over whatever we have
+ uint32_t give = need;
+ if (rLen_-rPos_ < give) {
+ give = rLen_-rPos_;
+ }
+ memcpy(buf, rBuf_+rPos_, give);
+ rPos_ += give;
+ need -= give;
+ return (len - need);
+}
+
+void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
+ if (len == 0) {
+ return;
+ }
+
+ if (len + wLen_ >= wBufSize_) {
+ uint32_t copy = wBufSize_ - wLen_;
+ memcpy(wBuf_ + wLen_, buf, copy);
+ transport_->write(wBuf_, wBufSize_);
+
+ wLen_ = len - copy;
+ if (wLen_ > 0) {
+ memcpy(wBuf_, buf+copy, wLen_);
+ }
+ } else {
+ memcpy(wBuf_+wLen_, buf, len);
+ wLen_ += len;
+ }
+}
+
+void TBufferedTransport::flush() {
+ // Write out any data waiting in the write buffer
+ if (wLen_ > 0) {
+ transport_->write(wBuf_, wLen_);
+ wLen_ = 0;
+ }
+
+ // Flush the underlying transport
+ transport_->flush();
+}
diff --git a/lib/cpp/transport/TBufferedTransport.h b/lib/cpp/transport/TBufferedTransport.h
new file mode 100644
index 0000000..991b50c
--- /dev/null
+++ b/lib/cpp/transport/TBufferedTransport.h
@@ -0,0 +1,109 @@
+#ifndef T_BUFFERED_TRANSPORT_H
+#define T_BUFFERED_TRANSPORT_H
+
+#include "transport/TTransport.h"
+#include <stdint.h>
+#include <string>
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ * The wrapped transport is borrowed, it is never deleted by this class. The
+ * read and write buffers are owned by the object, so copying is disabled.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransport : public TTransport {
+ public:
+  /**
+   * Wraps a transport using 512 byte read and write buffers.
+   *
+   * @param transport The underlying transport to read from and write to
+   */
+  TBufferedTransport(TTransport* transport) :
+    transport_(transport),
+    rBufSize_(512), rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  /**
+   * Wraps a transport using read and write buffers of the same size.
+   *
+   * @param transport The underlying transport to read from and write to
+   * @param sz        Size in bytes of each of the two buffers
+   */
+  TBufferedTransport(TTransport* transport, uint32_t sz) :
+    transport_(transport),
+    rBufSize_(sz), rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  /**
+   * Wraps a transport using separately sized read and write buffers.
+   *
+   * @param transport The underlying transport to read from and write to
+   * @param rsz       Size in bytes of the read buffer
+   * @param wsz       Size in bytes of the write buffer
+   */
+  TBufferedTransport(TTransport* transport, uint32_t rsz, uint32_t wsz) :
+    transport_(transport),
+    rBufSize_(rsz), rPos_(0), rLen_(0),
+    wBufSize_(wsz), wLen_(0) {
+    rBuf_ = new uint8_t[rBufSize_];
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  /**
+   * Releases the two buffers. The underlying transport is left untouched.
+   */
+  ~TBufferedTransport() {
+    delete [] rBuf_;
+    delete [] wBuf_;
+  }
+
+  /** Reports the state of the underlying transport. */
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+
+  /** Opens the underlying transport. */
+  void open() {
+    transport_->open();
+  }
+
+  /** Closes the underlying transport; buffered writes are not flushed. */
+  void close() {
+    transport_->close();
+  }
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ protected:
+  TTransport* transport_;
+  uint8_t* rBuf_;
+  uint32_t rBufSize_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+
+ private:
+  // Copying would leave two objects deleting the same buffers, so the copy
+  // operations are declared here and deliberately never defined
+  TBufferedTransport(const TBufferedTransport&);
+  TBufferedTransport& operator=(const TBufferedTransport&);
+};
+
+#endif
diff --git a/lib/cpp/transport/TNullTransport.h b/lib/cpp/transport/TNullTransport.h
new file mode 100644
index 0000000..9562d9f
--- /dev/null
+++ b/lib/cpp/transport/TNullTransport.h
@@ -0,0 +1,40 @@
+#ifndef T_NULL_TRANSPORT
+#define T_NULL_TRANSPORT
+
+#include "transport/TTransport.h"
+#include <stdint.h>
+#include <string>
+
+/**
+ * The null transport is a dummy transport that doesn't actually do anything.
+ * It's sort of an analogy to /dev/null, you can never read anything from it
+ * and it will let you write anything you want to it, though it won't actually
+ * go anywhere.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TNullTransport : public TTransport {
+ public:
+  TNullTransport() {}
+  ~TNullTransport() {}
+
+  /** The null transport always reports itself as open. */
+  bool isOpen() { return true; }
+
+  /** Nothing to connect to, so opening is a no-op. */
+  void open() { }
+
+  /** Nothing to tear down, so closing is a no-op. */
+  void close() { }
+
+  /**
+   * Discards the data. This is the overload that overrides
+   * TTransport::write, so it is the one reached through a TTransport*.
+   */
+  void write(const uint8_t* /* buf */, uint32_t /* len */) { }
+
+  /** Discards the string; kept for callers of the older string interface. */
+  void write(const std::string& /* s */) { }
+};
+
+#endif
diff --git a/lib/cpp/transport/TServerSocket.cc b/lib/cpp/transport/TServerSocket.cc
index 178de81..1cf4a32 100644
--- a/lib/cpp/transport/TServerSocket.cc
+++ b/lib/cpp/transport/TServerSocket.cc
@@ -1,5 +1,6 @@
#include <sys/socket.h>
#include <netinet/in.h>
+#include <errno.h>
#include "transport/TSocket.h"
#include "transport/TServerSocket.h"
@@ -11,11 +12,12 @@
close();
}
-bool TServerSocket::listen() {
+void TServerSocket::listen() {
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket_ == -1) {
+ perror("TServerSocket::listen() socket");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "Could not create server socket.");
}
// Set reusaddress to prevent 2MSL delay on accept
@@ -24,16 +26,16 @@
&one, sizeof(one))) {
perror("TServerSocket::listen() SO_REUSEADDR");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR");
}
// Turn linger off, don't want to block on calls to close
struct linger ling = {0, 0};
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
&ling, sizeof(ling))) {
- perror("TServerSocket::listen() SO_LINGER");
close();
- return false;
+ perror("TServerSocket::listen() SO_LINGER");
+ throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER");
}
// Bind to a port
@@ -47,24 +49,22 @@
sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
perror(errbuf);
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "Could not bind");
}
// Call listen
if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
perror("TServerSocket::listen() LISTEN");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "Could not listen");
}
// The socket is now listening!
- return true;
}
-TTransport* TServerSocket::accept() {
+TTransport* TServerSocket::acceptImpl() {
if (serverSocket_ <= 0) {
- // TODO(mcslee): Log error with common logging tool
- return NULL;
+ throw TTransportException(TTX_NOT_OPEN, "TServerSocket not listening");
}
struct sockaddr_in clientAddress;
@@ -75,7 +75,7 @@
if (clientSocket <= 0) {
perror("TServerSocket::accept()");
- return NULL;
+ throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
}
return new TSocket(clientSocket);
diff --git a/lib/cpp/transport/TServerSocket.h b/lib/cpp/transport/TServerSocket.h
index 8ded4e2..ca30a03 100644
--- a/lib/cpp/transport/TServerSocket.h
+++ b/lib/cpp/transport/TServerSocket.h
@@ -16,11 +16,14 @@
TServerSocket(int port);
~TServerSocket();
- bool listen();
- TTransport* accept();
+ void listen();
void close();
+ protected:
+ TTransport* acceptImpl();
+
private:
+
int port_;
int serverSocket_;
int acceptBacklog_;
diff --git a/lib/cpp/transport/TServerTransport.h b/lib/cpp/transport/TServerTransport.h
index 4d063fc..9d71539 100644
--- a/lib/cpp/transport/TServerTransport.h
+++ b/lib/cpp/transport/TServerTransport.h
@@ -1,7 +1,8 @@
#ifndef T_SERVER_TRANSPORT_H
#define T_SERVER_TRANSPORT_H
-#include "TTransport.h"
+#include "transport/TTransport.h"
+#include "transport/TTransportException.h"
/**
* Server transport framework. A server needs to have some facility for
@@ -13,12 +14,48 @@
public:
virtual ~TServerTransport() {}
- virtual bool listen() = 0;
- virtual TTransport* accept() = 0;
+ /**
+ * Starts the server transport listening for new connections. Prior to this
+ * call most transports will not return anything when accept is called.
+ *
+ * @throws TTransportException if we were unable to listen
+ */
+ virtual void listen() {}
+
+  /**
+   * Hands the caller the next accepted connection as a freshly allocated
+   * transport. Ownership moves to the caller, who has to free the object
+   * when done with it. Implementations must return the transport in the
+   * opened state and must signal failure by throwing; a NULL coming back
+   * from acceptImpl() is converted into an exception here.
+   *
+   * @return A new TTransport object, never NULL
+   * @throws TTransportException if there is an error
+   */
+  TTransport* accept() {
+    TTransport* client = acceptImpl();
+    if (client != NULL) {
+      return client;
+    }
+    throw TTransportException("accept() may not return NULL");
+  }
+
+ /**
+ * Closes this transport such that future calls to accept will do nothing.
+ */
virtual void close() = 0;
protected:
TServerTransport() {}
+
+ /**
+ * Subclasses should implement this function for accept.
+ *
+ * @return A newly allocated TTransport object
+ * @throw TTransportException If an error occurs
+ */
+ virtual TTransport* acceptImpl() = 0;
+
};
#endif
diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc
index 1dfe431..3471755 100644
--- a/lib/cpp/transport/TSocket.cc
+++ b/lib/cpp/transport/TSocket.cc
@@ -7,9 +7,18 @@
#include <errno.h>
#include "transport/TSocket.h"
+#include "transport/TTransportException.h"
using namespace std;
+uint32_t g_socket_syscalls = 0;
+
+/**
+ * TSocket implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+
// Mutex to protect syscalls to netdb
pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -27,15 +36,20 @@
close();
}
-bool TSocket::open() {
+// Reports whether a socket handle is currently held. Values of 0 or below
+// mean "no socket": close() resets socket_ to 0.
+bool TSocket::isOpen() {
+ return (socket_ > 0);
+}
+
+void TSocket::open() {
// Create socket
socket_ = socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == -1) {
- socket_ = 0;
- return false;
+ perror("TSocket::open() socket");
+ close();
+ throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
}
- // Lookup the host
+ // Lookup the hostname
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port_);
@@ -54,7 +68,7 @@
if (host_entry == NULL) {
// perror("dns error: failed call to gethostbyname.\n");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
}
addr.sin_port = htons(port_);
@@ -70,10 +84,10 @@
if (ret < 0) {
perror("TSocket::open() connect");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
}
- return true;
+ // Connection was successful
}
void TSocket::close() {
@@ -84,97 +98,127 @@
socket_ = 0;
}
-int TSocket::read(string& s, uint32_t len) {
-  char buff[len];
-  s = "";
+/**
+ * Receives up to len bytes from the socket into buf with a single recv()
+ * call, retrying only when the call fails with EAGAIN or EINTR.
+ *
+ * @param buf Destination for the received bytes
+ * @param len Maximum number of bytes to receive
+ * @return Number of bytes received, or 0 once the remote host has closed
+ *         the connection (the socket is closed locally in that case)
+ * @throws TTransportException if the socket is not open or recv() fails
+ */
+uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
+  }
-  uint32_t have = 0;
   uint32_t retries = 0;
-
-  while (have < len) {
-  try_again:
-    // Read from the socket
-    int got = recv(socket_, buff+have, len-have, 0);
-
-    // Check for error on read
-    if (got < 0) {
-      perror("TSocket::read()");
-
-      // If temporarily out of resources, sleep a bit and try again
-      if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
-        usleep(50);
-        goto try_again;
-      }
-
-      // If interrupted, try again
-      if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
-        goto try_again;
-      }
-
-      // If we disconnect with no linger time
-      if (errno == ECONNRESET) {
-        return 0;
-      }
-
-      return 0;
+
+ try_again:
+  // Read from the socket
+  int got = recv(socket_, buf, len, 0);
+  ++g_socket_syscalls;
+
+  // Check for error on read
+  if (got < 0) {
+    // Keep the error from recv(); perror() and later calls may change errno
+    int errno_copy = errno;
+    perror("TSocket::read()");
+
+    // If temporarily out of resources, sleep a bit and try again
+    if (errno_copy == EAGAIN && retries++ < MAX_RECV_RETRIES) {
+      usleep(50);
+      goto try_again;
     }
-    // Check for empty read
-    if (got == 0) {
-      return 0;
+    // If interrupted, try again
+    if (errno_copy == EINTR && retries++ < MAX_RECV_RETRIES) {
+      goto try_again;
     }
-    // Update the count
-    have += (uint32_t) got;
+    // If we disconnect with no linger time
+    if (errno_copy == ECONNRESET) {
+      throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+    }
+
+    // This ish isn't open
+    if (errno_copy == ENOTCONN) {
+      throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+    }
+
+    // Timed out!
+    if (errno_copy == ETIMEDOUT) {
+      throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
+    }
+
+    // Some other error, whatevz. The error number is formatted into the
+    // message; "ERROR:" + errno would only offset the literal's pointer.
+    char errmsg[32];
+    snprintf(errmsg, sizeof(errmsg), "ERROR:%d", errno_copy);
+    throw TTransportException(TTX_UNKNOWN, errmsg);
+  }
+
+  // The remote host has closed the socket
+  if (got == 0) {
+    close();
+    return 0;
   }
-  // Pack data into string
-  s = string(buff, have);
-  return have;
+  // Hand back how many bytes recv() delivered
+  return got;
 }
-void TSocket::write(const string& s) {
+/**
+ * Sends all len bytes of buf, calling send() repeatedly until every byte
+ * has been accepted.
+ *
+ * @param buf Bytes to send
+ * @param len Number of bytes in buf
+ * @throws TTransportException if the socket is not open, send() fails, or
+ *         send() reports that nothing was sent
+ */
+void TSocket::write(const uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
+  }
+
   uint32_t sent = 0;
-  while (sent < s.size()) {
-    int b = send(socket_, s.data() + sent, s.size() - sent, 0);
-
+  while (sent < len) {
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    int b = send(socket_, buf + sent, len - sent, MSG_NOSIGNAL);
+    ++g_socket_syscalls;
+
     // Fail on a send error
     if (b < 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw an exception
-      // throw_perror("send");
-      return;
+      // Keep the error from send(); close() and perror() may change errno
+      int errno_copy = errno;
+
+      if (errno_copy == EPIPE) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "EPIPE");
+      }
+
+      if (errno_copy == ECONNRESET) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+      }
+
+      if (errno_copy == ENOTCONN) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+      }
+
+      perror("TSocket::write() send < 0");
+      // "ERROR:" + errno would only offset the literal's pointer, so the
+      // error number is formatted into the message explicitly
+      char errmsg[32];
+      snprintf(errmsg, sizeof(errmsg), "ERROR:%d", errno_copy);
+      throw TTransportException(TTX_UNKNOWN, errmsg);
     }
     // Fail on blocked send
     if (b == 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw string("couldn't send data.\n");
-      return;
+      throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
     }
-
     sent += b;
   }
 }
-bool TSocket::setLinger(bool on, int linger) {
+/**
+ * Sets SO_LINGER on the socket. Does nothing when no socket is open; if the
+ * option cannot be set the error is logged and the socket is closed.
+ *
+ * @param on     Whether SO_LINGER is on
+ * @param linger If linger is active, the number of seconds to linger for
+ */
+void TSocket::setLinger(bool on, int linger) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   struct linger ling = {(on ? 1 : 0), linger};
   if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
+    // Log before closing, close() may overwrite the errno perror() reports
+    perror("TSocket::setLinger()");
     close();
-    perror("TSocket::setLinger()");
-    return false;
   }
-  return true;
 }
-bool TSocket::setNoDelay(bool noDelay) {
+/**
+ * Sets TCP_NODELAY on the socket. Does nothing when no socket is open; if
+ * the option cannot be set the error is logged and the socket is closed.
+ *
+ * @param noDelay Whether or not to disable Nagle's algorithm
+ */
+void TSocket::setNoDelay(bool noDelay) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   // Set socket to NODELAY
   int val = (noDelay ? 1 : 0);
   if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
+    // Log before closing, close() may overwrite the errno perror() reports
+    perror("TSocket::setNoDelay()");
     close();
-    perror("TSocket::setNoDelay()");
-    return false;
   }
-  return true;
 }
diff --git a/lib/cpp/transport/TSocket.h b/lib/cpp/transport/TSocket.h
index 1da74c6..18abfa7 100644
--- a/lib/cpp/transport/TSocket.h
+++ b/lib/cpp/transport/TSocket.h
@@ -6,33 +6,94 @@
#include "transport/TTransport.h"
#include "transport/TServerSocket.h"
-class TSocketOptions;
-
/**
* TCP Socket implementation of the TTransport interface.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TSocket : public TTransport {
- friend TTransport* TServerSocket::accept();
+ /**
+ * We allow the TServerSocket acceptImpl() method to access the private
+ * members of a socket so that it can access the TSocket(int socket)
+ * constructor which creates a socket object from the raw UNIX socket
+ * handle.
+ */
+ friend class TServerSocket;
public:
+ /**
+ * Constructs a new socket. Note that this does NOT actually connect the
+ * socket.
+ *
+ * @param host An IP address or hostname to connect to
+ * @param port The port to connect on
+ */
TSocket(std::string host, int port);
+
+ /**
+ * Destroys the socket object, closing it if necessary.
+ */
~TSocket();
- bool open();
- void close();
- int read (std::string &s, uint32_t size);
- void write(const std::string& s);
+ /**
+ * Whether the socket is alive.
+ *
+ * @return Is the socket alive?
+ */
+ bool isOpen();
- bool setLinger(bool on, int linger);
- bool setNoDelay(bool noDelay);
+ /**
+ * Creates and opens the UNIX socket.
+ *
+ * @throws TTransportException If the socket could not connect
+ */
+ void open();
+
+ /**
+ * Shuts down communications on the socket.
+ */
+ void close();
+
+ /**
+ * Reads from the underlying socket.
+ */
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ /**
+ * Writes to the underlying socket.
+ */
+ void write(const uint8_t* buf, uint32_t len);
+
+ /**
+ * Controls whether the linger option is set on the socket.
+ *
+ * @param on Whether SO_LINGER is on
+ * @param linger If linger is active, the number of seconds to linger for
+ */
+ void setLinger(bool on, int linger);
+
+ /**
+ * Whether to enable/disable Nagle's algorithm.
+ *
+ * @param noDelay Whether or not to disable the algorithm.
+ * @return
+ */
+ void setNoDelay(bool noDelay);
private:
+ /**
+ * Constructor to create socket from raw UNIX handle. Never called directly
+ * but used by the TServerSocket class.
+ */
TSocket(int socket);
- TSocketOptions *options_;
+
+ /** Host to connect to */
std::string host_;
+
+ /** Port number to connect on */
int port_;
+
+ /** Underlying UNIX socket handle */
int socket_;
};
diff --git a/lib/cpp/transport/TTransport.h b/lib/cpp/transport/TTransport.h
index a1f43d4..fcaece7 100644
--- a/lib/cpp/transport/TTransport.h
+++ b/lib/cpp/transport/TTransport.h
@@ -2,24 +2,96 @@
#define T_TRANSPORT_H
#include <string>
+#include "transport/TTransportException.h"
/**
- * Generic interface for a method of transporting data.
+ * Generic interface for a method of transporting data. A TTransport may be
+ * capable of either reading or writing, but not necessarily both.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TTransport {
public:
- virtual ~TTransport() {};
+ /**
+ * Virtual deconstructor.
+ */
+ virtual ~TTransport() {}
- virtual bool open() = 0;
- virtual void close() = 0;
+ /**
+ * Whether this transport is open.
+ */
+ virtual bool isOpen() { return false; }
- virtual int read (std::string& s, uint32_t size) = 0;
- virtual void write(const std::string& s) = 0;
+ /**
+ * Opens the transport for communications.
+ *
+ * @return bool Whether the transport was successfully opened
+ * @throws TTransportException if opening failed
+ */
+ virtual void open() {
+ throw TTransportException(TTX_NOT_OPEN, "Cannot open base TTransport.");
+ }
+
+ /**
+ * Closes the transport.
+ */
+ virtual void close() {
+ throw TTransportException(TTX_NOT_OPEN, "Cannot close base TTransport.");
+ }
+
+ /**
+ * Attempt to read up to the specified number of bytes into the string.
+ *
+ * @param s Reference to the location to append the read data
+ * @param len How many bytes to read
+ * @return How many bytes were actually read
+ * @throws TTransportException If an error occurs
+ */
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {
+ throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot read.");
+ }
+
+ /**
+ * Reads the given amount of data in its entirety no matter what.
+ *
+ * @param s Reference to location for read data
+ * @param len How many bytes to read
+ * @return How many bytes read, which must be equal to size
+ * @throws TTransportException If insufficient data was read
+ */
+ virtual uint32_t readAll(uint8_t* buf, uint32_t len) {
+ uint32_t have = 0;
+
+ while (have < len) {
+ have += read(buf+have, len-have);
+ }
+
+ return have;
+ }
+
+ /**
+ * Writes the string in its entirety to the buffer.
+ *
+ * @param s The string to write out
+ * @throws TTransportException if an error occurs
+ */
+ virtual void write(const uint8_t* buf, uint32_t len) {
+ throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot write.");
+ }
+
+ /**
+ * Flushes any pending data to be written. Typically used with buffered
+ * transport mechanisms.
+ *
+ * @throws TTransportException if an error occurs
+ */
+ virtual void flush() {}
protected:
- TTransport() {};
+ /**
+ * Simple constructor.
+ */
+ TTransport() {}
};
#endif
diff --git a/lib/cpp/transport/TTransportException.h b/lib/cpp/transport/TTransportException.h
new file mode 100644
index 0000000..044e16d
--- /dev/null
+++ b/lib/cpp/transport/TTransportException.h
@@ -0,0 +1,79 @@
+#ifndef T_TRANSPORT_EXCEPTION_H
+#define T_TRANSPORT_EXCEPTION_H
+
+#include <exception>
+#include <string>
+
+/**
+ * Error codes for the various types of exceptions.
+ */
+enum TTransportExceptionType {
+  TTX_UNKNOWN = 0,
+  TTX_NOT_OPEN = 1,
+  TTX_TIMED_OUT = 2
+};
+
+/**
+ * Class to encapsulate all the possible types of transport errors that may
+ * occur in various transport systems. This provides a sort of generic
+ * wrapper around the raw UNIX E_ error codes that lets a common code
+ * base of error handling be used for various types of transports, i.e.
+ * pipes etc.
+ *
+ * It derives from std::exception, so a generic catch (std::exception&)
+ * handler sees it and can report the message through what().
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TTransportException : public std::exception {
+ public:
+  /** Unknown error with an empty message. */
+  TTransportException() :
+    type_(TTX_UNKNOWN), message_() {}
+
+  /** Error of the given type with an empty message. */
+  TTransportException(TTransportExceptionType type) :
+    type_(type), message_() {}
+
+  /** Unknown error carrying the given message. */
+  TTransportException(const std::string& message) :
+    type_(TTX_UNKNOWN), message_(message) {}
+
+  /** Error of the given type carrying the given message. */
+  TTransportException(TTransportExceptionType type,
+                      const std::string& message) :
+    type_(type), message_(message) {}
+
+  virtual ~TTransportException() throw() {}
+
+  /**
+   * Returns an error code that provides information about the type of error
+   * that has occurred.
+   *
+   * @return Error code
+   */
+  TTransportExceptionType getType() const { return type_; }
+
+  /**
+   * Returns an informative message about what caused this error.
+   *
+   * @return Error string
+   */
+  const std::string& getMessage() const { return message_; }
+
+  /**
+   * std::exception interface, same text as getMessage().
+   *
+   * @return Error string as a C string owned by this exception
+   */
+  virtual const char* what() const throw() { return message_.c_str(); }
+
+ protected:
+  /** Error code */
+  TTransportExceptionType type_;
+
+  /** Description */
+  std::string message_;
+};
+
+#endif