Update Thrift CPP libraries to work with new generated source, change underlying buffers to use uint8_t* instead of std::string
Summary: Major overhaul of the C++ libraries.
Reviewed By: aditya
Test Plan: Again, the unit tests will arrive in a separate commit; keep an eye out for it.
Notes: Initial perf tests show that Thrift is not only more robust than Pillar, but that its implementation is also around 10-20% faster. We can do about 10 RPC function calls with small data payloads in under 2ms. THAT IS FAST. THAT IS THRIFTY.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664714 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc
index 1dfe431..3471755 100644
--- a/lib/cpp/transport/TSocket.cc
+++ b/lib/cpp/transport/TSocket.cc
@@ -7,9 +7,18 @@
#include <errno.h>
#include "transport/TSocket.h"
+#include "transport/TTransportException.h"
using namespace std;
+uint32_t g_socket_syscalls = 0;
+
+/**
+ * TSocket implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+
// Mutex to protect syscalls to netdb
pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -27,15 +36,20 @@
close();
}
-bool TSocket::open() {
+bool TSocket::isOpen() {
+ return (socket_ > 0);
+}
+
+void TSocket::open() {
// Create socket
socket_ = socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == -1) {
- socket_ = 0;
- return false;
+ perror("TSocket::open() socket");
+ close();
+ throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
}
- // Lookup the host
+ // Lookup the hostname
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port_);
@@ -54,7 +68,7 @@
if (host_entry == NULL) {
// perror("dns error: failed call to gethostbyname.\n");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
}
addr.sin_port = htons(port_);
@@ -70,10 +84,10 @@
if (ret < 0) {
perror("TSocket::open() connect");
close();
- return false;
+ throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
}
- return true;
+ // Connection was successful
}
void TSocket::close() {
@@ -84,97 +98,127 @@
socket_ = 0;
}
-int TSocket::read(string& s, uint32_t len) {
- char buff[len];
- s = "";
+uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
+ if (socket_ <= 0) {
+ throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
+ }
- uint32_t have = 0;
uint32_t retries = 0;
-
- while (have < len) {
- try_again:
- // Read from the socket
- int got = recv(socket_, buff+have, len-have, 0);
-
- // Check for error on read
- if (got < 0) {
- perror("TSocket::read()");
-
- // If temporarily out of resources, sleep a bit and try again
- if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
- usleep(50);
- goto try_again;
- }
-
- // If interrupted, try again
- if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
- goto try_again;
- }
-
- // If we disconnect with no linger time
- if (errno == ECONNRESET) {
- return 0;
- }
-
- return 0;
+
+ try_again:
+ // Read from the socket
+ int got = recv(socket_, buf, len, 0);
+ ++g_socket_syscalls;
+
+ // Check for error on read
+ if (got < 0) {
+ perror("TSocket::read()");
+
+ // If temporarily out of resources, sleep a bit and try again
+ if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
+ usleep(50);
+ goto try_again;
}
- // Check for empty read
- if (got == 0) {
- return 0;
+ // If interrupted, try again
+ if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+ goto try_again;
}
- // Update the count
- have += (uint32_t) got;
+ // If we disconnect with no linger time
+ if (errno == ECONNRESET) {
+ throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+ }
+
+ // This ish isn't open
+ if (errno == ENOTCONN) {
+ throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+ }
+
+ // Timed out!
+ if (errno == ETIMEDOUT) {
+ throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
+ }
+
+ // Some other error, whatevz
+ throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
+ }
+
+ // The remote host has closed the socket
+ if (got == 0) {
+ close();
+ return 0;
}
// Pack data into string
- s = string(buff, have);
- return have;
+ return got;
}
-void TSocket::write(const string& s) {
+void TSocket::write(const uint8_t* buf, uint32_t len) {
+ if (socket_ <= 0) {
+ throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
+ }
+
uint32_t sent = 0;
- while (sent < s.size()) {
- int b = send(socket_, s.data() + sent, s.size() - sent, 0);
-
+ while (sent < len) {
+ // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+ // check for the EPIPE return condition and close the socket in that case
+ int b = send(socket_, buf + sent, len - sent, MSG_NOSIGNAL);
+ ++g_socket_syscalls;
+
// Fail on a send error
if (b < 0) {
- // TODO(mcslee): Make the function return how many bytes it wrote or
- // throw an exception
- // throw_perror("send");
- return;
+ if (errno == EPIPE) {
+ close();
+ throw TTransportException(TTX_NOT_OPEN, "EPIPE");
+ }
+
+ if (errno == ECONNRESET) {
+ close();
+ throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+ }
+
+ if (errno == ENOTCONN) {
+ close();
+ throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+ }
+
+ perror("TSocket::write() send < 0");
+ throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
}
// Fail on blocked send
if (b == 0) {
- // TODO(mcslee): Make the function return how many bytes it wrote or
- // throw string("couldn't send data.\n");
- return;
+ throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
}
-
sent += b;
}
}
-bool TSocket::setLinger(bool on, int linger) {
+void TSocket::setLinger(bool on, int linger) {
+ // TODO(mcslee): Store these options so they can be set pre-connect
+ if (socket_ <= 0) {
+ return;
+ }
+
struct linger ling = {(on ? 1 : 0), linger};
if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
close();
perror("TSocket::setLinger()");
- return false;
}
- return true;
}
-bool TSocket::setNoDelay(bool noDelay) {
+void TSocket::setNoDelay(bool noDelay) {
+ // TODO(mcslee): Store these options so they can be set pre-connect
+ if (socket_ <= 0) {
+ return;
+ }
+
// Set socket to NODELAY
int val = (noDelay ? 1 : 0);
if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
close();
perror("TSocket::setNoDelay()");
- return false;
}
- return true;
}