THRIFT-3224 Fix TNamedPipeServer unpredictable behavior on accept
THRIFT-3225 Fix TPipeServer unpredictable behavior on interrupt()
Client: cpp
Patch: Ben Craig <bencraig@apache.org> and Pawel Janicki <pjs.wcy@poczta.onet.pl>

This closes #544
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 8939d50..cd114c2 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -58,7 +58,7 @@
 
 class TNamedPipeImpl : public TPipeImpl {
 public:
-  explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
+  explicit TNamedPipeImpl(TAutoHandle &pipehandle) : Pipe_(pipehandle.release()) {}
   virtual ~TNamedPipeImpl() {}
   virtual uint32_t read(uint8_t* buf, uint32_t len) {
     return pseudo_sync_read(Pipe_.h, read_event_.h, buf, len);
@@ -98,14 +98,15 @@
 // than using the regular named pipe implementation
 class TWaitableNamedPipeImpl : public TPipeImpl {
 public:
-  explicit TWaitableNamedPipeImpl(HANDLE pipehandle)
-    : Pipe_(pipehandle), begin_unread_idx_(0), end_unread_idx_(0) {
+  explicit TWaitableNamedPipeImpl(TAutoHandle &pipehandle) // pipehandle is released into Pipe_ at the end
+    : begin_unread_idx_(0), end_unread_idx_(0) {
     readOverlap_.action = TOverlappedWorkItem::READ;
-    readOverlap_.h = Pipe_.h;
+    readOverlap_.h = pipehandle.h; // Pipe_ is not assigned yet, so read the caller's handle
     cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
-    cancelOverlap_.h = Pipe_.h;
+    cancelOverlap_.h = pipehandle.h; // same raw handle as readOverlap_.h
     buffer_.resize(1024 /*arbitrary buffer size*/, '\0');
     beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
+    Pipe_.reset(pipehandle.release()); // ownership moves last, after the first async read was started
   }
   virtual ~TWaitableNamedPipeImpl() {
     // see if there is an outstanding read request
@@ -222,10 +223,17 @@
 }
 
 //---- Constructors ----
-TPipe::TPipe(HANDLE Pipe)
+TPipe::TPipe(TAutoHandle &Pipe)
   : impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3), isAnonymous_(false) {
 }
 
+// Adopts a raw pipe HANDLE by wrapping it before handing it to the waitable impl.
+TPipe::TPipe(HANDLE Pipe) : TimeoutSeconds_(3), isAnonymous_(false) {
+  // temporary owner; TWaitableNamedPipeImpl releases it into its own Pipe_
+  TAutoHandle ownedPipe(Pipe);
+  impl_.reset(new TWaitableNamedPipeImpl(ownedPipe));
+}
+
 TPipe::TPipe(const char* pipename) : TimeoutSeconds_(3), isAnonymous_(false) {
   setPipename(pipename);
 }
@@ -284,8 +292,7 @@
     throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
   }
 
-  impl_.reset(new TNamedPipeImpl(hPipe.h));
-  hPipe.release();
+  impl_.reset(new TNamedPipeImpl(hPipe));
 }
 
 void TPipe::close() {
@@ -355,7 +362,10 @@
   if (isAnonymous_)
     impl_->setPipeHandle(pipehandle);
   else
-    impl_.reset(new TNamedPipeImpl(pipehandle));
+  {
+    TAutoHandle pipe(pipehandle);
+    impl_.reset(new TNamedPipeImpl(pipe));
+  }
 }
 
 HANDLE TPipe::getWrtPipeHandle() {
diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h
index ef957c6..e9f5393 100644
--- a/lib/cpp/src/thrift/transport/TPipe.h
+++ b/lib/cpp/src/thrift/transport/TPipe.h
@@ -25,7 +25,10 @@
 #ifndef _WIN32
 #include <thrift/transport/TSocket.h>
 #endif
+#ifdef _WIN32
+#include <thrift/windows/Sync.h>
+#endif
 #include <boost/noncopyable.hpp>
 
 namespace apache {
 namespace thrift {
@@ -45,7 +48,8 @@
   // Constructs a new pipe object.
   TPipe();
   // Named pipe constructors -
   explicit TPipe(HANDLE Pipe); // HANDLE is a void*
+  explicit TPipe(TAutoHandle &Pipe); // this ctor will clear out / move from Pipe (handle is released from it)
   // need a const char * overload so string literals don't go to the HANDLE overload
   explicit TPipe(const char* pipename);
   explicit TPipe(const std::string& pipename);
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index 3779d7f..baa2603 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -45,7 +45,6 @@
   TPipeServerImpl() {}
   virtual ~TPipeServerImpl() = 0 {}
   virtual void interrupt() = 0;
-  virtual void close() = 0;
   virtual boost::shared_ptr<TTransport> acceptImpl() = 0;
 
   virtual HANDLE getPipeHandle() = 0;
@@ -68,16 +67,15 @@
     }
   }
 
-  virtual ~TAnonPipeServer() {}
-
-  virtual void interrupt() {} // not currently implemented
-  virtual void close() {
+  virtual ~TAnonPipeServer() {
     PipeR_.reset();
     PipeW_.reset();
     ClientAnonRead_.reset();
     ClientAnonWrite_.reset();
   }
 
+  virtual void interrupt() {} // not currently implemented
+
   virtual boost::shared_ptr<TTransport> acceptImpl();
 
   virtual HANDLE getPipeHandle() { return PipeR_.h; }
@@ -100,10 +98,12 @@
 class TNamedPipeServer : public TPipeServerImpl {
 public:
   TNamedPipeServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections)
-    : stopping_(false), pipename_(pipename), bufsize_(bufsize), maxconns_(maxconnections) {
+    : stopping_(false), pipename_(pipename), bufsize_(bufsize), maxconns_(maxconnections)
+  {
     connectOverlap_.action = TOverlappedWorkItem::CONNECT;
     cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
-    initiateNamedConnect();
+    TAutoCrit lock(pipe_protect_); // hold pipe_protect_ while the first connect is set up
+    initiateNamedConnect(lock); // the lock object is passed as proof that pipe_protect_ is held
   }
   virtual ~TNamedPipeServer() {}
 
@@ -115,12 +115,9 @@
       cancelOverlap_.h = Pipe_.h;
       // This should wake up GetOverlappedResult
       thread_->addWorkItem(&cancelOverlap_);
-      close();
     }
   }
 
-  virtual void close() { Pipe_.reset(); }
-
   virtual boost::shared_ptr<TTransport> acceptImpl();
 
   virtual HANDLE getPipeHandle() { return Pipe_.h; }
@@ -130,8 +127,8 @@
   virtual HANDLE getNativeWaitHandle() { return listen_event_.h; }
 
 private:
-  bool createNamedPipe();
-  void initiateNamedConnect();
+  bool createNamedPipe(const TAutoCrit &lockProof);
+  void initiateNamedConnect(const TAutoCrit &lockProof);
 
   TAutoOverlapThread thread_;
   TOverlappedWorkItem connectOverlap_;
@@ -142,9 +139,11 @@
   uint32_t bufsize_;
   uint32_t maxconns_;
   TManualResetEvent listen_event_;
+
+  TCriticalSection pipe_protect_;
+  // only read or write these variables underneath a locked pipe_protect_
   boost::shared_ptr<TPipe> cached_client_;
   TAutoHandle Pipe_;
-  TCriticalSection pipe_protect_;
 };
 
 HANDLE TPipeServer::getNativeWaitHandle() {
@@ -182,8 +181,7 @@
 }
 
 //---- Destructor ----
-TPipeServer::~TPipeServer() {
-}
+TPipeServer::~TPipeServer() {}
 
 //---------------------------------------------------------
 // Transport callbacks
@@ -217,10 +215,10 @@
   return client;
 }
 
