| // Copyright (c) 2006- Facebook |
| // Distributed under the Thrift Software License |
| // |
| // See accompanying file LICENSE or visit the Thrift site at: |
| // http://developers.facebook.com/thrift/ |
| |
| #include <config.h> |
| #include <sys/socket.h> |
| #include <sys/poll.h> |
| #include <sys/types.h> |
| #include <arpa/inet.h> |
| #include <netinet/in.h> |
| #include <netinet/tcp.h> |
| #include <netdb.h> |
| #include <unistd.h> |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <sstream> |
| |
| #include "concurrency/Monitor.h" |
| #include "TSocket.h" |
| #include "TTransportException.h" |
| |
| namespace facebook { namespace thrift { namespace transport { |
| |
| using namespace std; |
| |
| // Global var to track total socket sys calls |
| uint32_t g_socket_syscalls = 0; |
| |
| /** |
| * TSocket implementation. |
| * |
| * @author Mark Slee <mcslee@facebook.com> |
| */ |
| |
| TSocket::TSocket(string host, int port) : |
| host_(host), |
| port_(port), |
| socket_(-1), |
| connTimeout_(0), |
| sendTimeout_(0), |
| recvTimeout_(0), |
| lingerOn_(1), |
| lingerVal_(0), |
| noDelay_(1), |
| maxRecvRetries_(5) { |
| recvTimeval_.tv_sec = (int)(recvTimeout_/1000); |
| recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); |
| } |
| |
| TSocket::TSocket() : |
| host_(""), |
| port_(0), |
| socket_(-1), |
| connTimeout_(0), |
| sendTimeout_(0), |
| recvTimeout_(0), |
| lingerOn_(1), |
| lingerVal_(0), |
| noDelay_(1), |
| maxRecvRetries_(5) { |
| recvTimeval_.tv_sec = (int)(recvTimeout_/1000); |
| recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); |
| } |
| |
| TSocket::TSocket(int socket) : |
| host_(""), |
| port_(0), |
| socket_(socket), |
| connTimeout_(0), |
| sendTimeout_(0), |
| recvTimeout_(0), |
| lingerOn_(1), |
| lingerVal_(0), |
| noDelay_(1), |
| maxRecvRetries_(5) { |
| recvTimeval_.tv_sec = (int)(recvTimeout_/1000); |
| recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); |
| } |
| |
| TSocket::~TSocket() { |
| close(); |
| } |
| |
| bool TSocket::isOpen() { |
| return (socket_ >= 0); |
| } |
| |
| bool TSocket::peek() { |
| if (!isOpen()) { |
| return false; |
| } |
| uint8_t buf; |
| int r = recv(socket_, &buf, 1, MSG_PEEK); |
| if (r == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::peek() recv() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy); |
| } |
| return (r > 0); |
| } |
| |
| void TSocket::openConnection(struct addrinfo *res) { |
| if (isOpen()) { |
| throw TTransportException(TTransportException::ALREADY_OPEN); |
| } |
| |
| socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); |
| if (socket_ == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::open() socket() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy); |
| } |
| |
| // Send timeout |
| if (sendTimeout_ > 0) { |
| setSendTimeout(sendTimeout_); |
| } |
| |
| // Recv timeout |
| if (recvTimeout_ > 0) { |
| setRecvTimeout(recvTimeout_); |
| } |
| |
| // Linger |
| setLinger(lingerOn_, lingerVal_); |
| |
| // No delay |
| setNoDelay(noDelay_); |
| |
| // Set the socket to be non blocking for connect if a timeout exists |
| int flags = fcntl(socket_, F_GETFL, 0); |
| if (connTimeout_ > 0) { |
| if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) { |
| int errno_copy = errno; |
| string errStr = "TSocket::open() fcntl() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy); |
| } |
| } else { |
| if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) { |
| int errno_copy = errno; |
| string errStr = "TSocket::open() fcntl " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy); |
| } |
| } |
| |
| // Connect the socket |
| int ret = connect(socket_, res->ai_addr, res->ai_addrlen); |
| |
| // success case |
| if (ret == 0) { |
| goto done; |
| } |
| |
| if (errno != EINPROGRESS) { |
| int errno_copy = errno; |
| string errStr = "TSocket::open() connect() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy); |
| } |
| |
| |
| struct pollfd fds[1]; |
| memset(fds, 0 , sizeof(fds)); |
| fds[0].fd = socket_; |
| fds[0].events = POLLOUT; |
| ret = poll(fds, 1, connTimeout_); |
| |
| if (ret > 0) { |
| // Ensure the socket is connected and that there are no errors set |
| int val; |
| socklen_t lon; |
| lon = sizeof(int); |
| int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon); |
| if (ret2 == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::open() getsockopt() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy); |
| } |
| // no errors on socket, go to town |
| if (val == 0) { |
| goto done; |
| } |
| string errStr = "TSocket::open() error on socket (after poll) " + getSocketInfo() + TOutput::strerror_s(val); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val); |
| } else if (ret == 0) { |
| // socket timed out |
| string errStr = "TSocket::open() timed out " + getSocketInfo(); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "open() timed out"); |
| } else { |
| // error on poll() |
| int errno_copy = errno; |
| string errStr = "TSocket::open() poll() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| throw TTransportException(TTransportException::NOT_OPEN, "poll() failed", errno_copy); |
| } |
| |
| done: |
| // Set socket back to normal mode (blocking) |
| fcntl(socket_, F_SETFL, flags); |
| } |
| |
| void TSocket::open() { |
| if (isOpen()) { |
| throw TTransportException(TTransportException::ALREADY_OPEN); |
| } |
| |
| // Validate port number |
| if (port_ < 0 || port_ > 65536) { |
| throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid"); |
| } |
| |
| struct addrinfo hints, *res, *res0; |
| res = NULL; |
| res0 = NULL; |
| int error; |
| char port[sizeof("65536")]; |
| memset(&hints, 0, sizeof(hints)); |
| hints.ai_family = PF_UNSPEC; |
| hints.ai_socktype = SOCK_STREAM; |
| hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; |
| sprintf(port, "%d", port_); |
| |
| error = getaddrinfo(host_.c_str(), port, &hints, &res0); |
| |
| if (error) { |
| string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(gai_strerror(error)); |
| GlobalOutput(errStr.c_str()); |
| close(); |
| throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket."); |
| } |
| |
| // Cycle through all the returned addresses until one |
| // connects or push the exception up. |
| for (res = res0; res; res = res->ai_next) { |
| try { |
| openConnection(res); |
| break; |
| } catch (TTransportException& ttx) { |
| if (res->ai_next) { |
| close(); |
| } else { |
| close(); |
| freeaddrinfo(res0); // cleanup on failure |
| throw; |
| } |
| } |
| } |
| |
| // Free address structure memory |
| freeaddrinfo(res0); |
| } |
| |
| void TSocket::close() { |
| if (socket_ >= 0) { |
| shutdown(socket_, SHUT_RDWR); |
| ::close(socket_); |
| } |
| socket_ = -1; |
| } |
| |
| uint32_t TSocket::read(uint8_t* buf, uint32_t len) { |
| if (socket_ < 0) { |
| throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket"); |
| } |
| |
| int32_t retries = 0; |
| |
| // EAGAIN can be signalled both when a timeout has occurred and when |
| // the system is out of resources (an awesome undocumented feature). |
| // The following is an approximation of the time interval under which |
| // EAGAIN is taken to indicate an out of resources error. |
| uint32_t eagainThresholdMicros = 0; |
| if (recvTimeout_) { |
| // if a readTimeout is specified along with a max number of recv retries, then |
| // the threshold will ensure that the read timeout is not exceeded even in the |
| // case of resource errors |
| eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2); |
| } |
| |
| try_again: |
| // Read from the socket |
| struct timeval begin; |
| gettimeofday(&begin, NULL); |
| int got = recv(socket_, buf, len, 0); |
| struct timeval end; |
| gettimeofday(&end, NULL); |
| uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000) |
| + (((uint64_t)(end.tv_usec - begin.tv_usec)))); |
| ++g_socket_syscalls; |
| |
| // Check for error on read |
| if (got < 0) { |
| if (errno == EAGAIN) { |
| // check if this is the lack of resources or timeout case |
| if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) { |
| if (retries++ < maxRecvRetries_) { |
| usleep(50); |
| goto try_again; |
| } else { |
| throw TTransportException(TTransportException::TIMED_OUT, |
| "EAGAIN (unavailable resources)"); |
| } |
| } else { |
| // infer that timeout has been hit |
| throw TTransportException(TTransportException::TIMED_OUT, |
| "EAGAIN (timed out)"); |
| } |
| } |
| |
| // If interrupted, try again |
| if (errno == EINTR && retries++ < maxRecvRetries_) { |
| goto try_again; |
| } |
| |
| // Now it's not a try again case, but a real probblez |
| string errStr = "TSocket::read() recv() " + getSocketInfo() + TOutput::strerror_s(errno); |
| GlobalOutput(errStr.c_str()); |
| |
| // If we disconnect with no linger time |
| if (errno == ECONNRESET) { |
| throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET"); |
| } |
| |
| // This ish isn't open |
| if (errno == ENOTCONN) { |
| throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN"); |
| } |
| |
| // Timed out! |
| if (errno == ETIMEDOUT) { |
| throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT"); |
| } |
| |
| // Some other error, whatevz |
| throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno); |
| } |
| |
| // The remote host has closed the socket |
| if (got == 0) { |
| close(); |
| return 0; |
| } |
| |
| // Pack data into string |
| return got; |
| } |
| |
| void TSocket::write(const uint8_t* buf, uint32_t len) { |
| if (socket_ < 0) { |
| throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket"); |
| } |
| |
| uint32_t sent = 0; |
| |
| while (sent < len) { |
| |
| int flags = 0; |
| #ifdef MSG_NOSIGNAL |
| // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we |
| // check for the EPIPE return condition and close the socket in that case |
| flags |= MSG_NOSIGNAL; |
| #endif // ifdef MSG_NOSIGNAL |
| |
| int b = send(socket_, buf + sent, len - sent, flags); |
| ++g_socket_syscalls; |
| |
| // Fail on a send error |
| if (b < 0) { |
| int errno_copy = errno; |
| string errStr = "TSocket::write() send() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| |
| if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) { |
| close(); |
| throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy); |
| } |
| |
| throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy); |
| } |
| |
| // Fail on blocked send |
| if (b == 0) { |
| throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0."); |
| } |
| sent += b; |
| } |
| } |
| |
| std::string TSocket::getHost() { |
| return host_; |
| } |
| |
| int TSocket::getPort() { |
| return port_; |
| } |
| |
| void TSocket::setHost(string host) { |
| host_ = host; |
| } |
| |
| void TSocket::setPort(int port) { |
| port_ = port; |
| } |
| |
| void TSocket::setLinger(bool on, int linger) { |
| lingerOn_ = on; |
| lingerVal_ = linger; |
| if (socket_ < 0) { |
| return; |
| } |
| |
| struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_}; |
| int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l)); |
| if (ret == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::setLinger() setsockopt() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| } |
| } |
| |
| void TSocket::setNoDelay(bool noDelay) { |
| noDelay_ = noDelay; |
| if (socket_ < 0) { |
| return; |
| } |
| |
| // Set socket to NODELAY |
| int v = noDelay_ ? 1 : 0; |
| int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)); |
| if (ret == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::setNoDelay() setsockopt() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| } |
| } |
| |
| void TSocket::setConnTimeout(int ms) { |
| connTimeout_ = ms; |
| } |
| |
| void TSocket::setRecvTimeout(int ms) { |
| if (ms < 0) { |
| char errBuf[512]; |
| sprintf(errBuf, "TSocket::setRecvTimeout with negative input: %d\n", ms); |
| GlobalOutput(errBuf); |
| return; |
| } |
| recvTimeout_ = ms; |
| |
| if (socket_ < 0) { |
| return; |
| } |
| |
| recvTimeval_.tv_sec = (int)(recvTimeout_/1000); |
| recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); |
| |
| // Copy because poll may modify |
| struct timeval r = recvTimeval_; |
| int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r)); |
| if (ret == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::setRecvTimeout() setsockopt() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| } |
| } |
| |
| void TSocket::setSendTimeout(int ms) { |
| if (ms < 0) { |
| char errBuf[512]; |
| sprintf(errBuf, "TSocket::setSendTimeout with negative input: %d\n", ms); |
| GlobalOutput(errBuf); |
| return; |
| } |
| sendTimeout_ = ms; |
| |
| if (socket_ < 0) { |
| return; |
| } |
| |
| struct timeval s = {(int)(sendTimeout_/1000), |
| (int)((sendTimeout_%1000)*1000)}; |
| int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s)); |
| if (ret == -1) { |
| int errno_copy = errno; |
| string errStr = "TSocket::setSendTimeout() setsockopt() " + getSocketInfo() + TOutput::strerror_s(errno_copy); |
| GlobalOutput(errStr.c_str()); |
| } |
| } |
| |
| void TSocket::setMaxRecvRetries(int maxRecvRetries) { |
| maxRecvRetries_ = maxRecvRetries; |
| } |
| |
| string TSocket::getSocketInfo() { |
| std::ostringstream oss; |
| oss << "<Host: " << host_ << " Port: " << port_ << ">"; |
| return oss.str(); |
| } |
| |
| std::string TSocket::getPeerHost() { |
| if (peerHost_.empty()) { |
| struct sockaddr_storage addr; |
| socklen_t addrLen = sizeof(addr); |
| |
| if (socket_ < 0) { |
| return host_; |
| } |
| |
| int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen); |
| |
| if (rv != 0) { |
| return peerHost_; |
| } |
| |
| char clienthost[NI_MAXHOST]; |
| char clientservice[NI_MAXSERV]; |
| |
| getnameinfo((sockaddr*) &addr, addrLen, |
| clienthost, sizeof(clienthost), |
| clientservice, sizeof(clientservice), 0); |
| |
| peerHost_ = clienthost; |
| } |
| return peerHost_; |
| } |
| |
| std::string TSocket::getPeerAddress() { |
| if (peerAddress_.empty()) { |
| struct sockaddr_storage addr; |
| socklen_t addrLen = sizeof(addr); |
| |
| if (socket_ < 0) { |
| return peerAddress_; |
| } |
| |
| int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen); |
| |
| if (rv != 0) { |
| return peerAddress_; |
| } |
| |
| char clienthost[NI_MAXHOST]; |
| char clientservice[NI_MAXSERV]; |
| |
| getnameinfo((sockaddr*) &addr, addrLen, |
| clienthost, sizeof(clienthost), |
| clientservice, sizeof(clientservice), |
| NI_NUMERICHOST|NI_NUMERICSERV); |
| |
| peerAddress_ = clienthost; |
| peerPort_ = std::atoi(clientservice); |
| } |
| return peerAddress_; |
| } |
| |
| int TSocket::getPeerPort() { |
| getPeerAddress(); |
| return peerPort_; |
| } |
| |
| }}} // facebook::thrift::transport |