THRIFT-900. cpp: Unix domain socket
This patch adds a new Unix Socket transport.
Patch: Roger Meier
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1002179 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 2f14fd5..836f6ba 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -20,6 +20,7 @@
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/un.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <netinet/in.h>
@@ -68,6 +69,20 @@
intSock1_(-1),
intSock2_(-1) {}
+TServerSocket::TServerSocket(string path) :
+ port_(0),
+ path_(path),
+ serverSocket_(-1),
+ acceptBacklog_(1024),
+ sendTimeout_(0),
+ recvTimeout_(0),
+ retryLimit_(0),
+ retryDelay_(0),
+ tcpSendBuffer_(0),
+ tcpRecvBuffer_(0),
+ intSock1_(-1),
+ intSock2_(-1) {}
+
TServerSocket::~TServerSocket() {
close();
}
@@ -131,7 +146,12 @@
break;
}
- serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (! path_.empty()) {
+ serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+ } else {
+ serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ }
+
if (serverSocket_ == -1) {
int errno_copy = errno;
GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
@@ -201,13 +221,16 @@
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
}
- // TCP Nodelay, speed over bandwidth
- if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
- &one, sizeof(one))) {
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
- close();
- throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+ // Unix Sockets do not need that
+ if (path_.empty()) {
+ // TCP Nodelay, speed over bandwidth
+ if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+ &one, sizeof(one))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+ }
}
// Set NONBLOCK on the accept socket
@@ -228,21 +251,49 @@
// we may want to try to bind more than once, since SO_REUSEADDR doesn't
// always seem to work. The client can configure the retry variables.
int retries = 0;
- do {
- if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
- break;
+
+ if (! path_.empty()) {
+ // Unix Domain Socket
+ struct sockaddr_un address;
+ socklen_t len;
+
+ if (path_.length() > sizeof(address.sun_path)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
}
- // use short circuit evaluation here to only sleep if we need to
- } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+ address.sun_family = AF_UNIX;
+ sprintf(address.sun_path, path_.c_str());
+ len = sizeof(address);
- // free addrinfo
- freeaddrinfo(res0);
+ do {
+ if (0 == bind(serverSocket_, (struct sockaddr *) &address, len)) {
+ break;
+ }
+ // use short circuit evaluation here to only sleep if we need to
+ } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+ } else {
+ do {
+ if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
+ break;
+ }
+ // use short circuit evaluation here to only sleep if we need to
+ } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+
+ // free addrinfo
+ freeaddrinfo(res0);
+ }
// throw an error if we failed to bind properly
if (retries > retryLimit_) {
char errbuf[1024];
- sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+ if (! path_.empty()) {
+ sprintf(errbuf, "TServerSocket::listen() PATH %s", path_.c_str());
+ }
+ else {
+ sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+ }
GlobalOutput(errbuf);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
index a6be017..8cd521f 100644
--- a/lib/cpp/src/transport/TServerSocket.h
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -36,6 +36,7 @@
public:
TServerSocket(int port);
TServerSocket(int port, int sendTimeout, int recvTimeout);
+ TServerSocket(std::string path);
~TServerSocket();
@@ -58,6 +59,7 @@
private:
int port_;
+ std::string path_;
int serverSocket_;
int acceptBacklog_;
int sendTimeout_;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 5da33bb..951ddcf 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -21,6 +21,7 @@
#include <cstring>
#include <sstream>
#include <sys/socket.h>
+#include <sys/un.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <arpa/inet.h>
@@ -50,6 +51,23 @@
TSocket::TSocket(string host, int port) :
host_(host),
port_(port),
+ path_(""),
+ socket_(-1),
+ connTimeout_(0),
+ sendTimeout_(0),
+ recvTimeout_(0),
+ lingerOn_(1),
+ lingerVal_(0),
+ noDelay_(1),
+ maxRecvRetries_(5) {
+ recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+ recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+}
+
+TSocket::TSocket(string path) :
+ host_(""),
+ port_(0),
+ path_(path),
socket_(-1),
connTimeout_(0),
sendTimeout_(0),
@@ -65,6 +83,7 @@
TSocket::TSocket() :
host_(""),
port_(0),
+ path_(""),
socket_(-1),
connTimeout_(0),
sendTimeout_(0),
@@ -80,6 +99,7 @@
TSocket::TSocket(int socket) :
host_(""),
port_(0),
+ path_(""),
socket_(socket),
connTimeout_(0),
sendTimeout_(0),
@@ -130,7 +150,12 @@
return;
}
- socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (! path_.empty()) {
+ socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+ } else {
+ socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ }
+
if (socket_ == -1) {
int errno_copy = errno;
GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
@@ -179,7 +204,24 @@
}
// Connect the socket
- int ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+ int ret;
+ if (! path_.empty()) {
+ struct sockaddr_un address;
+ socklen_t len;
+
+ if (path_.length() > sizeof(address.sun_path)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
+ }
+
+ address.sun_family = AF_UNIX;
+ sprintf(address.sun_path, path_.c_str());
+ len = sizeof(address);
+ ret = connect(socket_, (struct sockaddr *) &address, len);
+ } else {
+ ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+ }
// success case
if (ret == 0) {
@@ -237,6 +279,24 @@
if (isOpen()) {
return;
}
+ if (! path_.empty()) {
+ unix_open();
+ } else {
+ local_open();
+ }
+}
+
+void TSocket::unix_open(){
+ if (! path_.empty()) {
+ // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
+ openConnection(NULL);
+ }
+}
+
+void TSocket::local_open(){
+ if (isOpen()) {
+ return;
+ }
// Validate port number
if (port_ < 0 || port_ > 0xFFFF) {
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index 0184362..f69a9a1 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -60,6 +60,14 @@
TSocket(std::string host, int port);
/**
+ * Constructs a new Unix domain socket.
+ * Note that this does NOT actually connect the socket.
+ *
+ * @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift"
+ */
+ TSocket(std::string path);
+
+ /**
* Destroyes the socket object, closing it if necessary.
*/
virtual ~TSocket();
@@ -217,6 +225,9 @@
/** Port number to connect on */
int port_;
+ /** UNIX domain socket path */
+ std::string path_;
+
/** Underlying UNIX socket handle */
int socket_;
@@ -246,6 +257,10 @@
/** Whether to use low minimum TCP retransmission timeout */
static bool useLowMinRto_;
+
+ private:
+ void unix_open();
+ void local_open();
};
}}} // apache::thrift::transport