-void TNamedPipeServer::initiateNamedConnect() {
+void TNamedPipeServer::initiateNamedConnect(const TAutoCrit &lockProof) {
   if (stopping_)
     return;
-  if (!createNamedPipe()) {
+  if (!createNamedPipe(lockProof)) {
     GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
     throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
   }
@@ -236,8 +234,7 @@
   // zero, GetLastError should return ERROR_PIPE_CONNECTED.
   if (connectOverlap_.success) {
     GlobalOutput.printf("Client connected.");
-    cached_client_.reset(new TPipe(Pipe_.h));
-    Pipe_.release();
+    cached_client_.reset(new TPipe(Pipe_));
     // make sure people know that a connection is ready
     SetEvent(listen_event_.h);
     return;
@@ -247,8 +244,7 @@
   switch (dwErr) {
   case ERROR_PIPE_CONNECTED:
     GlobalOutput.printf("Client connected.");
-    cached_client_.reset(new TPipe(Pipe_.h));
-    Pipe_.release();
+    cached_client_.reset(new TPipe(Pipe_));
     // make sure people know that a connection is ready
     SetEvent(listen_event_.h);
     return;
@@ -270,7 +266,7 @@
       client.swap(cached_client_);
 
       // kick off the next connection before returning
-      initiateNamedConnect();
+      initiateNamedConnect(lock);
       return client; // success!
     }
   }
@@ -281,18 +277,25 @@
   }
 
   DWORD dwDummy = 0;
+
+  // For the most part, Pipe_ should be protected with pipe_protect_.  We can't
+  // reasonably do that here though without breaking interruptability.  However,
+  // this should be safe, though I'm not happy about it.  We only need to ensure
+  // that no one writes / modifies Pipe_.h while we are reading it.  Well, the
+  // only two things that should be modifying Pipe_ are acceptImpl, the
+  // functions it calls, and the destructor.  Those things shouldn't be run
+  // concurrently anyway.  So this call is 'really' just a read that may happen
+  // concurrently with interrupt, and that should be fine.
   if (GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) {
     TAutoCrit lock(pipe_protect_);
     GlobalOutput.printf("Client connected.");
-    shared_ptr<TPipe> client(new TPipe(Pipe_.h));
-    Pipe_.release();
+    shared_ptr<TPipe> client(new TPipe(Pipe_));
     // kick off the next connection before returning
-    initiateNamedConnect();
+    initiateNamedConnect(lock);
     return client; // success!
   }
   // if we got here, then we are in an error / shutdown case
   DWORD gle = GetLastError(); // save error before doing cleanup
-  close();
   GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle);
   throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
 }
