THRIFT-2441 Cannot shutdown TThreadedServer when clients are still connected

Author: James E. King, III <Jim.King@simplivity.com>
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index 721053a..83ebe9e 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -20,7 +20,7 @@
 
 # Find required packages
 set(Boost_USE_STATIC_LIBS ON) # Force the use of static boost test framework
-find_package(Boost 1.53.0 REQUIRED COMPONENTS unit_test_framework)
+find_package(Boost 1.53.0 REQUIRED COMPONENTS chrono system thread unit_test_framework)
 include_directories(SYSTEM "${Boost_INCLUDE_DIRS}")
 
 #Make sure gen-cpp files can be included
@@ -50,6 +50,8 @@
 set(testgencpp_cob_SOURCES
     gen-cpp/ChildService.cpp
     gen-cpp/ChildService.h
+    gen-cpp/EmptyService.cpp
+    gen-cpp/EmptyService.h
     gen-cpp/ParentService.cpp
     gen-cpp/ParentService.h
     gen-cpp/proc_types.cpp
@@ -71,6 +73,7 @@
     ToStringTest.cpp
     TypedefTest.cpp
     TServerSocketTest.cpp
+    TServerTransportTest.cpp
 )
 
 if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS)
@@ -81,6 +84,26 @@
 target_link_libraries(UnitTests testgencpp thrift ${Boost_LIBRARIES})
 add_test(NAME UnitTests COMMAND UnitTests)
 
+# Stand-alone Boost.Test binary built from TSocketInterruptTest.cpp (THRIFT-2441).
+add_executable(TSocketInterruptTest TSocketInterruptTest.cpp)
+target_link_libraries(TSocketInterruptTest
+    testgencpp
+    ${Boost_LIBRARIES}
+)
+if (NOT MSVC AND NOT APPLE)   # macOS ships no librt, so -lrt would fail to link there
+target_link_libraries(TSocketInterruptTest -lrt)
+endif ()
+add_test(NAME TSocketInterruptTest COMMAND TSocketInterruptTest)
+
+add_executable(TServerIntegrationTest TServerIntegrationTest.cpp)   # THRIFT-2441 server stop tests
+target_link_libraries(TServerIntegrationTest
+    testgencpp_cob
+    ${Boost_LIBRARIES}
+)
+if (NOT MSVC AND NOT APPLE)   # macOS ships no librt, so -lrt would fail to link there
+target_link_libraries(TServerIntegrationTest -lrt)
+endif ()
+add_test(NAME TServerIntegrationTest COMMAND TServerIntegrationTest)
 
 if(WITH_ZLIB)
 add_executable(TransportTest TransportTest.cpp)
@@ -255,7 +278,7 @@
 #
 
 
-add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h
+add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp  gen-cpp/EmptyService.h
     COMMAND thrift-compiler --gen cpp:dense ${PROJECT_SOURCE_DIR}/test/DebugProtoTest.thrift
 )
 
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 46ff911..0cd1c67 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -25,6 +25,7 @@
                 gen-cpp/ThriftTest_types.h \
                 gen-cpp/TypedefTest_types.h \
                 gen-cpp/ChildService.h \
+                gen-cpp/EmptyService.h \
                 gen-cpp/ParentService.h \
                 gen-cpp/proc_types.h
 
@@ -50,6 +51,8 @@
 nodist_libprocessortest_la_SOURCES = \
 	gen-cpp/ChildService.cpp \
 	gen-cpp/ChildService.h \
+	gen-cpp/EmptyService.cpp \
+	gen-cpp/EmptyService.h \
 	gen-cpp/ParentService.cpp \
 	gen-cpp/ParentService.h \
 	gen-cpp/proc_types.cpp \
@@ -79,6 +82,8 @@
 	SpecializationTest \
 	AllProtocolsTest \
 	TransportTest \
+	TSocketInterruptTest \
+	TServerIntegrationTest \
 	ZlibTest \
 	TFileTransportTest \
 	link_test \
@@ -107,17 +112,38 @@
 	Base64Test.cpp \
 	ToStringTest.cpp \
 	TypedefTest.cpp \
-        TServerSocketTest.cpp
+	TServerSocketTest.cpp \
+	TServerTransportTest.cpp
 
 if !WITH_BOOSTTHREADS
 UnitTests_SOURCES += \
