THRIFT-4465: Fix C++ TNonblockingServer and THRIFT_EAGAIN issues
Client: cpp
This closes #1497
diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake
index 12a50df..e4793d4 100644
--- a/build/cmake/ConfigureChecks.cmake
+++ b/build/cmake/ConfigureChecks.cmake
@@ -37,11 +37,12 @@
check_include_file(stdint.h HAVE_STDINT_H)
check_include_file(unistd.h HAVE_UNISTD_H)
check_include_file(pthread.h HAVE_PTHREAD_H)
-check_include_file(sys/time.h HAVE_SYS_TIME_H)
+check_include_file(sys/ioctl.h HAVE_SYS_IOCTL_H)
check_include_file(sys/param.h HAVE_SYS_PARAM_H)
check_include_file(sys/resource.h HAVE_SYS_RESOURCE_H)
check_include_file(sys/socket.h HAVE_SYS_SOCKET_H)
check_include_file(sys/stat.h HAVE_SYS_STAT_H)
+check_include_file(sys/time.h HAVE_SYS_TIME_H)
check_include_file(sys/un.h HAVE_SYS_UN_H)
check_include_file(sys/poll.h HAVE_SYS_POLL_H)
check_include_file(sys/select.h HAVE_SYS_SELECT_H)
diff --git a/build/cmake/config.h.in b/build/cmake/config.h.in
index 083bc55..21561b2 100644
--- a/build/cmake/config.h.in
+++ b/build/cmake/config.h.in
@@ -100,8 +100,8 @@
/* Define to 1 if you have the <pthread.h> header file. */
#cmakedefine HAVE_PTHREAD_H 1
-/* Define to 1 if you have the <sys/time.h> header file. */
-#cmakedefine HAVE_SYS_TIME_H 1
+/* Define to 1 if you have the <sys/ioctl.h> header file. */
+#cmakedefine HAVE_SYS_IOCTL_H 1
/* Define to 1 if you have the <sys/param.h> header file. */
#cmakedefine HAVE_SYS_PARAM_H 1
@@ -124,6 +124,9 @@
/* Define to 1 if you have the <sys/select.h> header file. */
#cmakedefine HAVE_SYS_SELECT_H 1
+/* Define to 1 if you have the <sys/time.h> header file. */
+#cmakedefine HAVE_SYS_TIME_H 1
+
/* Define to 1 if you have the <sched.h> header file. */
#cmakedefine HAVE_SCHED_H 1
diff --git a/configure.ac b/configure.ac
index 7634823..9efc28b 100755
--- a/configure.ac
+++ b/configure.ac
@@ -635,6 +635,7 @@
AC_CHECK_HEADERS([pthread.h])
AC_CHECK_HEADERS([stddef.h])
AC_CHECK_HEADERS([stdlib.h])
+AC_CHECK_HEADERS([sys/ioctl.h])
AC_CHECK_HEADERS([sys/socket.h])
AC_CHECK_HEADERS([sys/time.h])
AC_CHECK_HEADERS([sys/un.h])
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index d17f77c..e60bffc 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -472,6 +472,18 @@
}
// size known; now get the rest of the frame
transition();
+
+ // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
+ // regular sockets, because if there is more data, libevent will fire the event handler registered for read
+ // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
+ // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
+ // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
+ // despite having more data.
+ if (tSocket_->hasPendingDataToRead())
+ {
+ workSocket();
+ }
+
return;
case SOCKET_RECV:
@@ -677,9 +689,6 @@
appState_ = APP_SEND_RESULT;
setWrite();
- // Try to work the socket immediately
- // workSocket();
-
return;
}
@@ -718,9 +727,6 @@
// Register read event
setRead();
- // Try to work the socket right away
- // workSocket();
-
return;
case APP_READ_FRAME_SIZE:
@@ -753,9 +759,6 @@
socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
- // Work the socket right away
- workSocket();
-
return;
case APP_CLOSE_CONNECTION:
@@ -1063,7 +1066,7 @@
connection->forceClose();
}
-void TNonblockingServer::stop() {
+void TNonblockingServer::stop() {
// Breaks the event loop in all threads so that they end ASAP.
for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
ioThreads_[i]->stop();
diff --git a/lib/cpp/src/thrift/transport/PlatformSocket.h b/lib/cpp/src/thrift/transport/PlatformSocket.h
index 1890b60..9591058 100644
--- a/lib/cpp/src/thrift/transport/PlatformSocket.h
+++ b/lib/cpp/src/thrift/transport/PlatformSocket.h
@@ -51,6 +51,8 @@
# define THRIFT_LSEEK _lseek
# define THRIFT_WRITE _write
# define THRIFT_READ _read
+# define THRIFT_IOCTL_SOCKET ioctlsocket
+# define THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE u_long
# define THRIFT_FSTAT _fstat
# define THRIFT_STAT _stat
# ifdef _WIN32_WCE
@@ -111,6 +113,8 @@
# define THRIFT_LSEEK lseek
# define THRIFT_WRITE write
# define THRIFT_READ read
+# define THRIFT_IOCTL_SOCKET ioctl
+# define THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE int
# define THRIFT_STAT stat
# define THRIFT_FSTAT fstat
# define THRIFT_GAI_STRERROR gai_strerror
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 3f0e28e..7bdacb0 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -249,6 +249,17 @@
close();
}
+bool TSSLSocket::hasPendingDataToRead() {
+ if (!isOpen()) {
+ return false;
+ }
+ initializeHandshake();
+ if (!checkHandshake())
+ throw TSSLException("TSSLSocket::hasPendingDataToRead: Handshake is not completed");
+ // data may be available in SSL buffers (note: SSL_pending does not have a failure mode)
+ return SSL_pending(ssl_) > 0 || TSocket::hasPendingDataToRead();
+}
+
void TSSLSocket::init() {
handshakeCompleted_ = false;
readRetryCount_ = 0;
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.h b/lib/cpp/src/thrift/transport/TSSLSocket.h
index 8527209..ec30cc1 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.h
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.h
@@ -78,6 +78,7 @@
bool peek();
void open();
void close();
+ bool hasPendingDataToRead();
uint32_t read(uint8_t* buf, uint32_t len);
void write(const uint8_t* buf, uint32_t len);
uint32_t write_partial(const uint8_t* buf, uint32_t len);
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index d93d0ff..c90593d 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -21,6 +21,9 @@
#include <cstring>
#include <sstream>
+#ifdef HAVE_SYS_IOCTL_H
+#include <sys/ioctl.h>
+#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
@@ -167,6 +170,26 @@
close();
}
+bool TSocket::hasPendingDataToRead() {
+ if (!isOpen()) {
+ return false;
+ }
+
+ int32_t retries = 0;
+ THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable;
+try_again:
+ int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable);
+ if (r == -1) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
+ goto try_again;
+ }
+ GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+ }
+ return numBytesAvailable > 0;
+}
+
bool TSocket::isOpen() {
return (socket_ != THRIFT_INVALID_SOCKET);
}
diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h
index 1f95e68..66d9e6c 100644
--- a/lib/cpp/src/thrift/transport/TSocket.h
+++ b/lib/cpp/src/thrift/transport/TSocket.h
@@ -84,7 +84,9 @@
virtual bool isOpen();
/**
- * Calls select on the socket to see if there is more data available.
+ * Checks whether there is more data available in the socket to read.
+ *
+ * This call blocks until at least one byte is available or the socket is closed.
*/
virtual bool peek();
@@ -101,6 +103,17 @@
virtual void close();
/**
+ * Determines whether there is pending data to read or not.
+ *
+ * This call does not block.
+ * \throws TTransportException of types:
+ * NOT_OPEN means the socket has been closed
+ * UNKNOWN means something unexpected happened
+ * \returns true if there is pending data to read, false otherwise
+ */
+ virtual bool hasPendingDataToRead();
+
+ /**
* Reads from the underlying socket.
* \returns the number of bytes read or 0 indicates EOF
* \throws TTransportException of types: