THRIFT-3224 Fix TNamedPipeServer unpredictable behavior on accept
THRIFT-3225 Fix TPipeServer unpredictable behavior on interrupt()
Client: cpp
Patch: Ben Craig <bencraig@apache.org> and Pawel Janicki <pjs.wcy@poczta.onet.pl>
This closes #544
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 8939d50..cd114c2 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -58,7 +58,7 @@
class TNamedPipeImpl : public TPipeImpl {
public:
- explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
+ explicit TNamedPipeImpl(TAutoHandle &pipehandle) : Pipe_(pipehandle.release()) {}
virtual ~TNamedPipeImpl() {}
virtual uint32_t read(uint8_t* buf, uint32_t len) {
return pseudo_sync_read(Pipe_.h, read_event_.h, buf, len);
@@ -98,14 +98,15 @@
// than using the regular named pipe implementation
class TWaitableNamedPipeImpl : public TPipeImpl {
public:
- explicit TWaitableNamedPipeImpl(HANDLE pipehandle)
- : Pipe_(pipehandle), begin_unread_idx_(0), end_unread_idx_(0) {
+ explicit TWaitableNamedPipeImpl(TAutoHandle &pipehandle)
+ : begin_unread_idx_(0), end_unread_idx_(0) {
readOverlap_.action = TOverlappedWorkItem::READ;
- readOverlap_.h = Pipe_.h;
+ readOverlap_.h = pipehandle.h;
cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
- cancelOverlap_.h = Pipe_.h;
+ cancelOverlap_.h = pipehandle.h;
buffer_.resize(1024 /*arbitrary buffer size*/, '\0');
beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
+ Pipe_.reset(pipehandle.release());
}
virtual ~TWaitableNamedPipeImpl() {
// see if there is an outstanding read request
@@ -222,10 +223,17 @@
}
//---- Constructors ----
-TPipe::TPipe(HANDLE Pipe)
+TPipe::TPipe(TAutoHandle &Pipe)
: impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3), isAnonymous_(false) {
}
+TPipe::TPipe(HANDLE Pipe)
+ : TimeoutSeconds_(3), isAnonymous_(false)
+{
+ TAutoHandle pipeHandle(Pipe);
+ impl_.reset(new TWaitableNamedPipeImpl(pipeHandle));
+}
+
TPipe::TPipe(const char* pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
setPipename(pipename);
}
@@ -284,8 +292,7 @@
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
- impl_.reset(new TNamedPipeImpl(hPipe.h));
- hPipe.release();
+ impl_.reset(new TNamedPipeImpl(hPipe));
}
void TPipe::close() {
@@ -355,7 +362,10 @@
if (isAnonymous_)
impl_->setPipeHandle(pipehandle);
else
- impl_.reset(new TNamedPipeImpl(pipehandle));
+ {
+ TAutoHandle pipe(pipehandle);
+ impl_.reset(new TNamedPipeImpl(pipe));
+ }
}
HANDLE TPipe::getWrtPipeHandle() {
diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h
index ef957c6..e9f5393 100644
--- a/lib/cpp/src/thrift/transport/TPipe.h
+++ b/lib/cpp/src/thrift/transport/TPipe.h
@@ -25,7 +25,13 @@
#ifndef _WIN32
#include <thrift/transport/TSocket.h>
#endif
+#ifdef _WIN32
+#include <thrift/windows/Sync.h>
+#endif
#include <boost/noncopyable.hpp>
+#ifdef _WIN32
+#include <thrift/windows/Sync.h>
+#endif
namespace apache {
namespace thrift {
@@ -45,7 +51,9 @@
// Constructs a new pipe object.
TPipe();
// Named pipe constructors -
+ explicit TPipe(TAutoHandle &Pipe); // The Pipe will be pseudo-moved from in here
explicit TPipe(HANDLE Pipe); // HANDLE is a void*
+ explicit TPipe(TAutoHandle &Pipe); // this ctor will clear out / move from Pipe
// need a const char * overload so string literals don't go to the HANDLE overload
explicit TPipe(const char* pipename);
explicit TPipe(const std::string& pipename);
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index 3779d7f..baa2603 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -45,7 +45,6 @@
TPipeServerImpl() {}
virtual ~TPipeServerImpl() = 0 {}
virtual void interrupt() = 0;
- virtual void close() = 0;
virtual boost::shared_ptr<TTransport> acceptImpl() = 0;
virtual HANDLE getPipeHandle() = 0;
@@ -68,16 +67,15 @@
}
}
- virtual ~TAnonPipeServer() {}
-
- virtual void interrupt() {} // not currently implemented
- virtual void close() {
+ virtual ~TAnonPipeServer() {
PipeR_.reset();
PipeW_.reset();
ClientAnonRead_.reset();
ClientAnonWrite_.reset();
}
+ virtual void interrupt() {} // not currently implemented
+
virtual boost::shared_ptr<TTransport> acceptImpl();
virtual HANDLE getPipeHandle() { return PipeR_.h; }
@@ -100,10 +98,12 @@
class TNamedPipeServer : public TPipeServerImpl {
public:
TNamedPipeServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections)
- : stopping_(false), pipename_(pipename), bufsize_(bufsize), maxconns_(maxconnections) {
+ : stopping_(false), pipename_(pipename), bufsize_(bufsize), maxconns_(maxconnections)
+ {
connectOverlap_.action = TOverlappedWorkItem::CONNECT;
cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
- initiateNamedConnect();
+ TAutoCrit lock(pipe_protect_);
+ initiateNamedConnect(lock);
}
virtual ~TNamedPipeServer() {}
@@ -115,12 +115,9 @@
cancelOverlap_.h = Pipe_.h;
// This should wake up GetOverlappedResult
thread_->addWorkItem(&cancelOverlap_);
- close();
}
}
- virtual void close() { Pipe_.reset(); }
-
virtual boost::shared_ptr<TTransport> acceptImpl();
virtual HANDLE getPipeHandle() { return Pipe_.h; }
@@ -130,8 +127,8 @@
virtual HANDLE getNativeWaitHandle() { return listen_event_.h; }
private:
- bool createNamedPipe();
- void initiateNamedConnect();
+ bool createNamedPipe(const TAutoCrit &lockProof);
+ void initiateNamedConnect(const TAutoCrit &lockProof);
TAutoOverlapThread thread_;
TOverlappedWorkItem connectOverlap_;
@@ -142,9 +139,11 @@
uint32_t bufsize_;
uint32_t maxconns_;
TManualResetEvent listen_event_;
+
+ TCriticalSection pipe_protect_;
+ // only read or write these variables underneath a locked pipe_protect_
boost::shared_ptr<TPipe> cached_client_;
TAutoHandle Pipe_;
- TCriticalSection pipe_protect_;
};
HANDLE TPipeServer::getNativeWaitHandle() {
@@ -182,8 +181,7 @@
}
//---- Destructor ----
-TPipeServer::~TPipeServer() {
-}
+TPipeServer::~TPipeServer() {}
//---------------------------------------------------------
// Transport callbacks
@@ -217,10 +215,10 @@
return client;
}
-void TNamedPipeServer::initiateNamedConnect() {
+void TNamedPipeServer::initiateNamedConnect(const TAutoCrit &lockProof) {
if (stopping_)
return;
- if (!createNamedPipe()) {
+ if (!createNamedPipe(lockProof)) {
GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
}
@@ -236,8 +234,7 @@
// zero, GetLastError should return ERROR_PIPE_CONNECTED.
if (connectOverlap_.success) {
GlobalOutput.printf("Client connected.");
- cached_client_.reset(new TPipe(Pipe_.h));
- Pipe_.release();
+ cached_client_.reset(new TPipe(Pipe_));
// make sure people know that a connection is ready
SetEvent(listen_event_.h);
return;
@@ -247,8 +244,7 @@
switch (dwErr) {
case ERROR_PIPE_CONNECTED:
GlobalOutput.printf("Client connected.");
- cached_client_.reset(new TPipe(Pipe_.h));
- Pipe_.release();
+ cached_client_.reset(new TPipe(Pipe_));
// make sure people know that a connection is ready
SetEvent(listen_event_.h);
return;
@@ -270,7 +266,7 @@
client.swap(cached_client_);
// kick off the next connection before returning
- initiateNamedConnect();
+ initiateNamedConnect(lock);
return client; // success!
}
}
@@ -281,18 +277,25 @@
}
DWORD dwDummy = 0;
+
+ // For the most part, Pipe_ should be protected with pipe_protect_. We can't
+ // reasonably do that here though without breaking interruptability. However,
+ // this should be safe, though I'm not happy about it. We only need to ensure
+ // that no one writes / modifies Pipe_.h while we are reading it. Well, the
+ // only two things that should be modifying Pipe_ are acceptImpl, the
+ // functions it calls, and the destructor. Those things shouldn't be run
+ // concurrently anyway. So this call is 'really' just a read that may happen
+ // concurrently with interrupt, and that should be fine.
if (GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) {
TAutoCrit lock(pipe_protect_);
GlobalOutput.printf("Client connected.");
- shared_ptr<TPipe> client(new TPipe(Pipe_.h));
- Pipe_.release();
+ shared_ptr<TPipe> client(new TPipe(Pipe_));
// kick off the next connection before returning
- initiateNamedConnect();
+ initiateNamedConnect(lock);
return client; // success!
}
// if we got here, then we are in an error / shutdown case
DWORD gle = GetLastError(); // save error before doing cleanup
- close();
GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle);
throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
}
@@ -303,11 +306,10 @@
}
void TPipeServer::close() {
- if (impl_)
- impl_->close();
+ impl_.reset();
}
-bool TNamedPipeServer::createNamedPipe() {
+bool TNamedPipeServer::createNamedPipe(const TAutoCrit & /*lockProof*/) {
// Windows - set security to allow non-elevated apps
// to access pipes created by elevated apps.
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index 427ff41..86dfd13 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -88,16 +88,23 @@
set_property( TARGET UnitTests APPEND_STRING PROPERTY COMPILE_FLAGS /wd4503 )
endif ( MSVC )
-add_executable(TSocketInterruptTest TSocketInterruptTest.cpp)
-target_link_libraries(TSocketInterruptTest
+
+set( TInterruptTest_SOURCES TSocketInterruptTest.cpp )
+if (WIN32)
+ list(APPEND TInterruptTest_SOURCES
+ TPipeInterruptTest.cpp
+ )
+endif()
+add_executable(TInterruptTest ${TInterruptTest_SOURCES})
+target_link_libraries(TInterruptTest
testgencpp
${Boost_LIBRARIES}
)
-LINK_AGAINST_THRIFT_LIBRARY(TSocketInterruptTest thrift)
+LINK_AGAINST_THRIFT_LIBRARY(TInterruptTest thrift)
if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
-target_link_libraries(TSocketInterruptTest -lrt)
+target_link_libraries(TInterruptTest -lrt)
endif ()
-add_test(NAME TSocketInterruptTest COMMAND TSocketInterruptTest)
+add_test(NAME TInterruptTest COMMAND TInterruptTest)
add_executable(TServerIntegrationTest TServerIntegrationTest.cpp)
target_link_libraries(TServerIntegrationTest
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 18b4654..5c32eba 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -82,7 +82,7 @@
SpecializationTest \
AllProtocolsTest \
TransportTest \
- TSocketInterruptTest \
+ TInterruptTest \
TServerIntegrationTest \
ZlibTest \
TFileTransportTest \
@@ -125,10 +125,10 @@
libtestgencpp.la \
$(BOOST_TEST_LDADD)
-TSocketInterruptTest_SOURCES = \
+TInterruptTest_SOURCES = \
TSocketInterruptTest.cpp
-TSocketInterruptTest_LDADD = \
+TInterruptTest_LDADD = \
libtestgencpp.la \
$(BOOST_TEST_LDADD) \
$(BOOST_CHRONO_LDADD) \
@@ -214,7 +214,7 @@
# DebugProtoTest
#
DebugProtoTest_SOURCES = \
- DebugProtoTest.cpp
+ DebugProtoTest.cpp
DebugProtoTest_LDADD = \
libtestgencpp.la \
diff --git a/lib/cpp/test/TPipeInterruptTest.cpp b/lib/cpp/test/TPipeInterruptTest.cpp
new file mode 100644
index 0000000..b0e246d
--- /dev/null
+++ b/lib/cpp/test/TPipeInterruptTest.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/test/test_tools.hpp>
+#include <boost/test/unit_test_suite.hpp>
+
+#include <boost/bind.hpp>
+#include <boost/chrono/duration.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/thread/thread.hpp>
+#include <thrift/transport/TPipe.h>
+#include <thrift/transport/TPipeServer.h>
+
+using apache::thrift::transport::TPipeServer;
+using apache::thrift::transport::TPipe;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_AUTO_TEST_SUITE(TPipeInterruptTest)
+
+// TODO: duplicate the test cases in TSocketInterruptTest for pipes,
+// once pipes implement interruptChildren
+
+BOOST_AUTO_TEST_CASE(test_interrupt_before_accept) {
+ TPipeServer pipe1("TPipeInterruptTest");
+ pipe1.listen();
+ pipe1.interrupt();
+ BOOST_CHECK_THROW(pipe1.accept(), TTransportException);
+}
+
+static void acceptWorker(TPipeServer *pipe) {
+ try
+ {
+ for (;;)
+ {
+ boost::shared_ptr<TTransport> temp = pipe->accept();
+ }
+ }
+ catch (...) {/*just want to make sure nothing crashes*/ }
+}
+
+static void interruptWorker(TPipeServer *pipe) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(10));
+ pipe->interrupt();
+}
+
+BOOST_AUTO_TEST_CASE(stress_pipe_accept_interruption) {
+ int interruptIters = 100;
+
+ for (int i = 0; i < interruptIters; ++i)
+ {
+ TPipeServer pipeServer("TPipeInterruptTest");
+ pipeServer.listen();
+ boost::thread acceptThread(boost::bind(acceptWorker, &pipeServer));
+ boost::thread interruptThread(boost::bind(interruptWorker, &pipeServer));
+ try
+ {
+ for (;;)
+ {
+ TPipe client("TPipeInterruptTest");
+ client.setConnectTimeout(1);
+ client.open();
+ }
+ } catch (...) { /*just testing for crashes*/ }
+ interruptThread.join();
+ acceptThread.join();
+ }
+}
+
+BOOST_AUTO_TEST_SUITE_END()