-        RWMutexStarveTest.cpp
+    RWMutexStarveTest.cpp
 endif
 
 UnitTests_LDADD = \
   libtestgencpp.la \
   $(BOOST_TEST_LDADD)
 
+TSocketInterruptTest_SOURCES = \
+	TSocketInterruptTest.cpp
+
+TSocketInterruptTest_LDADD = \
+  libtestgencpp.la \
+  $(BOOST_TEST_LDADD) \
+  $(BOOST_CHRONO_LDADD) \
+  $(BOOST_SYSTEM_LDADD) \
+  $(BOOST_THREAD_LDADD)
+
+TServerIntegrationTest_SOURCES = \
+	TServerIntegrationTest.cpp
+
+TServerIntegrationTest_LDADD = \
+  libtestgencpp.la \
+  libprocessortest.la \
+  $(BOOST_TEST_LDADD) \
+  $(BOOST_SYSTEM_LDADD) \
+  $(BOOST_THREAD_LDADD)
+
 TransportTest_SOURCES = \
 	TransportTest.cpp
 
@@ -273,7 +299,7 @@
 #
 THRIFT = $(top_builddir)/compiler/cpp/thrift
 
-gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h: $(top_srcdir)/test/DebugProtoTest.thrift
+gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp gen-cpp/EmptyService.h: $(top_srcdir)/test/DebugProtoTest.thrift
 	$(THRIFT) --gen cpp:dense $<
 
 gen-cpp/EnumTest_types.cpp gen-cpp/EnumTest_types.h: $(top_srcdir)/test/EnumTest.thrift
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
new file mode 100644
index 0000000..9edeb19
--- /dev/null
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TServerIntegrationTest
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <thrift/server/TThreadedServer.h>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/transport/TServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TTransport.h>
+#include "gen-cpp/ParentService.h"
+#include "TestPortFixture.h"
+#include <vector>
+
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Synchronized;
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TThreadedServer;
+using apache::thrift::test::ParentServiceClient;
+using apache::thrift::test::ParentServiceIf;
+using apache::thrift::test::ParentServiceProcessor;
+
+/**
+ * Server event handler that is also a Monitor, so a test thread can wait on it.
+ * preServe runs after listen() is successful, when we can connect.
+ */
+class TServerReadyEventHandler : public TServerEventHandler, public Monitor
+{
+public:
+  TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
+  virtual ~TServerReadyEventHandler() {}
+  // Records that the server is listening and notifies a waiter on this monitor.
+  virtual void preServe() {
+    Synchronized lock(*this);
+    isListening_ = true;
+    notify();
+  }
+  // Counts one more client context and notifies a waiter; no context object is created.
+  virtual void* createContext(boost::shared_ptr<TProtocol> /*input*/,
+                              boost::shared_ptr<TProtocol> /*output*/) {
+    Synchronized lock(*this);
+    accepted_ += 1;
+    notify();
+    return NULL;
+  }
+  bool isListening() const { return isListening_; }    // takes no lock itself
+  uint64_t acceptedCount() const { return accepted_; } // takes no lock itself
+private:
+  bool isListening_;   // set to true by preServe()
+  uint64_t accepted_;  // number of createContext() calls so far
+};
+
+// In-memory ParentServiceIf implementation served by the test server: a counter and a
+// string list, both guarded by mutex_. The *Wait operations are deliberately empty.
+class ParentHandler : virtual public ParentServiceIf {
+public:
+  ParentHandler() : generation_(0) {}
+  // Bumps the counter under the lock and returns the new value.
+  int32_t incrementGeneration() {
+    Guard lock(mutex_);
+    generation_ += 1;
+    return generation_;
+  }
+
+  // Returns the current counter value.
+  int32_t getGeneration() {
+    Guard lock(mutex_);
+    return generation_;
+  }
+
+  // Appends s to the stored strings.
+  void addString(const std::string& s) {
+    Guard lock(mutex_);
+    strings_.push_back(s);
+  }
+
+  // Copies every stored string into _return.
+  void getStrings(std::vector<std::string>& _return) {
+    Guard lock(mutex_);
+    _return = strings_;
+  }
+
+  // No-op stubs: none of the tests in this file call these operations.
+  void getDataWait(std::string& _return, int32_t length) {}
+  void onewayWait() {}
+  void exceptionWait(const std::string& message) {}
+  void unexpectedExceptionWait(const std::string& message) {}
+
+protected:
+  Mutex mutex_;
+  int32_t generation_;
+  std::vector<std::string> strings_;
+};
+
+// Fixture owning a TThreadedServer on m_serverPort plus the thread that runs serve().
+class TServerIntegrationTestFixture : public TestPortFixture
+{
+public:
+  TServerIntegrationTestFixture() :
+      pServer(new TThreadedServer(
+                    boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
+                            boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
+                    boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
+                    boost::shared_ptr<TTransportFactory>(new TTransportFactory),
+                    boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
+      pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
+  {
+    pServer->setServerEventHandler(pEventHandler);
+  }
+
+  // Runs serve() on a new thread, then blocks until listen() completes so clients can connect.
+  void startServer() {
+    pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+    Synchronized sync(*(pEventHandler.get()));
+    while (!pEventHandler->isListening()) {
+        pEventHandler->wait();
+    }
+    BOOST_MESSAGE("server is listening");
+  }
+
+  // Blocks until the event handler has counted at least numAccepted client contexts.
+  void blockUntilAccepted(uint64_t numAccepted) {
+    Synchronized sync(*(pEventHandler.get()));
+    while (pEventHandler->acceptedCount() < numAccepted) {
+        pEventHandler->wait();
+    }
+    BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
+  }
+
+  // Stops the server and joins its thread; a no-op when no server thread exists (idempotent).
+  void stopServer() {
+    if (!pServerThread) { return; }   // never started, or already stopped by the test body
+    pServer->stop();
+    BOOST_MESSAGE("server stop completed");
+    pServerThread->join();
+    BOOST_MESSAGE("server thread joined");
+    pServerThread.reset();            // makes the destructor's second call a no-op
+  }
+  ~TServerIntegrationTestFixture() { stopServer(); }
+
+  // Thread body used by tests: sleeps for one second, then closes toClose.
+  void delayClose(boost::shared_ptr<TTransport> toClose) {
+    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+    toClose->close();
+  }
+
+  boost::shared_ptr<TThreadedServer> pServer;
+  boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
+  boost::shared_ptr<boost::thread> pServerThread;
+};
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+
+BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+{
+    // Basic sanity: one client connects, makes a single call and disconnects
+    // before the server is stopped.
+    startServer();
+    boost::shared_ptr<TSocket> pSocket(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
+    ParentServiceClient client(pProtocol);
+    pSocket->open();
+    client.incrementGeneration();
+    pSocket->close();
+    stopServer();
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
+{
+    // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
+
+    startServer();
+
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    pClientSock1->open();
+
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    pClientSock2->open();
+
+    // Ensure they have been accepted
+    blockUntilAccepted(2);
+
+    // Stopping the server here (not in the fixture destructor) must disconnect both clients.
+    // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
+    stopServer();
+
+    // extra proof the server end disconnected the clients
+    uint8_t buf[1];
+    BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1));   // 0 = disconnected
+    BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1));   // 0 = disconnected
+    pClientSock1->close();
+    pClientSock2->close();
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
+{
+    // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients disconnect.
+    boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
+            setInterruptableChildren(false);    // returns to pre-THRIFT-2441 behavior
+    startServer();
+
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    pClientSock1->open();
+
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    pClientSock2->open();
+
+    // Ensure they have been accepted
+    blockUntilAccepted(2);
+
+    boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
+    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+
+    // Once the clients disconnect the server will stop
+    stopServer();
+    // Join the closer threads so they never outlive the fixture or race the close() calls below
+    t1.join();
+    t2.join();
+    pClientSock1->close();
+    pClientSock2->close();
+}
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/TServerSocketTest.cpp b/lib/cpp/test/TServerSocketTest.cpp
index eee7c26..65f99f9 100644
--- a/lib/cpp/test/TServerSocketTest.cpp
+++ b/lib/cpp/test/TServerSocketTest.cpp
@@ -22,6 +22,7 @@
 #include <thrift/transport/TServerSocket.h>
 #include "TestPortFixture.h"
 #include "TTransportCheckThrow.h"
+#include <iostream>
 
 using apache::thrift::transport::TServerSocket;
 using apache::thrift::transport::TSocket;
@@ -30,24 +31,18 @@
 
 BOOST_FIXTURE_TEST_SUITE ( TServerSocketTest, TestPortFixture )
 
-class TestTServerSocket : public TServerSocket
-{
-  public:
-    TestTServerSocket(const std::string& address, int port) : TServerSocket(address, port) { }
-    using TServerSocket::acceptImpl;
-};
-
 BOOST_AUTO_TEST_CASE( test_bind_to_address )
 {
-    TestTServerSocket sock1("localhost", m_serverPort);
+    TServerSocket sock1("localhost", m_serverPort);
     sock1.listen();
     TSocket clientSock("localhost", m_serverPort);
     clientSock.open();
-    boost::shared_ptr<TTransport> accepted = sock1.acceptImpl();
+    boost::shared_ptr<TTransport> accepted = sock1.accept();
     accepted->close();
     sock1.close();
 
-    TServerSocket sock2("this.is.truly.an.unrecognizable.address.", m_serverPort);
+    std::cout << "An error message from getaddrinfo on the console is expected:" << std::endl;
+    TServerSocket sock2("257.258.259.260", m_serverPort);
     BOOST_CHECK_THROW(sock2.listen(), TTransportException);
     sock2.close();
 }
diff --git a/lib/cpp/test/TServerTransportTest.cpp b/lib/cpp/test/TServerTransportTest.cpp
new file mode 100644
index 0000000..09b2c59
--- /dev/null
+++ b/lib/cpp/test/TServerTransportTest.cpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/test/auto_unit_test.hpp>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TServerTransport.h>
+#include "TestPortFixture.h"
+
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_AUTO_TEST_SUITE ( TServerTransportTest )
+
+// Concrete TTransport that overrides nothing; used only as a non-null accept() result.
+class TestTTransport : public TTransport
+{};
+
+// TServerTransport stub whose acceptImpl() result is switched by the public valid_ flag.
+class TestTServerTransport : public TServerTransport {
+public:
+    TestTServerTransport() : valid_(true) {}
+    void close() {}
+    bool valid_;
+protected:
+    boost::shared_ptr<TTransport> acceptImpl() {
+        if (!valid_) { return boost::shared_ptr<TTransport>(); }
+        return boost::shared_ptr<TTransport>(new TestTTransport);
+    }
+};
+
+BOOST_AUTO_TEST_CASE( test_positive_accept )
+{
+    TestTServerTransport transport;     // valid_ defaults to true
+    BOOST_CHECK(transport.accept());    // a non-null transport is expected
+}
+
+BOOST_AUTO_TEST_CASE( test_negative_accept )
+{
+    TestTServerTransport transport;
+    transport.valid_ = false;           // acceptImpl() now returns a null pointer
+    BOOST_CHECK_THROW(transport.accept(), TTransportException);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
diff --git a/lib/cpp/test/TSocketInterruptTest.cpp b/lib/cpp/test/TSocketInterruptTest.cpp
new file mode 100644
index 0000000..4f6b2bc
--- /dev/null
+++ b/lib/cpp/test/TSocketInterruptTest.cpp
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TSocketInterruptTest
+#include <boost/test/auto_unit_test.hpp>
+
+#include <boost/bind.hpp>
+#include <boost/chrono/duration.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/thread/thread.hpp>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TServerSocket.h>
+#include "TestPortFixture.h"
+
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_FIXTURE_TEST_SUITE ( TSocketInterruptTest, TestPortFixture )
+
+// Thread body: reads up to 4 bytes from tt and checks the count that read() returns.
+void readerWorker(boost::shared_ptr<TTransport> tt, uint32_t expectedResult) {
+    uint8_t scratch[4];
+    BOOST_CHECK_EQUAL(expectedResult, tt->read(scratch, 4));
+}
+
+// Thread body: the read on tt must end in a TTransportException of type INTERRUPTED;
+// a read that returns normally is reported as a test error.
+void readerWorkerMustThrow(boost::shared_ptr<TTransport> tt)
+{
+    uint8_t scratch[4];
+    try {
+        tt->read(scratch, 4);
+        // reached only when read() returned instead of throwing
+        BOOST_ERROR("should not have gotten here");
+    } catch (const TTransportException& interrupted) {
+        BOOST_CHECK_EQUAL(TTransportException::INTERRUPTED, interrupted.getType());
+    }
+}
+
+BOOST_AUTO_TEST_CASE( test_interruptable_child_read )
+{   // interruptChildren() on the server socket must abort a read blocked on an accepted socket
+    TServerSocket sock1("localhost", m_serverPort);
+    sock1.listen();
+    TSocket clientSock("localhost", m_serverPort);
+    clientSock.open();
+    boost::shared_ptr<TTransport> accepted = sock1.accept();
+    boost::thread readThread(boost::bind(readerWorkerMustThrow, accepted));
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    // after the 50 ms pause readThread is practically guaranteed to be blocking in read()
+    sock1.interruptChildren();
+    BOOST_CHECK_MESSAGE(readThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren did not interrupt child read");
+    clientSock.close();
+    accepted->close();
+    sock1.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_non_interruptable_child_read )
+{   // with interruptable children disabled, interruptChildren() must NOT end a blocked read
+    TServerSocket sock1("localhost", m_serverPort);
+    sock1.setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+    sock1.listen();
+    TSocket clientSock("localhost", m_serverPort);
+    clientSock.open();
+    boost::shared_ptr<TTransport> accepted = sock1.accept();
+    boost::thread readThread(boost::bind(readerWorker, accepted, 0));   // read() must return 0
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    // after the 50 ms pause readThread is practically guaranteed to be blocking in read()
+    sock1.interruptChildren();
+    BOOST_CHECK_MESSAGE(!readThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren interrupted child read");
+
+    // only way to proceed is to have the client disconnect
+    clientSock.close();
+    readThread.join();
+    accepted->close();
+    sock1.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_cannot_change_after_listen )
+{
+    TServerSocket listener("localhost", m_serverPort);
+    listener.listen();    // changing the interruptable-children setting must now be rejected
+    BOOST_CHECK_THROW(listener.setInterruptableChildren(false), std::logic_error);
+    listener.close();
+}
+
+// Thread body: checks that tt->peek() returns expectedResult.
+void peekerWorker(boost::shared_ptr<TTransport> tt, bool expectedResult) {
+    BOOST_CHECK_EQUAL(expectedResult, tt->peek());
+}
+
+BOOST_AUTO_TEST_CASE( test_interruptable_child_peek )
+{   // interruptChildren() on the server socket must end a peek() blocked on an accepted socket
+    TServerSocket sock1("localhost", m_serverPort);
+    sock1.listen();
+    TSocket clientSock("localhost", m_serverPort);
+    clientSock.open();
+    boost::shared_ptr<TTransport> accepted = sock1.accept();
+    // peek() will return false if child is interrupted
+    boost::thread peekThread(boost::bind(peekerWorker, accepted, false));
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    // after the 50 ms pause peekThread is practically guaranteed to be blocking in peek()
+    sock1.interruptChildren();
+    BOOST_CHECK_MESSAGE(peekThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren did not interrupt child peek");
+    clientSock.close();
+    accepted->close();
+    sock1.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_non_interruptable_child_peek )
+{   // with interruptable children disabled, interruptChildren() must NOT end a blocked peek()
+    TServerSocket sock1("localhost", m_serverPort);
+    sock1.setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+    sock1.listen();
+    TSocket clientSock("localhost", m_serverPort);
+    clientSock.open();
+    boost::shared_ptr<TTransport> accepted = sock1.accept();
+    // peek() will return false when remote side is closed
+    boost::thread peekThread(boost::bind(peekerWorker, accepted, false));
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    // after the 50 ms pause peekThread is practically guaranteed to be blocking in peek()
+    sock1.interruptChildren();
+    BOOST_CHECK_MESSAGE(!peekThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren interrupted child peek");
+
+    // only way to proceed is to have the client disconnect
+    clientSock.close();
+    peekThread.join();
+    accepted->close();
+    sock1.close();
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+