@@ -303,11 +306,10 @@
 }
 
 void TPipeServer::close() {
-  if (impl_)
-    impl_->close();
+  impl_.reset();
 }
 
-bool TNamedPipeServer::createNamedPipe() {
+bool TNamedPipeServer::createNamedPipe(const TAutoCrit & /*lockProof*/) {
 
   // Windows - set security to allow non-elevated apps
   // to access pipes created by elevated apps.
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index 427ff41..86dfd13 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -88,16 +88,23 @@
     set_property( TARGET UnitTests APPEND_STRING PROPERTY COMPILE_FLAGS /wd4503 )
 endif ( MSVC )
 
-add_executable(TSocketInterruptTest TSocketInterruptTest.cpp)
-target_link_libraries(TSocketInterruptTest
+
+set( TInterruptTest_SOURCES TSocketInterruptTest.cpp )
+if (WIN32)
+    list(APPEND TInterruptTest_SOURCES
+        TPipeInterruptTest.cpp
+    )
+endif()
+add_executable(TInterruptTest ${TInterruptTest_SOURCES})
+target_link_libraries(TInterruptTest
     testgencpp
     ${Boost_LIBRARIES}
 )
-LINK_AGAINST_THRIFT_LIBRARY(TSocketInterruptTest thrift)
+LINK_AGAINST_THRIFT_LIBRARY(TInterruptTest thrift)
 if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
-target_link_libraries(TSocketInterruptTest -lrt)
+target_link_libraries(TInterruptTest -lrt)
 endif ()
-add_test(NAME TSocketInterruptTest COMMAND TSocketInterruptTest)
+add_test(NAME TInterruptTest COMMAND TInterruptTest)
 
 add_executable(TServerIntegrationTest TServerIntegrationTest.cpp)
 target_link_libraries(TServerIntegrationTest
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 18b4654..5c32eba 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -82,7 +82,7 @@
 	SpecializationTest \
 	AllProtocolsTest \
 	TransportTest \
-	TSocketInterruptTest \
+	TInterruptTest \
 	TServerIntegrationTest \
 	ZlibTest \
 	TFileTransportTest \
@@ -125,10 +125,10 @@
   libtestgencpp.la \
   $(BOOST_TEST_LDADD)
 
-TSocketInterruptTest_SOURCES = \
+TInterruptTest_SOURCES = \
 	TSocketInterruptTest.cpp
 
-TSocketInterruptTest_LDADD = \
+TInterruptTest_LDADD = \
   libtestgencpp.la \
   $(BOOST_TEST_LDADD) \
   $(BOOST_CHRONO_LDADD) \
@@ -214,7 +214,7 @@
 # DebugProtoTest
 #
 DebugProtoTest_SOURCES = \
-	DebugProtoTest.cpp 
+	DebugProtoTest.cpp
 
 DebugProtoTest_LDADD = \
 	libtestgencpp.la \
diff --git a/lib/cpp/test/TPipeInterruptTest.cpp b/lib/cpp/test/TPipeInterruptTest.cpp
new file mode 100644
index 0000000..b0e246d
--- /dev/null
+++ b/lib/cpp/test/TPipeInterruptTest.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/test/test_tools.hpp>
+#include <boost/test/unit_test_suite.hpp>
+
+#include <boost/bind.hpp>
+#include <boost/chrono/duration.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/thread/thread.hpp>
+#include <thrift/transport/TPipe.h>
+#include <thrift/transport/TPipeServer.h>
+
+using apache::thrift::transport::TPipeServer;
+using apache::thrift::transport::TPipe;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_AUTO_TEST_SUITE(TPipeInterruptTest)
+
+// TODO: duplicate the test cases in TSocketInterruptTest for pipes,
+// once pipes implement interruptChildren
+
+BOOST_AUTO_TEST_CASE(test_interrupt_before_accept) {
+  TPipeServer server("TPipeInterruptTest");
+  server.listen();
+  server.interrupt(); // interrupt first, while no accept() call is in progress
+  BOOST_CHECK_THROW(server.accept(), TTransportException); // the later accept() is expected to throw
+}
+
+// Thread body: accept (and immediately drop) connections until accept() throws.
+static void acceptWorker(TPipeServer *pipe) {
+  try {
+    while (true) {
+      boost::shared_ptr<TTransport> accepted = pipe->accept();
+    }
+  } catch (...) {
+    // swallowed on purpose: the test only checks that nothing crashes
+  }
+}
+
+static void interruptWorker(TPipeServer *pipe) { // thread body used by the stress test below
+  boost::this_thread::sleep(boost::posix_time::milliseconds(10)); // 10 ms pause before interrupting
+  pipe->interrupt();
+}
+
+// Stress test: races accept(), interrupt() and client connects; only crashes count as failure.
+BOOST_AUTO_TEST_CASE(stress_pipe_accept_interruption) {
+  const int kRounds = 100;
+  for (int round = 0; round < kRounds; ++round) {
+    TPipeServer server("TPipeInterruptTest");
+    server.listen();
+    boost::thread acceptor(boost::bind(acceptWorker, &server));
+    boost::thread interrupter(boost::bind(interruptWorker, &server));
+    try {
+      // keep connecting until something in the loop throws
+      while (true) {
+        TPipe client("TPipeInterruptTest");
+        client.setConnectTimeout(1);
+        client.open();
+      }
+    } catch (...) {
+      // swallowed on purpose: this test only looks for crashes
+    }
+    interrupter.join();
+    acceptor.join();
+  }
+}
+
+BOOST_AUTO_TEST_SUITE_END()