THRIFT-928. cpp: TNonblockingServer: use TSocket and support TClientInfo
Modify TNonblockingServer to use TSocket for I/O and support server
event handlers; this enables TClientInfo to function with a minor change
to the processing loop.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005145 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index ee76c3f..7a48505 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -357,6 +357,13 @@
socket_ = -1;
}
+void TSocket::setSocketFD(int socket) {
+ if (socket_ >= 0) {
+ close();
+ }
+ socket_ = socket;
+}
+
uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
if (socket_ < 0) {
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
@@ -379,7 +386,13 @@
try_again:
// Read from the socket
struct timeval begin;
- gettimeofday(&begin, NULL);
+ if (recvTimeout_ > 0) {
+ gettimeofday(&begin, NULL);
+ } else {
+ // if there is no read timeout we don't need the TOD to determine whether
+ // an EAGAIN is due to a timeout or an out-of-resource condition.
+ begin.tv_sec = begin.tv_usec = 0;
+ }
int got = recv(socket_, buf, len, 0);
int errno_copy = errno; //gettimeofday can change errno
++g_socket_syscalls;
@@ -387,6 +400,11 @@
// Check for error on read
if (got < 0) {
if (errno_copy == EAGAIN) {
+ // if no timeout we can assume that resource exhaustion has occurred.
+ if (recvTimeout_ == 0) {
+ throw TTransportException(TTransportException::TIMED_OUT,
+ "EAGAIN (unavailable resources)");
+ }
// check if this is the lack of resources or timeout case
struct timeval end;
gettimeofday(&end, NULL);
@@ -417,8 +435,8 @@
if (errno_copy == ECONNRESET) {
/* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
* ECONNRESET if peer performed shutdown
+ * edhall: eliminated close() since we do that in the destructor.
*/
- close();
return 0;
}
#endif
@@ -447,7 +465,8 @@
// The remote host has closed the socket
if (got == 0) {
- close();
+ // edhall: we used to call close() here, but our caller may want to deal
+ // with the socket fd and we'll close() in our destructor in any case.
return 0;
}
@@ -456,43 +475,57 @@
}
void TSocket::write(const uint8_t* buf, uint32_t len) {
+ uint32_t sent = 0;
+
+ while (sent < len) {
+ uint32_t b = write_partial(buf + sent, len - sent);
+ if (b == 0) {
+ // We assume that we got 0 because send() errored with EAGAIN due to
+ // lack of system resources; release the CPU for a bit.
+ usleep(50);
+ }
+ sent += b;
+ }
+}
+
+uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
if (socket_ < 0) {
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
}
uint32_t sent = 0;
- while (sent < len) {
+ int flags = 0;
+#ifdef MSG_NOSIGNAL
+ // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+ // check for the EPIPE return condition and close the socket in that case
+ flags |= MSG_NOSIGNAL;
+#endif // ifdef MSG_NOSIGNAL
- int flags = 0;
- #ifdef MSG_NOSIGNAL
- // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
- // check for the EPIPE return condition and close the socket in that case
- flags |= MSG_NOSIGNAL;
- #endif // ifdef MSG_NOSIGNAL
+ int b = send(socket_, buf + sent, len - sent, flags);
+ ++g_socket_syscalls;
- int b = send(socket_, buf + sent, len - sent, flags);
- ++g_socket_syscalls;
-
+ if (b < 0) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ return 0;
+ }
// Fail on a send error
- if (b < 0) {
- int errno_copy = errno;
- GlobalOutput.perror("TSocket::write() send() " + getSocketInfo(), errno_copy);
+ int errno_copy = errno;
+ GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
- if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) {
- close();
- throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
- }
-
- throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
+ if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) {
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
}
- // Fail on blocked send
- if (b == 0) {
- throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
- }
- sent += b;
+ throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
}
+
+ // Fail on blocked send
+ if (b == 0) {
+ throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
+ }
+ return b;
}
std::string TSocket::getHost() {
@@ -598,7 +631,12 @@
string TSocket::getSocketInfo() {
std::ostringstream oss;
- oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ if (host_.empty() || port_ == 0) {
+ oss << "<Host: " << getPeerAddress();
+ oss << " Port: " << getPeerPort() << ">";
+ } else {
+ oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ }
return oss.str();
}
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index 97562c2..e89059f 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -95,11 +95,16 @@
uint32_t read(uint8_t* buf, uint32_t len);
/**
- * Writes to the underlying socket.
+ * Writes to the underlying socket. Loops until done or fail.
*/
void write(const uint8_t* buf, uint32_t len);
/**
+ * Writes to the underlying socket. Does single send() and returns result.
+ */
+ uint32_t write_partial(const uint8_t* buf, uint32_t len);
+
+ /**
* Get the host that the socket is connected to
*
* @return string host identifier
@@ -191,6 +196,15 @@
return socket_;
}
+ /**
+ * (Re-)initialize a TSocket for the supplied descriptor. This is only
+ * intended for use by TNonblockingServer -- other use may result in
+ * unfortunate surprises.
+ *
+ * @param fd the descriptor for an already-connected socket
+ */
+ void setSocketFD(int fd);
+
/*
* Returns a cached copy of the peer address.
*/
@@ -211,16 +225,16 @@
*/
TSocket(int socket);
- protected:
- /** connect, called by open */
- void openConnection(struct addrinfo *res);
-
/**
* Set a cache of the peer address (used when trivially available: e.g.
* accept() or connect()). Only caches IPV4 and IPV6; unset for others.
*/
void setCachedAddress(const sockaddr* addr, socklen_t len);
+ protected:
+ /** connect, called by open */
+ void openConnection(struct addrinfo *res);
+
/** Host to connect to */
std::string host_;