THRIFT-2441 Cannot shutdown TThreadedServer when clients are still connected
Author: James E. King, III <Jim.King@simplivity.com>
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index fa6bff5..19f44ac 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -70,11 +70,11 @@
if (client) {
client->close();
}
- if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+ if (ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
- continue;
+ if (stop_) break; else continue;
} catch (TException& tx) {
if (inputTransport) {
inputTransport->close();
@@ -88,7 +88,7 @@
string errStr = string("Some kind of accept exception: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
- } catch (string s) {
+ } catch (const string& s) {
if (inputTransport) {
inputTransport->close();
}
@@ -122,8 +122,12 @@
}
}
} catch (const TTransportException& ttx) {
- string errStr = string("TSimpleServer client died: ") + ttx.what();
- GlobalOutput(errStr.c_str());
+ if (ttx.getType() != TTransportException::END_OF_FILE &&
+ ttx.getType() != TTransportException::INTERRUPTED)
+ {
+ string errStr = string("TSimpleServer client died: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
} catch (const std::exception& x) {
GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what());
} catch (...) {
@@ -163,6 +167,15 @@
stop_ = false;
}
}
+
+void TSimpleServer::stop() {
+ if (!stop_) {
+ stop_ = true;
+ serverTransport_->interrupt();
+ serverTransport_->interruptChildren();
+ }
+}
+
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 967f834..941f12b 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -32,7 +32,6 @@
* continuous loop of accepting a single connection, processing requests on
* that connection until it closes, and then repeating. It is a good example
* of how to extend the TServer interface.
- *
*/
class TSimpleServer : public TServer {
public:
@@ -84,14 +83,20 @@
outputProtocolFactory),
stop_(false) {}
- ~TSimpleServer() {}
-
+ /**
+ * Process one connection at a time using the caller's thread.
+ * Call stop() on another thread to interrupt processing and
+ * return control to the caller.
+ * Post-conditions (return guarantees):
+ * The serverTransport will be closed.
+ * There will be no connected client.
+ */
void serve();
- void stop() {
- stop_ = true;
- serverTransport_->interrupt();
- }
+ /**
+ * Interrupt serve() so that it meets post-conditions.
+ */
+ void stop();
protected:
bool stop_;
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index 0530d8d..58cfe3e 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -69,11 +69,12 @@
break;
}
}
- } catch (const TTransportException&) {
- // This is reasonably expected, client didn't send a full request so just
- // ignore him
- // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
- // GlobalOutput(errStr.c_str());
+ } catch (const TTransportException& ttx) {
+ if (ttx.getType() != TTransportException::END_OF_FILE &&
+ ttx.getType() != TTransportException::INTERRUPTED) {
+ string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
} catch (const std::exception& x) {
GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
} catch (...) {
@@ -108,8 +109,7 @@
shared_ptr<TTransport> transport_;
};
-TThreadPoolServer::~TThreadPoolServer() {
-}
+TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() {
shared_ptr<TTransport> client;
@@ -160,11 +160,11 @@
if (client) {
client->close();
}
- if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+ if (ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
- continue;
+ if (stop_) break; else continue;
} catch (TException& tx) {
if (inputTransport) {
inputTransport->close();
@@ -178,7 +178,7 @@
string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
- } catch (string s) {
+ } catch (const string& s) {
if (inputTransport) {
inputTransport->close();
}
@@ -207,6 +207,14 @@
}
}
+void TThreadPoolServer::stop() {
+ if (!stop_) {
+ stop_ = true;
+ serverTransport_->interrupt();
+ serverTransport_->interruptChildren();
+ }
+}
+
int64_t TThreadPoolServer::getTimeout() const {
return timeout_;
}
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index ad7e7ef..1696700 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -109,15 +109,12 @@
virtual void serve();
+ virtual void stop();
+
virtual int64_t getTimeout() const;
virtual void setTimeout(int64_t value);
- virtual void stop() {
- stop_ = true;
- serverTransport_->interrupt();
- }
-
virtual int64_t getTaskExpiration() const;
virtual void setTaskExpiration(int64_t value);
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 380f69c..118c9cb 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -55,10 +55,6 @@
~Task() {}
- void stop() {
- input_->getTransport()->close();
- }
-
void run() {
boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
void* connectionContext = NULL;
@@ -76,7 +72,8 @@
}
}
} catch (const TTransportException& ttx) {
- if (ttx.getType() != TTransportException::END_OF_FILE) {
+ if (ttx.getType() != TTransportException::END_OF_FILE &&
+ ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadedServer client died: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
@@ -130,8 +127,7 @@
}
}
-TThreadedServer::~TThreadedServer() {
-}
+TThreadedServer::~TThreadedServer() {}
void TThreadedServer::serve() {
@@ -196,11 +192,11 @@
if (client) {
client->close();
}
- if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+ if (ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
- continue;
+ if (stop_) break; else continue;
} catch (TException& tx) {
if (inputTransport) {
inputTransport->close();
@@ -214,7 +210,7 @@
string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
- } catch (string s) {
+ } catch (const string& s) {
if (inputTransport) {
inputTransport->close();
}
@@ -240,8 +236,6 @@
}
try {
Synchronized s(tasksMonitor_);
- for ( std::set<Task*>::iterator tIt = tasks_.begin(); tIt != tasks_.end(); ++tIt )
- (*tIt)->stop();
while (!tasks_.empty()) {
tasksMonitor_.wait();
}
@@ -252,6 +246,14 @@
stop_ = false;
}
}
+
+void TThreadedServer::stop() {
+ if (!stop_) {
+ stop_ = true;
+ serverTransport_->interrupt();
+ serverTransport_->interruptChildren();
+ }
+}
}
}
} // apache::thrift::server
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 2b1f757..b9b24fe 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -75,11 +75,7 @@
virtual ~TThreadedServer();
virtual void serve();
-
- void stop() {
- stop_ = true;
- serverTransport_->interrupt();
- }
+ void stop();
protected:
void init();
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp
index 8c8fd73..f5c3ea5 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp
@@ -20,6 +20,7 @@
#include <thrift/thrift-config.h>
#include <cstring>
+#include <stdexcept>
#include <sys/types.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
@@ -69,6 +70,12 @@
return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
+void destroyer_of_fine_sockets(THRIFT_SOCKET *ssock)
+{
+ ::THRIFT_CLOSESOCKET(*ssock);
+ delete ssock;
+}
+
namespace apache {
namespace thrift {
namespace transport {
@@ -88,9 +95,12 @@
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
- intSock1_(THRIFT_INVALID_SOCKET),
- intSock2_(THRIFT_INVALID_SOCKET) {
-}
+ interruptableChildren_(true),
+ listening_(false),
+ interruptSockWriter_(THRIFT_INVALID_SOCKET),
+ interruptSockReader_(THRIFT_INVALID_SOCKET),
+ childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
: port_(port),
@@ -104,9 +114,12 @@
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
- intSock1_(THRIFT_INVALID_SOCKET),
- intSock2_(THRIFT_INVALID_SOCKET) {
-}
+ interruptableChildren_(true),
+ listening_(false),
+ interruptSockWriter_(THRIFT_INVALID_SOCKET),
+ interruptSockReader_(THRIFT_INVALID_SOCKET),
+ childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
TServerSocket::TServerSocket(const string& address, int port)
: port_(port),
@@ -121,9 +134,12 @@
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
- intSock1_(THRIFT_INVALID_SOCKET),
- intSock2_(THRIFT_INVALID_SOCKET) {
-}
+ interruptableChildren_(true),
+ listening_(false),
+ interruptSockWriter_(THRIFT_INVALID_SOCKET),
+ interruptSockReader_(THRIFT_INVALID_SOCKET),
+ childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
TServerSocket::TServerSocket(const string& path)
: port_(0),
@@ -138,9 +154,12 @@
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
keepAlive_(false),
- intSock1_(THRIFT_INVALID_SOCKET),
- intSock2_(THRIFT_INVALID_SOCKET) {
-}
+ interruptableChildren_(true),
+ listening_(false),
+ interruptSockWriter_(THRIFT_INVALID_SOCKET),
+ interruptSockReader_(THRIFT_INVALID_SOCKET),
+ childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
TServerSocket::~TServerSocket() {
close();
@@ -178,18 +197,41 @@
tcpRecvBuffer_ = tcpRecvBuffer;
}
+void TServerSocket::setInterruptableChildren(bool enable) {
+ if (listening_) {
+ throw std::logic_error("setInterruptableChildren cannot be called after listen()");
+ }
+ interruptableChildren_ = enable;
+}
+
void TServerSocket::listen() {
+ listening_ = true;
#ifdef _WIN32
TWinsockSingleton::create();
#endif // _WIN32
THRIFT_SOCKET sv[2];
+ // Create the socket pair used to interrupt
if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
- intSock1_ = THRIFT_INVALID_SOCKET;
- intSock2_ = THRIFT_INVALID_SOCKET;
+ GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt",
+ THRIFT_GET_SOCKET_ERROR);
+ interruptSockWriter_ = THRIFT_INVALID_SOCKET;
+ interruptSockReader_ = THRIFT_INVALID_SOCKET;
} else {
- intSock1_ = sv[1];
- intSock2_ = sv[0];
+ interruptSockWriter_ = sv[1];
+ interruptSockReader_ = sv[0];
+ }
+
+ // Create the socket pair used to interrupt all clients
+ if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
+ THRIFT_GET_SOCKET_ERROR);
+ childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
+ pChildInterruptSockReader_.reset();
+ } else {
+ childInterruptSockWriter_ = sv[1];
+ pChildInterruptSockReader_ =
+ boost::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]),
+ destroyer_of_fine_sockets);
}
// Validate port number
@@ -469,8 +511,8 @@
std::memset(fds, 0, sizeof(fds));
fds[0].fd = serverSocket_;
fds[0].events = THRIFT_POLLIN;
- if (intSock2_ != THRIFT_INVALID_SOCKET) {
- fds[1].fd = intSock2_;
+ if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
+ fds[1].fd = interruptSockReader_;
fds[1].events = THRIFT_POLLIN;
}
/*
@@ -491,9 +533,9 @@
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check for an interrupt signal
- if (intSock2_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
+ if (interruptSockReader_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
int8_t buf;
- if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
+ if (-1 == recv(interruptSockReader_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ",
THRIFT_GET_SOCKET_ERROR);
}
@@ -562,16 +604,28 @@
}
shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
- return shared_ptr<TSocket>(new TSocket(clientSocket));
+ if (interruptableChildren_) {
+ return shared_ptr<TSocket>(new TSocket(clientSocket, pChildInterruptSockReader_));
+ } else {
+ return shared_ptr<TSocket>(new TSocket(clientSocket));
+ }
+}
+
+void TServerSocket::notify(THRIFT_SOCKET notifySocket) {
+ if (notifySocket != THRIFT_INVALID_SOCKET) {
+ int8_t byte = 0;
+ if (-1 == send(notifySocket, cast_sockopt(&byte), sizeof(int8_t), 0)) {
+ GlobalOutput.perror("TServerSocket::notify() send() ", THRIFT_GET_SOCKET_ERROR);
+ }
+ }
}
void TServerSocket::interrupt() {
- if (intSock1_ != THRIFT_INVALID_SOCKET) {
- int8_t byte = 0;
- if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) {
- GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
- }
- }
+ notify(interruptSockWriter_);
+}
+
+void TServerSocket::interruptChildren() {
+ notify(childInterruptSockWriter_);
}
void TServerSocket::close() {
@@ -579,16 +633,23 @@
shutdown(serverSocket_, THRIFT_SHUT_RDWR);
::THRIFT_CLOSESOCKET(serverSocket_);
}
- if (intSock1_ != THRIFT_INVALID_SOCKET) {
- ::THRIFT_CLOSESOCKET(intSock1_);
+ if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
+ ::THRIFT_CLOSESOCKET(interruptSockWriter_);
}
- if (intSock2_ != THRIFT_INVALID_SOCKET) {
- ::THRIFT_CLOSESOCKET(intSock2_);
+ if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
+ ::THRIFT_CLOSESOCKET(interruptSockReader_);
+ }
+ if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
+ ::THRIFT_CLOSESOCKET(childInterruptSockWriter_);
}
serverSocket_ = THRIFT_INVALID_SOCKET;
- intSock1_ = THRIFT_INVALID_SOCKET;
- intSock2_ = THRIFT_INVALID_SOCKET;
+ interruptSockWriter_ = THRIFT_INVALID_SOCKET;
+ interruptSockReader_ = THRIFT_INVALID_SOCKET;
+ childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
+ pChildInterruptSockReader_.reset();
+ listening_ = false;
}
+
}
}
} // apache::thrift::transport
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h
index 49711e8..58e4afd 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TServerSocket.h
@@ -100,17 +100,33 @@
// socket, this is the place to do it.
void setAcceptCallback(const socket_func_t& acceptCallback) { acceptCallback_ = acceptCallback; }
- void listen();
- void close();
+ // When enabled (the default), new children TSockets will be constructed so
+ // they can be interrupted by TServerTransport::interruptChildren().
+ // This is more expensive in terms of system calls (poll + recv); however, it
+ // ensures a connected client cannot interfere with TServer::stop().
+ //
+ // When disabled, TSocket children do not incur an additional poll() call.
+ // Server-side reads are more efficient; however, a client can interfere with
+ // the server's ability to shut down properly by staying connected.
+ //
+ // Must be called before listen(); mode cannot be switched after that.
+ // \throws std::logic_error if listen() has been called
+ void setInterruptableChildren(bool enable);
- void interrupt();
int getPort();
+ void listen();
+ void interrupt();
+ void interruptChildren();
+ void close();
+
protected:
boost::shared_ptr<TTransport> acceptImpl();
virtual boost::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
private:
+ void notify(THRIFT_SOCKET notifySock);
+
int port_;
std::string address_;
std::string path_;
@@ -124,9 +140,13 @@
int tcpSendBuffer_;
int tcpRecvBuffer_;
bool keepAlive_;
+ bool interruptableChildren_;
+ bool listening_;
- THRIFT_SOCKET intSock1_;
- THRIFT_SOCKET intSock2_;
+ THRIFT_SOCKET interruptSockWriter_; // is notified on interrupt()
+ THRIFT_SOCKET interruptSockReader_; // is used in select/poll with serverSocket_ for interruptability
+ THRIFT_SOCKET childInterruptSockWriter_; // is notified on interruptChildren()
+ boost::shared_ptr<THRIFT_SOCKET> pChildInterruptSockReader_; // if interruptableChildren_ this is shared with child TSockets
socket_func_t listenCallback_;
socket_func_t acceptCallback_;
diff --git a/lib/cpp/src/thrift/transport/TServerTransport.h b/lib/cpp/src/thrift/transport/TServerTransport.h
index 7c4a7c3..cd1d3da 100644
--- a/lib/cpp/src/thrift/transport/TServerTransport.h
+++ b/lib/cpp/src/thrift/transport/TServerTransport.h
@@ -30,8 +30,9 @@
/**
* Server transport framework. A server needs to have some facility for
- * creating base transports to read/write from.
- *
+ * creating base transports to read/write from. The server is expected
+ * to keep track of TTransport children that it creates for purposes of
+ * controlling their lifetime.
*/
class TServerTransport {
public:
@@ -67,11 +68,21 @@
* For "smart" TServerTransport implementations that work in a multi
* threaded context this can be used to break out of an accept() call.
* It is expected that the transport will throw a TTransportException
- * with the interrupted error code.
+ * with the INTERRUPTED error code.
+ *
+ * This will not make an attempt to interrupt any TTransport children.
*/
virtual void interrupt() {}
/**
+ * This will interrupt the children created by the server transport,
+ * allowing them to break out of any blocking data reception call.
+ * It is expected that the children will throw a TTransportException
+ * with the INTERRUPTED error code.
+ */
+ virtual void interruptChildren() {}
+
+ /**
* Closes this transport such that future calls to accept will do nothing.
*/
virtual void close() = 0;
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index bcea291..cc4dce0 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -74,7 +74,7 @@
*
*/
-TSocket::TSocket(string host, int port)
+TSocket::TSocket(const string& host, int port)
: host_(host),
port_(port),
path_(""),
@@ -89,7 +89,7 @@
maxRecvRetries_(5) {
}
-TSocket::TSocket(string path)
+TSocket::TSocket(const string& path)
: host_(""),
port_(0),
path_(path),
@@ -143,6 +143,30 @@
#endif
}
+TSocket::TSocket(THRIFT_SOCKET socket,
+ boost::shared_ptr<THRIFT_SOCKET> interruptListener) :
+ host_(""),
+ port_(0),
+ path_(""),
+ socket_(socket),
+ interruptListener_(interruptListener),
+ connTimeout_(0),
+ sendTimeout_(0),
+ recvTimeout_(0),
+ keepAlive_(false),
+ lingerOn_(1),
+ lingerVal_(0),
+ noDelay_(1),
+ maxRecvRetries_(5) {
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
+#ifdef SO_NOSIGPIPE
+ {
+ int one = 1;
+ setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+ }
+#endif
+}
+
TSocket::~TSocket() {
close();
}
@@ -153,8 +177,41 @@
bool TSocket::peek() {
if (!isOpen()) {
- return false;
+ return false;
}
+ if (interruptListener_)
+ {
+ for (int retries = 0; ; ) {
+ struct THRIFT_POLLFD fds[2];
+ std::memset(fds, 0, sizeof(fds));
+ fds[0].fd = socket_;
+ fds[0].events = THRIFT_POLLIN;
+ fds[1].fd = *(interruptListener_.get());
+ fds[1].events = THRIFT_POLLIN;
+ int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (ret < 0) {
+ // error cases
+ if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
+ continue;
+ }
+ GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+ } else if (ret > 0) {
+ // Check the interruptListener
+ if (fds[1].revents & THRIFT_POLLIN) {
+ return false;
+ }
+ // There must be data or a disconnection, fall through to the PEEK
+ break;
+ } else {
+ // timeout
+ return false;
+ }
+ }
+ }
+
+ // Check to see if data is available or if the remote side closed
uint8_t buf;
int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
if (r == -1) {
@@ -455,9 +512,44 @@
// an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
begin.tv_sec = begin.tv_usec = 0;
}
- int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
- int errno_copy = THRIFT_GET_SOCKET_ERROR; // THRIFT_GETTIMEOFDAY can change
- // THRIFT_GET_SOCKET_ERROR
+
+ int got = 0;
+
+ if (interruptListener_)
+ {
+ struct THRIFT_POLLFD fds[2];
+ std::memset(fds, 0, sizeof(fds));
+ fds[0].fd = socket_;
+ fds[0].events = THRIFT_POLLIN;
+ fds[1].fd = *(interruptListener_.get());
+ fds[1].events = THRIFT_POLLIN;
+
+ int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (ret < 0) {
+ // error cases
+ if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
+ goto try_again;
+ }
+ GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+ } else if (ret > 0) {
+ // Check the interruptListener
+ if (fds[1].revents & THRIFT_POLLIN) {
+ throw TTransportException(TTransportException::INTERRUPTED,
+ "Interrupted");
+ }
+ } else /* ret == 0 */ {
+ throw TTransportException(TTransportException::TIMED_OUT,
+ "THRIFT_EAGAIN (timed out)");
+ }
+
+ // falling through means there is something to recv and it cannot block
+ }
+
+ got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
+ // THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
// Check for error on read
if (got < 0) {
@@ -493,29 +585,9 @@
goto try_again;
}
-#if defined __FreeBSD__ || defined __MACH__
if (errno_copy == THRIFT_ECONNRESET) {
- /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
- * THRIFT_ECONNRESET if peer performed shutdown
- * edhall: eliminated close() since we do that in the destructor.
- */
return 0;
}
-#endif
-
-#ifdef _WIN32
- if (errno_copy == WSAECONNRESET) {
- return 0; // EOF
- }
-#endif
-
- // Now it's not a try again case, but a real probblez
- GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
-
- // If we disconnect with no linger time
- if (errno_copy == THRIFT_ECONNRESET) {
- throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET");
- }
// This ish isn't open
if (errno_copy == THRIFT_ENOTCONN) {
@@ -527,18 +599,13 @@
throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
}
+ // Now it's not a try again case, but a real probblez
+ GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
+
// Some other error, whatevz
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
}
- // The remote host has closed the socket
- if (got == 0) {
- // edhall: we used to call close() here, but our caller may want to deal
- // with the socket fd and we'll close() in our destructor in any case.
- return 0;
- }
-
- // Pack data into string
return got;
}
diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h
index 427b91f..09a64e3 100644
--- a/lib/cpp/src/thrift/transport/TSocket.h
+++ b/lib/cpp/src/thrift/transport/TSocket.h
@@ -61,7 +61,7 @@
* @param host An IP address or hostname to connect to
* @param port The port to connect on
*/
- TSocket(std::string host, int port);
+ TSocket(const std::string& host, int port);
/**
* Constructs a new Unix domain socket.
@@ -69,7 +69,7 @@
*
* @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift"
*/
- TSocket(std::string path);
+ TSocket(const std::string& path);
/**
* Destroyes the socket object, closing it if necessary.
@@ -102,6 +102,13 @@
/**
* Reads from the underlying socket.
+ * \returns the number of bytes read; 0 indicates EOF
+ * \throws TTransportException of types:
+ * INTERRUPTED means the socket was interrupted
+ * out of a blocking call
+ * NOT_OPEN means the socket has been closed
+ * TIMED_OUT means the receive timeout expired
+ * UNKNOWN means something unexpected happened
*/
virtual uint32_t read(uint8_t* buf, uint32_t len);
@@ -242,11 +249,18 @@
virtual const std::string getOrigin();
/**
- * Constructor to create socket from raw UNIX handle.
+ * Constructor to create socket from file descriptor.
*/
TSocket(THRIFT_SOCKET socket);
/**
+ * Constructor to create socket from file descriptor that
+ * can be interrupted safely.
+ */
+ TSocket(THRIFT_SOCKET socket,
+ boost::shared_ptr<THRIFT_SOCKET> interruptListener);
+
+ /**
* Set a cache of the peer address (used when trivially available: e.g.
* accept() or connect()). Only caches IPV4 and IPV6; unset for others.
*/
@@ -274,9 +288,15 @@
/** UNIX domain socket path */
std::string path_;
- /** Underlying UNIX socket handle */
+ /** Underlying socket handle */
THRIFT_SOCKET socket_;
+ /**
+ * A shared socket pointer that will interrupt a blocking read if data
+ * becomes available on it
+ */
+ boost::shared_ptr<THRIFT_SOCKET> interruptListener_;
+
/** Connect timeout in ms */
int connTimeout_;
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index 721053a..83ebe9e 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -20,7 +20,7 @@
# Find required packages
set(Boost_USE_STATIC_LIBS ON) # Force the use of static boost test framework
-find_package(Boost 1.53.0 REQUIRED COMPONENTS unit_test_framework)
+find_package(Boost 1.53.0 REQUIRED COMPONENTS chrono system thread unit_test_framework)
include_directories(SYSTEM "${Boost_INCLUDE_DIRS}")
#Make sure gen-cpp files can be included
@@ -50,6 +50,8 @@
set(testgencpp_cob_SOURCES
gen-cpp/ChildService.cpp
gen-cpp/ChildService.h
+ gen-cpp/EmptyService.cpp
+ gen-cpp/EmptyService.h
gen-cpp/ParentService.cpp
gen-cpp/ParentService.h
gen-cpp/proc_types.cpp
@@ -71,6 +73,7 @@
ToStringTest.cpp
TypedefTest.cpp
TServerSocketTest.cpp
+ TServerTransportTest.cpp
)
if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS)
@@ -81,6 +84,26 @@
target_link_libraries(UnitTests testgencpp thrift ${Boost_LIBRARIES})
add_test(NAME UnitTests COMMAND UnitTests)
+add_executable(TSocketInterruptTest TSocketInterruptTest.cpp)
+target_link_libraries(TSocketInterruptTest
+ testgencpp
+ ${Boost_LIBRARIES}
+ #-lrt
+)
+if (NOT MSVC AND NOT APPLE) # macOS ships no librt, so -lrt fails to link there
+target_link_libraries(TSocketInterruptTest -lrt)
+endif ()
+add_test(NAME TSocketInterruptTest COMMAND TSocketInterruptTest)
+
+add_executable(TServerIntegrationTest TServerIntegrationTest.cpp)
+target_link_libraries(TServerIntegrationTest
+ testgencpp_cob
+ ${Boost_LIBRARIES}
+)
+if (NOT MSVC AND NOT APPLE) # macOS ships no librt, so -lrt fails to link there
+target_link_libraries(TServerIntegrationTest -lrt)
+endif ()
+add_test(NAME TServerIntegrationTest COMMAND TServerIntegrationTest)
if(WITH_ZLIB)
add_executable(TransportTest TransportTest.cpp)
@@ -255,7 +278,7 @@
#
-add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h
+add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp gen-cpp/EmptyService.h
COMMAND thrift-compiler --gen cpp:dense ${PROJECT_SOURCE_DIR}/test/DebugProtoTest.thrift
)
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 46ff911..0cd1c67 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -25,6 +25,7 @@
gen-cpp/ThriftTest_types.h \
gen-cpp/TypedefTest_types.h \
gen-cpp/ChildService.h \
+ gen-cpp/EmptyService.h \
gen-cpp/ParentService.h \
gen-cpp/proc_types.h
@@ -50,6 +51,8 @@
nodist_libprocessortest_la_SOURCES = \
gen-cpp/ChildService.cpp \
gen-cpp/ChildService.h \
+ gen-cpp/EmptyService.cpp \
+ gen-cpp/EmptyService.h \
gen-cpp/ParentService.cpp \
gen-cpp/ParentService.h \
gen-cpp/proc_types.cpp \
@@ -79,6 +82,8 @@
SpecializationTest \
AllProtocolsTest \
TransportTest \
+ TSocketInterruptTest \
+ TServerIntegrationTest \
ZlibTest \
TFileTransportTest \
link_test \
@@ -107,17 +112,38 @@
Base64Test.cpp \
ToStringTest.cpp \
TypedefTest.cpp \
- TServerSocketTest.cpp
+ TServerSocketTest.cpp \
+ TServerTransportTest.cpp
if !WITH_BOOSTTHREADS
UnitTests_SOURCES += \
- RWMutexStarveTest.cpp
+ RWMutexStarveTest.cpp
endif
UnitTests_LDADD = \
libtestgencpp.la \
$(BOOST_TEST_LDADD)
+TSocketInterruptTest_SOURCES = \
+ TSocketInterruptTest.cpp
+
+TSocketInterruptTest_LDADD = \
+ libtestgencpp.la \
+ $(BOOST_TEST_LDADD) \
+ $(BOOST_CHRONO_LDADD) \
+ $(BOOST_SYSTEM_LDADD) \
+ $(BOOST_THREAD_LDADD)
+
+TServerIntegrationTest_SOURCES = \
+ TServerIntegrationTest.cpp
+
+TServerIntegrationTest_LDADD = \
+ libtestgencpp.la \
+ libprocessortest.la \
+ $(BOOST_TEST_LDADD) \
+ $(BOOST_SYSTEM_LDADD) \
+ $(BOOST_THREAD_LDADD)
+
TransportTest_SOURCES = \
TransportTest.cpp
@@ -273,7 +299,7 @@
#
THRIFT = $(top_builddir)/compiler/cpp/thrift
-gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h: $(top_srcdir)/test/DebugProtoTest.thrift
+gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp gen-cpp/EmptyService.h: $(top_srcdir)/test/DebugProtoTest.thrift
$(THRIFT) --gen cpp:dense $<
gen-cpp/EnumTest_types.cpp gen-cpp/EnumTest_types.h: $(top_srcdir)/test/EnumTest.thrift
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
new file mode 100644
index 0000000..9edeb19
--- /dev/null
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TServerIntegrationTest
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <thrift/server/TThreadedServer.h>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/transport/TServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TTransport.h>
+#include "gen-cpp/ParentService.h"
+#include "TestPortFixture.h"
+#include <vector>
+
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Synchronized;
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TThreadedServer;
+using apache::thrift::test::ParentServiceClient;
+using apache::thrift::test::ParentServiceIf;
+using apache::thrift::test::ParentServiceProcessor;
+
+/**
+ * Event handler that is also a Monitor, so a test thread can wait on server progress.
+ * preServe runs after listen() is successful, when we can connect; createContext()
+ * bumps a counter each time it is called. The accessors take no lock themselves.
+ */
+class TServerReadyEventHandler : public TServerEventHandler, public Monitor
+{
+public:
+  TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
+  virtual ~TServerReadyEventHandler() {}
+
+  bool isListening() const { return isListening_; }
+  uint64_t acceptedCount() const { return accepted_; }
+
+  virtual void preServe() {
+    Synchronized lock(*this);
+    isListening_ = true;
+    notify();
+  }
+  virtual void* createContext(boost::shared_ptr<TProtocol> /*input*/, boost::shared_ptr<TProtocol> /*output*/) {
+    Synchronized lock(*this);
+    accepted_ += 1;
+    notify();
+    return NULL;
+  }
+private:
+  bool isListening_;
+  uint64_t accepted_;
+};
+
+// ParentServiceIf handler served by the test fixture: a generation counter and a
+// string list, both guarded by mutex_. The *Wait operations have empty bodies.
+class ParentHandler : virtual public ParentServiceIf {
+public:
+  ParentHandler() : generation_(0) {}
+
+  /** Increments the generation under the lock and returns the new value. */
+  int32_t incrementGeneration() {
+    Guard lock(mutex_);
+    generation_ += 1;
+    return generation_;
+  }
+
+  /** Returns the current generation, read under the lock. */
+  int32_t getGeneration() {
+    Guard lock(mutex_);
+    return generation_;
+  }
+
+  /** Appends s to the stored strings under the lock. */
+  void addString(const std::string& s) {
+    Guard lock(mutex_);
+    strings_.push_back(s);
+  }
+
+  /** Copies the stored strings into _return under the lock. */
+  void getStrings(std::vector<std::string>& _return) {
+    Guard lock(mutex_);
+    _return = strings_;
+  }
+
+  void getDataWait(std::string& _return, int32_t length) {}
+  void onewayWait() {}
+  void exceptionWait(const std::string& message) {}
+  void unexpectedExceptionWait(const std::string& message) {}
+
+protected:
+  Mutex mutex_;
+  int32_t generation_;
+  std::vector<std::string> strings_;
+};
+
+// Owns a TThreadedServer on the test port and the thread that runs serve() for each test.
+class TServerIntegrationTestFixture : public TestPortFixture
+{
+public:
+  TServerIntegrationTestFixture() :
+    pServer(new TThreadedServer(
+                    boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
+                            boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
+                    boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
+                    boost::shared_ptr<TTransportFactory>(new TTransportFactory),
+                    boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
+    pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
+  {
+    pServer->setServerEventHandler(pEventHandler);
+  }
+  // Runs serve() on a new thread and returns once the event handler reports listening.
+  void startServer() {
+    pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+    // block until listen() completes so clients will be able to connect
+    Synchronized sync(*(pEventHandler.get()));
+    while (!pEventHandler->isListening()) {
+      pEventHandler->wait();
+    }
+    BOOST_MESSAGE("server is listening");
+  }
+  // Blocks until the event handler's acceptedCount() reaches numAccepted.
+  void blockUntilAccepted(uint64_t numAccepted) {
+    Synchronized sync(*(pEventHandler.get()));
+    while (pEventHandler->acceptedCount() < numAccepted) {
+      pEventHandler->wait();
+    }
+    BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
+  }
+  // Stops the server and joins its thread. Does nothing when no serving thread exists
+  // (never started, or already stopped), so tests and the destructor may both call it.
+  void stopServer() {
+    if (pServerThread) {
+      pServer->stop();
+      BOOST_MESSAGE("server stop completed");
+      pServerThread->join();
+      BOOST_MESSAGE("server thread joined");
+      pServerThread.reset();
+    }
+  }
+  ~TServerIntegrationTestFixture() { stopServer(); }
+
+  // Thread body for clients that hang up late: sleeps one second, then closes toClose.
+  void delayClose(boost::shared_ptr<TTransport> toClose) {
+    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+    toClose->close();
+  }
+
+  boost::shared_ptr<TThreadedServer> pServer;
+  boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
+  boost::shared_ptr<boost::thread> pServerThread;
+};
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+
+BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+{
+  // Basic sanity: serve a single request over one connection, then shut down.
+  startServer();
+
+  boost::shared_ptr<TSocket> socket(new TSocket("localhost", m_serverPort));
+  ParentServiceClient client(boost::shared_ptr<TProtocol>(new TBinaryProtocol(socket)));
+  socket->open();
+  client.incrementGeneration();  // the reply value is not inspected
+  socket->close();
+
+  stopServer();
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
+{
+ // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
+
+ startServer();
+
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ pClientSock1->open();
+
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ pClientSock2->open();
+
+ // Ensure they have been accepted
+ blockUntilAccepted(2);
+
+ // The explicit stopServer() call below, not the fixture destructor, is what disconnects the clients.
+ // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
+ stopServer();
+
+ // extra proof the server end disconnected the clients
+ uint8_t buf[1];
+ BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
+ BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
+ pClientSock1->close();
+ pClientSock2->close();
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
+{
+  // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
+  // disconnect.
+  boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
+    setInterruptableChildren(false);    // returns to pre-THRIFT-2441 behavior
+  startServer();
+
+  boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+  pClientSock1->open();
+  boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+  pClientSock2->open();
+
+  // Ensure they have been accepted
+  blockUntilAccepted(2);
+
+  boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
+  boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+
+  // Once the clients disconnect the server will stop
+  stopServer();
+  // Join the closer threads before touching the sockets they were handed; otherwise
+  // they are destroyed while joinable and may still be inside close() on those sockets.
+  t1.join();
+  t2.join();
+  pClientSock1->close();
+  pClientSock2->close();
+}
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/TServerSocketTest.cpp b/lib/cpp/test/TServerSocketTest.cpp
index eee7c26..65f99f9 100644
--- a/lib/cpp/test/TServerSocketTest.cpp
+++ b/lib/cpp/test/TServerSocketTest.cpp
@@ -22,6 +22,7 @@
#include <thrift/transport/TServerSocket.h>
#include "TestPortFixture.h"
#include "TTransportCheckThrow.h"
+#include <iostream>
using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TSocket;
@@ -30,24 +31,18 @@
BOOST_FIXTURE_TEST_SUITE ( TServerSocketTest, TestPortFixture )
-class TestTServerSocket : public TServerSocket
-{
- public:
- TestTServerSocket(const std::string& address, int port) : TServerSocket(address, port) { }
- using TServerSocket::acceptImpl;
-};
-
BOOST_AUTO_TEST_CASE( test_bind_to_address )
{
- TestTServerSocket sock1("localhost", m_serverPort);
+ TServerSocket sock1("localhost", m_serverPort);
sock1.listen();
TSocket clientSock("localhost", m_serverPort);
clientSock.open();
- boost::shared_ptr<TTransport> accepted = sock1.acceptImpl();
+ boost::shared_ptr<TTransport> accepted = sock1.accept();
accepted->close();
sock1.close();
- TServerSocket sock2("this.is.truly.an.unrecognizable.address.", m_serverPort);
+ std::cout << "An error message from getaddrinfo on the console is expected:" << std::endl;
+ TServerSocket sock2("257.258.259.260", m_serverPort);
BOOST_CHECK_THROW(sock2.listen(), TTransportException);
sock2.close();
}
diff --git a/lib/cpp/test/TServerTransportTest.cpp b/lib/cpp/test/TServerTransportTest.cpp
new file mode 100644
index 0000000..09b2c59
--- /dev/null
+++ b/lib/cpp/test/TServerTransportTest.cpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/test/auto_unit_test.hpp>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TServerTransport.h>
+#include "TestPortFixture.h"
+
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_AUTO_TEST_SUITE ( TServerTransportTest )
+
+class TestTTransport : public TTransport // adds nothing to TTransport; handed out by TestTServerTransport::acceptImpl()
+{
+};
+
+// Stub whose acceptImpl() returns a new TestTTransport while valid_ is true, else an empty pointer.
+class TestTServerTransport : public TServerTransport {
+public:
+  TestTServerTransport() : valid_(true) {}
+  void close() {}
+  bool valid_;
+protected:
+  boost::shared_ptr<TTransport> acceptImpl() {
+    if (!valid_) return boost::shared_ptr<TestTTransport>();
+    return boost::shared_ptr<TestTTransport>(new TestTTransport);
+  }
+};
+
+BOOST_AUTO_TEST_CASE( test_positive_accept )
+{
+ TestTServerTransport uut; // valid_ starts out true, so acceptImpl() returns a non-empty pointer
+ BOOST_CHECK(uut.accept()); // accept() is expected to pass that transport through
+}
+
+BOOST_AUTO_TEST_CASE( test_negative_accept )
+{
+ TestTServerTransport uut;
+ uut.valid_ = false; // makes acceptImpl() return an empty pointer
+ BOOST_CHECK_THROW(uut.accept(), TTransportException); // accept() is expected to reject the empty result by throwing
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
diff --git a/lib/cpp/test/TSocketInterruptTest.cpp b/lib/cpp/test/TSocketInterruptTest.cpp
new file mode 100644
index 0000000..4f6b2bc
--- /dev/null
+++ b/lib/cpp/test/TSocketInterruptTest.cpp
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TSocketInterruptTest
+#include <boost/test/auto_unit_test.hpp>
+
+#include <boost/bind.hpp>
+#include <boost/chrono/duration.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/thread/thread.hpp>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TServerSocket.h>
+#include "TestPortFixture.h"
+
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_FIXTURE_TEST_SUITE ( TSocketInterruptTest, TestPortFixture )
+
+void readerWorker(boost::shared_ptr<TTransport> tt, uint32_t expectedResult) // thread body used by the read tests
+{
+ uint8_t buf[4];
+ BOOST_CHECK_EQUAL(expectedResult, tt->read(buf, 4)); // checks the value returned by a single read(buf, 4)
+}
+
+/**
+ * Thread body: reads from tt and checks that the read throws a
+ * TTransportException of type INTERRUPTED; a read that returns is an error.
+ */
+void readerWorkerMustThrow(boost::shared_ptr<TTransport> tt) {
+  uint8_t scratch[4];
+  try {
+    tt->read(scratch, 4);
+    BOOST_ERROR("should not have gotten here");
+  } catch (const TTransportException& ex) {
+    BOOST_CHECK_EQUAL(TTransportException::INTERRUPTED, ex.getType());
+  }
+}
+
+BOOST_AUTO_TEST_CASE( test_interruptable_child_read )
+{
+ TServerSocket sock1("localhost", m_serverPort);
+ sock1.listen();
+ TSocket clientSock("localhost", m_serverPort);
+ clientSock.open();
+ boost::shared_ptr<TTransport> accepted = sock1.accept();
+ boost::thread readThread(boost::bind(readerWorkerMustThrow, accepted)); // worker expects read() to throw INTERRUPTED
+ boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+ // readThread is practically guaranteed to be blocking now
+ sock1.interruptChildren(); // nothing has been sent by the client at this point
+ BOOST_CHECK_MESSAGE(readThread.try_join_for(boost::chrono::milliseconds(200)),
+ "server socket interruptChildren did not interrupt child read");
+ clientSock.close();
+ accepted->close();
+ sock1.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_non_interruptable_child_read )
+{
+ TServerSocket sock1("localhost", m_serverPort);
+ sock1.setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+ sock1.listen();
+ TSocket clientSock("localhost", m_serverPort);
+ clientSock.open();
+ boost::shared_ptr<TTransport> accepted = sock1.accept();
+ boost::thread readThread(boost::bind(readerWorker, accepted, 0)); // worker expects read() to return 0
+ boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+ // readThread is practically guaranteed to be blocking here
+ sock1.interruptChildren(); // expected to leave the worker blocked; the check below asserts the join times out
+ BOOST_CHECK_MESSAGE(!readThread.try_join_for(boost::chrono::milliseconds(200)),
+ "server socket interruptChildren interrupted child read");
+
+ // only way to proceed is to have the client disconnect
+ clientSock.close();
+ readThread.join();
+ accepted->close();
+ sock1.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_cannot_change_after_listen )
+{
+ TServerSocket sock1("localhost", m_serverPort);
+ sock1.listen();
+ BOOST_CHECK_THROW(sock1.setInterruptableChildren(false), std::logic_error); // changing the setting after listen() must be rejected
+ sock1.close();
+}
+
+void peekerWorker(boost::shared_ptr<TTransport> tt, bool expectedResult) // thread body used by the peek tests
+{
+ BOOST_CHECK_EQUAL(expectedResult, tt->peek()); // checks the value returned by a single peek()
+}
+
+BOOST_AUTO_TEST_CASE( test_interruptable_child_peek )
+{
+ TServerSocket sock1("localhost", m_serverPort);
+ sock1.listen();
+ TSocket clientSock("localhost", m_serverPort);
+ clientSock.open();
+ boost::shared_ptr<TTransport> accepted = sock1.accept();
+ // peek() will return false if child is interrupted
+ boost::thread peekThread(boost::bind(peekerWorker, accepted, false));
+ boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+ // peekThread is practically guaranteed to be blocking now
+ sock1.interruptChildren(); // nothing has been sent by the client at this point
+ BOOST_CHECK_MESSAGE(peekThread.try_join_for(boost::chrono::milliseconds(200)),
+ "server socket interruptChildren did not interrupt child peek");
+ clientSock.close();
+ accepted->close();
+ sock1.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_non_interruptable_child_peek )
+{
+ TServerSocket sock1("localhost", m_serverPort);
+ sock1.setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+ sock1.listen();
+ TSocket clientSock("localhost", m_serverPort);
+ clientSock.open();
+ boost::shared_ptr<TTransport> accepted = sock1.accept();
+ // peek() will return false when remote side is closed
+ boost::thread peekThread(boost::bind(peekerWorker, accepted, false));
+ boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+ // peekThread is practically guaranteed to be blocking now
+ sock1.interruptChildren(); // expected to leave the worker blocked; the check below asserts the join times out
+ BOOST_CHECK_MESSAGE(!peekThread.try_join_for(boost::chrono::milliseconds(200)),
+ "server socket interruptChildren interrupted child peek");
+
+ // only way to proceed is to have the client disconnect
+ clientSock.close();
+ peekThread.join();
+ accepted->close();
+ sock1.close();
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+