THRIFT-2069: TPipeServer creates overlapped pipes, then uses synchronous I/O on them with TPipe
Client: cpp
Patch: Ben Craig
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 92e2912..3bb3dac 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -19,6 +19,10 @@
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TPipe.h>
+#ifdef _WIN32
+ #include <thrift/windows/OverlappedSubmissionThread.h>
+ #include <thrift/windows/Sync.h>
+#endif
namespace apache { namespace thrift { namespace transport {
@@ -29,123 +33,301 @@
*/
#ifdef _WIN32
+
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len);
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len);
+
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len);
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len);
+
+class TPipeImpl : boost::noncopyable {
+public:
+ TPipeImpl() {}
+ virtual ~TPipeImpl() = 0 {}
+ virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
+ virtual void write(const uint8_t* buf, uint32_t len) = 0;
+ virtual HANDLE getPipeHandle() = 0; //doubles as the read handle for anon pipe
+ virtual void setPipeHandle(HANDLE pipehandle) = 0;
+ virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+ virtual void setWrtPipeHandle(HANDLE) {}
+ virtual bool isBufferedDataAvailable() { return false; }
+ virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
+};
+
+class TNamedPipeImpl : public TPipeImpl {
+public:
+ explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {}
+ virtual ~TNamedPipeImpl() {}
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {
+ return pseudo_sync_read (Pipe_.h, read_event_.h, buf, len);
+ }
+ virtual void write(const uint8_t* buf, uint32_t len) {
+ pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
+ }
+
+ virtual HANDLE getPipeHandle() {return Pipe_.h;}
+ virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+private:
+ TManualResetEvent read_event_;
+ TManualResetEvent write_event_;
+ TAutoHandle Pipe_;
+};
+
+class TAnonPipeImpl : public TPipeImpl {
+public:
+ TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {}
+ virtual ~TAnonPipeImpl() {}
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {return pipe_read (PipeRd_.h, buf, len);}
+ virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len);}
+
+ virtual HANDLE getPipeHandle() {return PipeRd_.h;}
+ virtual void setPipeHandle(HANDLE PipeRd) {PipeRd_.reset(PipeRd);}
+ virtual HANDLE getWrtPipeHandle() {return PipeWrt_.h;}
+ virtual void setWrtPipeHandle(HANDLE PipeWrt) {PipeWrt_.reset(PipeWrt);}
+private:
+ TAutoHandle PipeRd_;
+ TAutoHandle PipeWrt_;
+};
+
+// If you want a select-like loop to work, use this subclass. Be warned...
+// the read implementation has several context switches, so this is slower
+// than using the regular named pipe implementation
+class TWaitableNamedPipeImpl : public TPipeImpl {
+public:
+ explicit TWaitableNamedPipeImpl(HANDLE pipehandle) :
+ Pipe_(pipehandle),
+ begin_unread_idx_(0),
+ end_unread_idx_(0)
+ {
+ readOverlap_.action = TOverlappedWorkItem::READ;
+ readOverlap_.h = Pipe_.h;
+ cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
+ cancelOverlap_.h = Pipe_.h;
+ buffer_.resize(1024 /*arbitrary buffer size*/, '\0');
+ beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
+ }
+ virtual ~TWaitableNamedPipeImpl() {
+ // see if there is an outstanding read request
+ if(begin_unread_idx_ == end_unread_idx_) {
+ // if so, cancel it, and wait for the dead completion
+ thread_->addWorkItem(&cancelOverlap_);
+ readOverlap_.overlappedResults(false /*ignore errors*/);
+ }
+ }
+ virtual uint32_t read(uint8_t* buf, uint32_t len);
+ virtual void write(const uint8_t* buf, uint32_t len) {
+ pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
+ }
+
+ virtual HANDLE getPipeHandle() {return Pipe_.h;}
+ virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);}
+ virtual bool isBufferedDataAvailable() {return begin_unread_idx_ < end_unread_idx_;}
+ virtual HANDLE getNativeWaitHandle() { return ready_event_.h; }
+private:
+ void beginAsyncRead(uint8_t* buf, uint32_t len);
+ uint32_t endAsyncRead();
+
+ TAutoOverlapThread thread_;
+ TAutoHandle Pipe_;
+ TOverlappedWorkItem readOverlap_;
+ TOverlappedWorkItem cancelOverlap_;
+ TManualResetEvent ready_event_;
+ TManualResetEvent write_event_;
+ std::vector<uint8_t> buffer_;
+ uint32_t begin_unread_idx_;
+ uint32_t end_unread_idx_;
+};
+
+void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len)
+{
+ begin_unread_idx_ = end_unread_idx_ = 0;
+ readOverlap_.reset(buf, len, ready_event_.h);
+ thread_->addWorkItem(&readOverlap_);
+ if(readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING)
+ {
+ GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error);
+ throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed");
+ }
+}
+
+uint32_t TWaitableNamedPipeImpl::endAsyncRead()
+{
+ return readOverlap_.overlappedResults();
+}
+
+uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len)
+{
+ if(begin_unread_idx_ == end_unread_idx_) {
+ end_unread_idx_ = endAsyncRead();
+ }
+
+ uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_-begin_unread_idx_);
+ memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy);
+ begin_unread_idx_ += bytes_to_copy;
+ if(begin_unread_idx_ != end_unread_idx_)
+ {
+ assert(len == bytes_to_copy);
+ // we were able to fulfill the read with just the bytes in our
+ // buffer, and we still have buffer left
+ return bytes_to_copy;
+ }
+ uint32_t bytes_copied = bytes_to_copy;
+
+ //all of the requested data has been read. Kick off an async read for the next round.
+ beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
+
+ return bytes_copied;
+}
+
+void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len)
+{
+ OVERLAPPED tempOverlap;
+ memset( &tempOverlap, 0, sizeof(tempOverlap));
+ tempOverlap.hEvent = event;
+
+ uint32_t written = 0;
+ while(written < len)
+ {
+ BOOL result = ::WriteFile(pipe, buf+written, len-written, NULL, &tempOverlap);
+
+ if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
+ {
+ GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError());
+ throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed");
+ }
+
+ DWORD bytes = 0;
+ result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
+ if(!result)
+ {
+ GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
+ throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
+ }
+ written += bytes;
+ }
+}
+
+uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len)
+{
+ OVERLAPPED tempOverlap;
+ memset( &tempOverlap, 0, sizeof(tempOverlap));
+ tempOverlap.hEvent = event;
+
+ BOOL result = ::ReadFile(pipe, buf, len, NULL, &tempOverlap);
+
+ if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING)
+ {
+ GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError());
+ throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed");
+ }
+
+ DWORD bytes = 0;
+ result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
+ if(!result)
+ {
+ GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
+ throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
+ }
+ return bytes;
+}
+
//---- Constructors ----
TPipe::TPipe(HANDLE Pipe) :
- Pipe_(Pipe),
+ impl_(new TWaitableNamedPipeImpl(Pipe)),
TimeoutSeconds_(3),
- isAnonymous(false)
+ isAnonymous_(false)
{}
TPipe::TPipe(const char *pipename) :
- Pipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3),
- isAnonymous(false)
+ isAnonymous_(false)
{
setPipename(pipename);
}
TPipe::TPipe(const std::string &pipename) :
- Pipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3),
- isAnonymous(false)
+ isAnonymous_(false)
{
setPipename(pipename);
}
TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) :
- Pipe_(PipeRd),
- PipeWrt_(PipeWrt),
+ impl_(new TAnonPipeImpl(PipeRd, PipeWrt)),
TimeoutSeconds_(3),
- isAnonymous(true)
+ isAnonymous_(true)
{}
TPipe::TPipe() :
- Pipe_(INVALID_HANDLE_VALUE),
- TimeoutSeconds_(3)
+ TimeoutSeconds_(3),
+ isAnonymous_(false)
{}
-//---- Destructor ----
-TPipe::~TPipe() {
- close();
-}
-
+TPipe::~TPipe() {}
//---------------------------------------------------------
// Transport callbacks
//---------------------------------------------------------
-
bool TPipe::isOpen() {
- return (Pipe_ != INVALID_HANDLE_VALUE);
+ return impl_.get() != NULL;
}
bool TPipe::peek() {
- if (!isOpen()) {
- return false;
- }
- DWORD bytesavail = 0;
- int PeekRet = 0;
- PeekRet = PeekNamedPipe(Pipe_, NULL, 0, NULL, &bytesavail, NULL);
- return (PeekRet != 0 && bytesavail > 0);
+ return isOpen();
}
void TPipe::open() {
- if (isOpen()) {
+ if (isOpen())
return;
- }
- int SleepInterval = 500; //ms
- int retries = TimeoutSeconds_ * 1000 / SleepInterval;
- HANDLE hPipe_;
- for(int i=0; i<retries; i++)
+ TAutoHandle hPipe;
+ do {
+ DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
+ hPipe.reset(CreateFile(
+ pipename_.c_str(),
+ GENERIC_READ | GENERIC_WRITE,
+ 0, // no sharing
+ NULL, // default security attributes
+ OPEN_EXISTING, // opens existing pipe
+ flags,
+ NULL)); // no template file
+
+ if (hPipe.h != INVALID_HANDLE_VALUE)
+ break; //success!
+
+ if(::GetLastError() != ERROR_PIPE_BUSY)
+ {
+ GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
+ throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
+ }
+ } while( ::WaitNamedPipe(pipename_.c_str(), TimeoutSeconds_*1000) );
+
+ if(hPipe.h == INVALID_HANDLE_VALUE)
{
- hPipe_ = CreateFile(
- pipename_.c_str(),
- GENERIC_READ | GENERIC_WRITE,
- 0, // no sharing
- NULL, // default security attributes
- OPEN_EXISTING, // opens existing pipe
- 0, // default attributes
- NULL); // no template file
-
- if (hPipe_ == INVALID_HANDLE_VALUE)
- ::Sleep(SleepInterval);
- else
- break;
- }
- if (hPipe_ == INVALID_HANDLE_VALUE)
+ GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
-
- // The pipe connected; change to message-read mode.
- DWORD dwMode = PIPE_READMODE_MESSAGE;
- int fSuccess = SetNamedPipeHandleState(
- hPipe_, // pipe handle
- &dwMode, // new pipe mode
- NULL, // don't set maximum bytes
- NULL); // don't set maximum time
- if (fSuccess == 0)
- {
- throw TTransportException(TTransportException::NOT_OPEN, "SetNamedPipeHandleState failed");
- close();
}
- Pipe_ = hPipe_;
+
+ impl_.reset(new TNamedPipeImpl(hPipe.h));
+ hPipe.release();
}
void TPipe::close() {
- if (isOpen())
- {
- CloseHandle(Pipe_);
- Pipe_ = INVALID_HANDLE_VALUE;
- }
+ impl_.reset();
}
uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open pipe");
+ return impl_->read(buf, len);
+}
+uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len)
+{
DWORD cbRead;
int fSuccess = ReadFile(
- Pipe_, // pipe handle
+ pipe, // pipe handle
buf, // buffer to receive reply
len, // size of buffer
&cbRead, // number of bytes read
@@ -160,11 +342,14 @@
void TPipe::write(const uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open pipe");
+ impl_->write(buf, len);
+}
- HANDLE WritePipe = isAnonymous? PipeWrt_: Pipe_;
+void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len)
+{
DWORD cbWritten;
int fSuccess = WriteFile(
- WritePipe, // pipe handle
+ pipe, // pipe handle
buf, // message
len, // message length
&cbWritten, // bytes written
@@ -190,19 +375,29 @@
}
HANDLE TPipe::getPipeHandle() {
- return Pipe_;
+ if(impl_) return impl_->getPipeHandle();
+ return INVALID_HANDLE_VALUE;
}
void TPipe::setPipeHandle(HANDLE pipehandle) {
- Pipe_ = pipehandle;
+ if(isAnonymous_)
+ impl_->setPipeHandle(pipehandle);
+ else
+ impl_.reset(new TNamedPipeImpl(pipehandle));
}
HANDLE TPipe::getWrtPipeHandle() {
- return PipeWrt_;
+ if(impl_) return impl_->getWrtPipeHandle();
+ return INVALID_HANDLE_VALUE;
}
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
- PipeWrt_ = pipehandle;
+ if(impl_) impl_->setWrtPipeHandle(pipehandle);
+}
+
+HANDLE TPipe::getNativeWaitHandle() {
+ if(impl_) return impl_->getNativeWaitHandle();
+ return INVALID_HANDLE_VALUE;
}
long TPipe::getConnectTimeout() {
@@ -212,6 +407,7 @@
void TPipe::setConnectTimeout(long seconds) {
TimeoutSeconds_ = seconds;
}
+
#endif //_WIN32
}}} // apache::thrift::transport
diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h
index 3c1755b..2e4539c 100644
--- a/lib/cpp/src/thrift/transport/TPipe.h
+++ b/lib/cpp/src/thrift/transport/TPipe.h
@@ -25,17 +25,21 @@
#ifndef _WIN32
# include <thrift/transport/TSocket.h>
#endif
+#include <boost/noncopyable.hpp>
namespace apache { namespace thrift { namespace transport {
/**
* Windows Pipes implementation of the TTransport interface.
- *
+ * Don't destroy a TPipe at global scope, as that will cause a thread join
+ * during DLLMain. That also means that client objects using TPipe shouldn't be at global
+ * scope.
*/
#ifdef _WIN32
+class TPipeImpl;
+
class TPipe : public TVirtualTransport<TPipe> {
public:
-
// Constructs a new pipe object.
TPipe();
// Named pipe constructors -
@@ -78,14 +82,18 @@
long getConnectTimeout();
void setConnectTimeout(long seconds);
+ //this function is intended to be used in generic / template situations,
+ //so its name needs to be the same as TPipeServer's
+ HANDLE getNativeWaitHandle();
private:
+ boost::shared_ptr<TPipeImpl> impl_;
+
std::string pipename_;
- //Named pipe handles are R/W, while anonymous pipes are one or the other (half duplex).
- HANDLE Pipe_, PipeWrt_;
long TimeoutSeconds_;
- bool isAnonymous;
+ bool isAnonymous_;
};
+
#else
typedef TSocket TPipe;
#endif
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp
index 10fc69b..e14a94a 100644
--- a/lib/cpp/src/thrift/transport/TPipeServer.cpp
+++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp
@@ -23,7 +23,10 @@
#include <thrift/transport/TPipe.h>
#include <thrift/transport/TPipeServer.h>
#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
#ifdef _WIN32
+# include <thrift/windows/OverlappedSubmissionThread.h>
# include <AccCtrl.h>
# include <Aclapi.h>
#endif //_WIN32
@@ -35,230 +38,295 @@
using namespace std;
using boost::shared_ptr;
+class TPipeServerImpl : boost::noncopyable {
+public:
+ TPipeServerImpl() {}
+ virtual ~TPipeServerImpl() = 0 {}
+ virtual void interrupt() = 0;
+ virtual void close() = 0;
+ virtual boost::shared_ptr<TTransport> acceptImpl() = 0;
+
+ virtual HANDLE getPipeHandle() = 0;
+ virtual HANDLE getWrtPipeHandle() = 0;
+ virtual HANDLE getClientRdPipeHandle()= 0;
+ virtual HANDLE getClientWrtPipeHandle()= 0;
+ virtual HANDLE getNativeWaitHandle() {return NULL;}
+};
+
+class TAnonPipeServer : public TPipeServerImpl {
+public:
+ TAnonPipeServer()
+ {
+ //The anonymous pipe needs to be created first so that the server can
+ //pass the handles on to the client before the serve (acceptImpl)
+ //blocking call.
+ if (!createAnonPipe()) {
+ GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
+ throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
+ }
+ }
+
+ virtual ~TAnonPipeServer() {}
+
+ virtual void interrupt() {} //not currently implemented
+ virtual void close() {
+ PipeR_.reset();
+ PipeW_.reset();
+ ClientAnonRead_.reset();
+ ClientAnonWrite_.reset();
+ }
+
+ virtual boost::shared_ptr<TTransport> acceptImpl();
+
+ virtual HANDLE getPipeHandle() {return PipeR_.h;}
+ virtual HANDLE getWrtPipeHandle() {return PipeW_.h;}
+ virtual HANDLE getClientRdPipeHandle() {return ClientAnonRead_.h;}
+ virtual HANDLE getClientWrtPipeHandle() {return ClientAnonWrite_.h;}
+private:
+ bool createAnonPipe();
+
+ TAutoHandle PipeR_; // Anonymous Pipe (R)
+ TAutoHandle PipeW_; // Anonymous Pipe (W)
+
+ //Client side anonymous pipe handles
+ //? Do we need duplicates to send to client?
+ TAutoHandle ClientAnonRead_;
+ TAutoHandle ClientAnonWrite_;
+};
+
+class TNamedPipeServer : public TPipeServerImpl {
+public:
+ TNamedPipeServer(
+ const std::string &pipename,
+ uint32_t bufsize,
+ uint32_t maxconnections) :
+ stopping_(false),
+ pipename_(pipename),
+ bufsize_(bufsize),
+ maxconns_(maxconnections)
+ {
+ connectOverlap_.action = TOverlappedWorkItem::CONNECT;
+ cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
+ initiateNamedConnect();
+ }
+ virtual ~TNamedPipeServer() {}
+
+ virtual void interrupt()
+ {
+ TAutoCrit lock(pipe_protect_);
+ cached_client_.reset();
+ if(Pipe_.h != INVALID_HANDLE_VALUE) {
+ stopping_ = true;
+ cancelOverlap_.h = Pipe_.h;
+ // This should wake up GetOverlappedResult
+ thread_->addWorkItem(&cancelOverlap_);
+ close();
+ }
+ }
+
+ virtual void close() {
+ Pipe_.reset();
+ }
+
+ virtual boost::shared_ptr<TTransport> acceptImpl();
+
+ virtual HANDLE getPipeHandle() {return Pipe_.h;}
+ virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+ virtual HANDLE getClientRdPipeHandle() {return INVALID_HANDLE_VALUE;}
+ virtual HANDLE getClientWrtPipeHandle() {return INVALID_HANDLE_VALUE;}
+ virtual HANDLE getNativeWaitHandle() {return listen_event_.h;}
+private:
+ bool createNamedPipe();
+ void initiateNamedConnect();
+
+ TAutoOverlapThread thread_;
+ TOverlappedWorkItem connectOverlap_;
+ TOverlappedWorkItem cancelOverlap_;
+
+ bool stopping_;
+ std::string pipename_;
+ uint32_t bufsize_;
+ uint32_t maxconns_;
+ TManualResetEvent listen_event_;
+ boost::shared_ptr<TPipe> cached_client_;
+ TAutoHandle Pipe_;
+ TCriticalSection pipe_protect_;
+};
+
+HANDLE TPipeServer::getNativeWaitHandle()
+{
+ if(impl_) return impl_->getNativeWaitHandle();
+ return NULL;
+}
+
//---- Constructors ----
TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) :
- pipename_(pipename),
bufsize_(bufsize),
- Pipe_(INVALID_HANDLE_VALUE),
- wakeup(INVALID_HANDLE_VALUE),
- maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
- isAnonymous(false),
- stop_(false)
- {
- setPipename(pipename);
- createWakeupEvent();
- }
+ isAnonymous_(false)
+{
+ setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
+ setPipename(pipename);
+}
TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) :
- pipename_(pipename),
bufsize_(bufsize),
- Pipe_(INVALID_HANDLE_VALUE),
- wakeup(INVALID_HANDLE_VALUE),
- isAnonymous(false),
- stop_(false)
- { //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES
- if(maxconnections == 0)
- maxconns_ = 1;
- else if (maxconnections > PIPE_UNLIMITED_INSTANCES)
- maxconns_ = PIPE_UNLIMITED_INSTANCES;
- else
- maxconns_ = maxconnections;
-
- setPipename(pipename);
- createWakeupEvent();
- }
+ isAnonymous_(false)
+{
+ setMaxConnections(maxconnections);
+ setPipename(pipename);
+}
TPipeServer::TPipeServer(const std::string &pipename) :
- pipename_(pipename),
bufsize_(1024),
- Pipe_(INVALID_HANDLE_VALUE),
- wakeup(INVALID_HANDLE_VALUE),
- maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT),
- isAnonymous(false),
- stop_(false)
- {
- setPipename(pipename);
- createWakeupEvent();
- }
+ isAnonymous_(false)
+{
+ setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
+ setPipename(pipename);
+}
TPipeServer::TPipeServer(int bufsize) :
- pipename_(""),
bufsize_(bufsize),
- Pipe_(INVALID_HANDLE_VALUE),
- wakeup(INVALID_HANDLE_VALUE),
- maxconns_(1),
- isAnonymous(true),
- stop_(false)
- {
- //The anonymous pipe needs to be created first so that the server can
- //pass the handles on to the client before the serve (acceptImpl)
- //blocking call.
- if (!TCreateAnonPipe()) {
- GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
- }
- createWakeupEvent();
+ isAnonymous_(true)
+{
+ setMaxConnections(1);
+ impl_.reset(new TAnonPipeServer);
}
TPipeServer::TPipeServer() :
- pipename_(""),
bufsize_(1024),
- Pipe_(INVALID_HANDLE_VALUE),
- wakeup(INVALID_HANDLE_VALUE),
- maxconns_(1),
- isAnonymous(true),
- stop_(false)
+ isAnonymous_(true)
{
- if (!TCreateAnonPipe()) {
- GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed");
- }
- createWakeupEvent();
+ setMaxConnections(1);
+ impl_.reset(new TAnonPipeServer);
}
//---- Destructor ----
-TPipeServer::~TPipeServer() {
- close();
- CloseHandle( wakeup);
- wakeup = INVALID_HANDLE_VALUE;
-}
+TPipeServer::~TPipeServer() {}
//---------------------------------------------------------
// Transport callbacks
//---------------------------------------------------------
+void TPipeServer::listen() {
+ if(isAnonymous_) return;
+ impl_.reset(new TNamedPipeServer(pipename_, bufsize_, maxconns_));
+}
shared_ptr<TTransport> TPipeServer::acceptImpl() {
- shared_ptr<TPipe> client;
+ return impl_->acceptImpl();
+}
- stop_ = FALSE;
+shared_ptr<TTransport> TAnonPipeServer::acceptImpl() {
+ //This 0-byte read serves merely as a blocking call.
+ byte buf;
+ DWORD br;
+ int fSuccess = ReadFile(
+ PipeR_.h, // pipe handle
+ &buf, // buffer to receive reply
+ 0, // size of buffer
+ &br, // number of bytes read
+ NULL); // not overlapped
- if(isAnonymous)
- { //Anonymous Pipe
- //This 0-byte read serves merely as a blocking call.
- byte buf;
- DWORD br;
- int fSuccess = ReadFile(
- Pipe_, // pipe handle
- &buf, // buffer to receive reply
- 0, // size of buffer
- &br, // number of bytes read
- NULL); // not overlapped
-
- if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) {
- GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
- }
- client.reset(new TPipe(Pipe_, PipeW_));
+ if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) {
+ GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
+ throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms");
}
- else
- { //Named Pipe
- if (!TCreateNamedPipe()) {
- GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
- }
-
- struct TEventCleaner {
- HANDLE hEvent;
- ~TEventCleaner() {CloseHandle(hEvent);}
- };
-
- OVERLAPPED overlapped;
- memset( &overlapped, 0, sizeof(overlapped));
- overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
- {
- TEventCleaner cleaner = {overlapped.hEvent};
- while( ! stop_)
- {
- // Wait for the client to connect; if it succeeds, the
- // function returns a nonzero value. If the function returns
- // zero, GetLastError should return ERROR_PIPE_CONNECTED.
- if( ConnectNamedPipe(Pipe_, &overlapped))
- {
- GlobalOutput.printf("Client connected.");
- client.reset(new TPipe(Pipe_));
- return client;
- }
-
- DWORD dwErr = GetLastError();
- HANDLE events[2] = {overlapped.hEvent, wakeup};
- switch( dwErr)
- {
- case ERROR_PIPE_CONNECTED:
- GlobalOutput.printf("Client connected.");
- client.reset(new TPipe(Pipe_));
- return client;
-
- case ERROR_IO_PENDING:
- DWORD dwWait, dwDummy;
- dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000);
- switch(dwWait)
- {
- case WAIT_OBJECT_0:
- if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE))
- {
- GlobalOutput.printf("Client connected.");
- client.reset(new TPipe(Pipe_));
- return client;
- }
- break;
- case WAIT_OBJECT_0 + 1:
- stop_ = TRUE;
- break;
- default:
- break;
- }
- break;
-
- default:
- break;
- }
-
- CancelIo(Pipe_);
- DisconnectNamedPipe(Pipe_);
- }
-
- close();
- GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError());
- throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
- }
- }
-
+ shared_ptr<TPipe> client(new TPipe(PipeR_.h, PipeW_.h));
return client;
}
-void TPipeServer::interrupt() {
- if(Pipe_ != INVALID_HANDLE_VALUE) {
- stop_ = TRUE;
- CancelIo(Pipe_);
- SetEvent(wakeup);
+void TNamedPipeServer::initiateNamedConnect() {
+ if (stopping_) return;
+ if (!createNamedPipe()) {
+ GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
+ throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
}
+
+ // The prior connection has been handled, so close the gate
+ ResetEvent(listen_event_.h);
+ connectOverlap_.reset(NULL, 0, listen_event_.h);
+ connectOverlap_.h = Pipe_.h;
+ thread_->addWorkItem(&connectOverlap_);
+
+ // Wait for the client to connect; if it succeeds, the
+ // function returns a nonzero value. If the function returns
+ // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+ if( connectOverlap_.success )
+ {
+ GlobalOutput.printf("Client connected.");
+ cached_client_.reset(new TPipe(Pipe_.h));
+ Pipe_.release();
+ // make sure people know that a connection is ready
+ SetEvent(listen_event_.h);
+ return;
+ }
+
+ DWORD dwErr = connectOverlap_.last_error;
+ switch( dwErr)
+ {
+ case ERROR_PIPE_CONNECTED:
+ GlobalOutput.printf("Client connected.");
+ cached_client_.reset(new TPipe(Pipe_.h));
+ Pipe_.release();
+ // make sure people know that a connection is ready
+ SetEvent(listen_event_.h);
+ return;
+ case ERROR_IO_PENDING:
+ return; //acceptImpl will do the appropriate WaitForMultipleObjects
+ default:
+ GlobalOutput.perror("TPipeServer ConnectNamedPipe failed, GLE=", dwErr);
+ throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer ConnectNamedPipe failed");
+ }
+}
+
+shared_ptr<TTransport> TNamedPipeServer::acceptImpl() {
+ {
+ TAutoCrit lock(pipe_protect_);
+ if(cached_client_.get() != NULL)
+ {
+ shared_ptr<TPipe> client;
+ //zero out cached_client, since we are about to return it.
+ client.swap(cached_client_);
+
+ //kick off the next connection before returning
+ initiateNamedConnect();
+ return client; //success!
+ }
+ }
+
+ if(Pipe_.h == INVALID_HANDLE_VALUE) {
+ throw TTransportException(
+ TTransportException::NOT_OPEN,
+ "TNamedPipeServer: someone called accept on a closed pipe server");
+ }
+
+ DWORD dwDummy = 0;
+ if(GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE))
+ {
+ TAutoCrit lock(pipe_protect_);
+ GlobalOutput.printf("Client connected.");
+ shared_ptr<TPipe> client(new TPipe(Pipe_.h));
+ Pipe_.release();
+ //kick off the next connection before returning
+ initiateNamedConnect();
+ return client; //success!
+ }
+ //if we got here, then we are in an error / shutdown case
+ DWORD gle = GetLastError(); //save error before doing cleanup
+ close();
+ GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle);
+ throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
+}
+
+void TPipeServer::interrupt() {
+ if(impl_) impl_->interrupt();
}
void TPipeServer::close() {
- if(!isAnonymous)
- {
- if(Pipe_ != INVALID_HANDLE_VALUE) {
- DisconnectNamedPipe(Pipe_);
- CloseHandle(Pipe_);
- Pipe_ = INVALID_HANDLE_VALUE;
- }
- }
- else
- {
- try {
- CloseHandle(Pipe_);
- CloseHandle(PipeW_);
- CloseHandle(ClientAnonRead);
- CloseHandle(ClientAnonWrite);
- }
- catch(...) {
- GlobalOutput.perror("TPipeServer anon close GLE=", GetLastError());
- }
- }
+ if(impl_) impl_->close();
}
-bool TPipeServer::TCreateNamedPipe() {
+bool TNamedPipeServer::createNamedPipe() {
//Windows - set security to allow non-elevated apps
//to access pipes created by elevated apps.
@@ -288,31 +356,31 @@
sa.bInheritHandle = FALSE;
// Create an instance of the named pipe
- HANDLE hPipe_ = CreateNamedPipe(
+ TAutoHandle hPipe(CreateNamedPipe(
pipename_.c_str(), // pipe name
PIPE_ACCESS_DUPLEX | // read/write access
FILE_FLAG_OVERLAPPED, // async mode
- PIPE_TYPE_MESSAGE | // message type pipe
- PIPE_READMODE_MESSAGE, // message-read mode
+ PIPE_TYPE_BYTE | // byte type pipe
+ PIPE_READMODE_BYTE, // byte read mode
maxconns_, // max. instances
bufsize_, // output buffer size
bufsize_, // input buffer size
0, // client time-out
- &sa); // default security attribute
+ &sa)); // security attributes
- if(hPipe_ == INVALID_HANDLE_VALUE)
+ if(hPipe.h == INVALID_HANDLE_VALUE)
{
- Pipe_ = INVALID_HANDLE_VALUE;
+ Pipe_.reset();
GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError());
return false;
}
- Pipe_ = hPipe_;
+ Pipe_.reset(hPipe.release());
return true;
}
-bool TPipeServer::TCreateAnonPipe() {
+bool TAnonPipeServer::createAnonPipe() {
SECURITY_ATTRIBUTES sa;
SECURITY_DESCRIPTOR sd; //security information for pipes
@@ -335,26 +403,19 @@
CloseHandle(PipeW_H);
return false;
}
- ClientAnonRead = ClientAnonReadH;
- ClientAnonWrite = ClientAnonWriteH;
- Pipe_ = Pipe_H;
- PipeW_ = PipeW_H;
+
+ ClientAnonRead_.reset(ClientAnonReadH);
+ ClientAnonWrite_.reset(ClientAnonWriteH);
+ PipeR_.reset(Pipe_H);
+ PipeW_.reset(PipeW_H);
return true;
}
-void TPipeServer::createWakeupEvent() {
- wakeup = CreateEvent( NULL, TRUE, FALSE, NULL);
-}
-
-
//---------------------------------------------------------
// Accessors
//---------------------------------------------------------
-
-string TPipeServer::getPipename() {
- return pipename_;
-}
+string TPipeServer::getPipename() {return pipename_;}
void TPipeServer::setPipename(const std::string &pipename) {
if(pipename.find("\\\\") == -1)
@@ -363,40 +424,27 @@
pipename_ = pipename;
}
-int TPipeServer::getBufferSize() {
- return bufsize_;
-}
+int TPipeServer::getBufferSize() {return bufsize_;}
+void TPipeServer::setBufferSize(int bufsize) {bufsize_ = bufsize;}
-void TPipeServer::setBufferSize(int bufsize) {
- bufsize_ = bufsize;
-}
+HANDLE TPipeServer::getPipeHandle() {return impl_?impl_->getPipeHandle() :INVALID_HANDLE_VALUE;}
+HANDLE TPipeServer::getWrtPipeHandle() {return impl_?impl_->getWrtPipeHandle() :INVALID_HANDLE_VALUE;}
+HANDLE TPipeServer::getClientRdPipeHandle() {return impl_?impl_->getClientRdPipeHandle() :INVALID_HANDLE_VALUE;}
+HANDLE TPipeServer::getClientWrtPipeHandle() {return impl_?impl_->getClientWrtPipeHandle():INVALID_HANDLE_VALUE;}
-HANDLE TPipeServer::getPipeHandle() {
- return Pipe_;
-}
+bool TPipeServer::getAnonymous() { return isAnonymous_; }
+void TPipeServer::setAnonymous(bool anon) { isAnonymous_ = anon;}
-HANDLE TPipeServer::getWrtPipeHandle()
+void TPipeServer::setMaxConnections(uint32_t maxconnections)
{
- return PipeW_;
+ if(maxconnections == 0)
+ maxconns_ = 1;
+ else if (maxconnections > PIPE_UNLIMITED_INSTANCES)
+ maxconns_ = PIPE_UNLIMITED_INSTANCES;
+ else
+ maxconns_ = maxconnections;
}
-HANDLE TPipeServer::getClientRdPipeHandle()
-{
- return ClientAnonRead;
-}
-
-HANDLE TPipeServer::getClientWrtPipeHandle()
-{
- return ClientAnonWrite;
-}
-
-bool TPipeServer::getAnonymous() {
- return isAnonymous;
-}
-
-void TPipeServer::setAnonymous(bool anon) {
- isAnonymous = anon;
-}
#endif //_WIN32
}}} // apache::thrift::transport
diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h
index 88a8b6b..98ecde0 100755
--- a/lib/cpp/src/thrift/transport/TPipeServer.h
+++ b/lib/cpp/src/thrift/transport/TPipeServer.h
@@ -23,17 +23,26 @@
#include <thrift/transport/TServerTransport.h>
#include <boost/shared_ptr.hpp>
#ifndef _WIN32
-# include "TServerSocket.h"
+# include <thrift/transport/TServerSocket.h>
+#endif
+#ifdef _WIN32
+# include <thrift/windows/Sync.h>
#endif
-#define TPIPE_SERVER_MAX_CONNS_DEFAULT 10
+#define TPIPE_SERVER_MAX_CONNS_DEFAULT PIPE_UNLIMITED_INSTANCES
namespace apache { namespace thrift { namespace transport {
/**
* Windows Pipes implementation of TServerTransport.
+ * Don't destroy a TPipeServer at global scope, as that will cause a thread join
+ * during DLLMain. That also means that TServer's using TPipeServer shouldn't be at global
+ * scope.
*/
#ifdef _WIN32
+class TPipeServerImpl;
+class TPipe;
+
class TPipeServer : public TServerTransport {
public:
//Constructors
@@ -46,19 +55,13 @@
TPipeServer();
//Destructor
- ~TPipeServer();
+ virtual ~TPipeServer();
//Standard transport callbacks
- void interrupt();
- void close();
- protected:
- boost::shared_ptr<TTransport> acceptImpl();
+ virtual void interrupt();
+ virtual void close();
+ virtual void listen();
- bool TCreateNamedPipe();
- bool TCreateAnonPipe();
- void createWakeupEvent();
-
- public:
//Accessors
std::string getPipename();
void setPipename(const std::string &pipename);
@@ -70,18 +73,21 @@
HANDLE getClientWrtPipeHandle();
bool getAnonymous();
void setAnonymous(bool anon);
+ void setMaxConnections(uint32_t maxconnections);
+
+ //this function is intended to be used in generic / template situations,
+ //so its name needs to be the same as TPipe's
+ HANDLE getNativeWaitHandle();
+protected:
+ virtual boost::shared_ptr<TTransport> acceptImpl();
private:
+ boost::shared_ptr<TPipeServerImpl> impl_;
+
std::string pipename_;
uint32_t bufsize_;
- HANDLE Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R)
uint32_t maxconns_;
- HANDLE PipeW_; //Anonymous Pipe (W)
- HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles
- HANDLE wakeup; // wake up event
- //? Do we need duplicates to send to client?
- bool isAnonymous;
- bool stop_; // stop flag
+ bool isAnonymous_;
};
#else //_WIN32
//*NIX named pipe implementation uses domain socket
diff --git a/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp
new file mode 100644
index 0000000..5dec390
--- /dev/null
+++ b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp
@@ -0,0 +1,156 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <thrift/windows/OverlappedSubmissionThread.h>
+#include <thrift/transport/TTransportException.h>
+#include <boost/noncopyable.hpp>
+#include <boost/scope_exit.hpp>
+#include <process.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+TOverlappedWorkItem::TOverlappedWorkItem() :
+ SLIST_ENTRY(),
+ action(UNKNOWN),
+ h(INVALID_HANDLE_VALUE),
+ buffer(NULL),
+ buffer_len(0),
+ overlap(),
+ last_error(0),
+ success(TRUE)
+{}
+
+void TOverlappedWorkItem::reset(uint8_t *buf, uint32_t len, HANDLE event) {
+ memset( &overlap, 0, sizeof(overlap));
+ overlap.hEvent = event;
+ buffer = buf;
+ buffer_len = len;
+ last_error = 0;
+ success = FALSE;
+}
+
+uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) {
+ DWORD bytes = 0;
+ BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE);
+ if(signal_failure && !result) //get overlapped error case
+ {
+ GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
+ throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
+ }
+ return bytes;
+}
+
+bool TOverlappedWorkItem::process() {
+ BOOST_SCOPE_EXIT( (&doneSubmittingEvent) ) {
+ SetEvent(doneSubmittingEvent.h);
+ } BOOST_SCOPE_EXIT_END
+
+ switch(action) {
+ case(CONNECT):
+ success = ::ConnectNamedPipe(h, &overlap);
+ if(success == FALSE)
+ last_error = ::GetLastError();
+ return true;
+ case(READ):
+ success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap);
+ if(success == FALSE)
+ last_error = ::GetLastError();
+ return true;
+ case(CANCELIO):
+ success = ::CancelIo(h);
+ if(success == FALSE)
+ last_error = ::GetLastError();
+ return true;
+ case(STOP):
+ default:
+ return false;
+ }
+}
+
+void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem *item) {
+ InterlockedPushEntrySList(&workList_, item);
+ SetEvent(workAvailableEvent_.h);
+ WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE);
+}
+
+TOverlappedSubmissionThread *TOverlappedSubmissionThread::acquire_instance() {
+ TAutoCrit lock(instanceGuard_);
+ if(instance_ == NULL)
+ {
+ assert(instanceRefCount_ == 0);
+ instance_ = new TOverlappedSubmissionThread;
+ }
+ ++instanceRefCount_;
+ return instance_;
+}
+void TOverlappedSubmissionThread::release_instance() {
+ TAutoCrit lock(instanceGuard_);
+ if(--instanceRefCount_ == 0)
+ {
+ delete instance_;
+ instance_ = NULL;
+ }
+}
+
+TOverlappedSubmissionThread::TOverlappedSubmissionThread() {
+ stopItem_.action = TOverlappedWorkItem::STOP;
+
+ InitializeSListHead(&workList_);
+ thread_ = (HANDLE)_beginthreadex(
+ NULL,
+ 0,
+ thread_proc,
+ this,
+ 0,
+ NULL);
+ if(thread_ == 0) {
+ GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno);
+ throw TTransportException(TTransportException::NOT_OPEN, " TOverlappedSubmissionThread unable to create thread");
+ }
+}
+
+TOverlappedSubmissionThread::~TOverlappedSubmissionThread() {
+ addWorkItem(&stopItem_);
+ ::WaitForSingleObject(thread_, INFINITE);
+ CloseHandle(thread_);
+}
+
+void TOverlappedSubmissionThread::run() {
+ for(;;) {
+ WaitForSingleObject(workAvailableEvent_.h, INFINITE);
+ //todo check result
+ SLIST_ENTRY *entry = NULL;
+ while( (entry = InterlockedPopEntrySList(&workList_)) != NULL) {
+ TOverlappedWorkItem &item = *static_cast<TOverlappedWorkItem *>(entry);
+ if(!item.process())
+ return;
+ }
+ }
+}
+
+unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void *addr) {
+ static_cast<TOverlappedSubmissionThread *>(addr)->run();
+ return 0;
+}
+
+TCriticalSection TOverlappedSubmissionThread::instanceGuard_;
+TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_;
+uint32_t TOverlappedSubmissionThread::instanceRefCount_=0;
+
+}}} //apach::thrift::transport
diff --git a/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h
new file mode 100644
index 0000000..16b7e24
--- /dev/null
+++ b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_WINDOWS_OverlappedSubmissionThread_H_
+#define _THRIFT_WINDOWS_OverlappedSubmissionThread_H_ 1
+
+#ifndef _WIN32
+#error "OverlappedSubmissionThread.h is only usable on Windows"
+#endif
+
+#include <thrift/windows/Sync.h>
+#include <boost/noncopyable.hpp>
+#include <Windows.h>
+
+/*
+ *** Why does this class exist?
+ In short, because we want to enable something similar to a "select" loop, on Windows, with
+ named pipes. The core of the "select" loop is a call to WaitForMultipleObjects. So that means
+ we need a signalable object that indicates when data is available.
+
+ A pipe handle doesn't do that. A pipe handle is signaled when a read or write completes, and if
+ no one has called read or write, then the pipe handle is useless in WaitForMultipleObjects. So
+ instead, we use overlapped I/O. With overlapped I/O, you call read, and associate an event with
+ the read. When the read finishes, the event is signaled. This means that when you create a pipe,
+ you start a read. When the customer calls read on your transport object, you wait for the last
+ read to finish, and then kick off another.
+
+ There is one big caveat to this though. The thread that initiated the read must stay alive. If
+ the thread that initiated the read exits, then the read completes in an error state. To ensure
+ that the initiating thread stays alive, we create a singleton thread whose sole responsibility is
+ to manage this overlapped I/O requests. This introduces some overhead, but it is overhead that
+ is necessary for correct behavior.
+
+ This thread currently supports connect, read, and cancel io. So far, I haven't needed to put any
+ writes on this thread, but if needed, it could be done. The client write buffer would need to be
+ copied to ensure that it doesn't get invalidated.
+
+ *** How does one use this class?
+ Create a TOverlappedWorkItem, and fill in the action and "h", then call reset(). Your work item
+ is now ready to be submitted to the overlapped submission thread. Create a TAutoOverlapThread,
+ and call thread->addWorkItem with your work item. After addWorkItem completes, you may inspect
+ last_error and success. At some point in the future, call workItem.overlappedResults to wait
+ until the operation has completed.
+*/
+
+namespace apache { namespace thrift { namespace transport {
+
+DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) struct TOverlappedWorkItem : public SLIST_ENTRY {
+ TOverlappedWorkItem();
+
+ enum action_t {
+ UNKNOWN = 3000,
+ CONNECT,
+ READ,
+ CANCELIO,
+ STOP,
+ };
+
+ TAutoResetEvent doneSubmittingEvent;
+ action_t action;
+ HANDLE h;
+ uint8_t *buffer;
+ uint32_t buffer_len;
+ OVERLAPPED overlap;
+
+ DWORD last_error;
+ BOOL success;
+
+ void reset(uint8_t *buf, uint32_t len, HANDLE event);
+ uint32_t overlappedResults(bool signal_failure = true);
+ bool process();
+};
+
+class TOverlappedSubmissionThread : boost::noncopyable
+{
+public:
+ void addWorkItem(TOverlappedWorkItem *item);
+
+//singleton stuff
+public:
+ static TOverlappedSubmissionThread *acquire_instance();
+ static void release_instance();
+private:
+ static TCriticalSection instanceGuard_;
+ static TOverlappedSubmissionThread *instance_;
+ static uint32_t instanceRefCount_;
+
+//thread details
+private:
+ TOverlappedSubmissionThread();
+ ~TOverlappedSubmissionThread();
+ void run();
+ static unsigned __stdcall thread_proc(void *addr);
+
+private:
+ DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) SLIST_HEADER workList_;
+ TOverlappedWorkItem stopItem_;
+ TAutoResetEvent workAvailableEvent_;
+ HANDLE thread_;
+};
+
+class TAutoOverlapThread : boost::noncopyable {
+private:
+ TOverlappedSubmissionThread *p;
+public:
+ TAutoOverlapThread() : p(TOverlappedSubmissionThread::acquire_instance()) {}
+ ~TAutoOverlapThread() {TOverlappedSubmissionThread::release_instance();}
+ TOverlappedSubmissionThread *operator->() {return p;}
+};
+
+}}} //apache::thrift::transport
+
+#endif
diff --git a/lib/cpp/src/thrift/windows/Sync.h b/lib/cpp/src/thrift/windows/Sync.h
new file mode 100644
index 0000000..ded6ea3
--- /dev/null
+++ b/lib/cpp/src/thrift/windows/Sync.h
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_WINDOWS_Sync_H_
+#define _THRIFT_WINDOWS_Sync_H_ 1
+
+#ifndef _WIN32
+#error "windows/Sync.h is only usable on Windows"
+#endif
+
+#include <thrift/concurrency/Exception.h>
+#include <boost/noncopyable.hpp>
+#include <Windows.h>
+
+/*
+ Lightweight synchronization objects that only make sense on Windows. For cross-platform
+ code, use the classes found in the concurrency namespace
+*/
+
+namespace apache { namespace thrift {
+
+struct TCriticalSection : boost::noncopyable {
+ CRITICAL_SECTION cs;
+ TCriticalSection() {InitializeCriticalSection(&cs);}
+ ~TCriticalSection() {DeleteCriticalSection(&cs);}
+};
+
+class TAutoCrit : boost::noncopyable {
+private:
+ CRITICAL_SECTION *cs_;
+public:
+ explicit TAutoCrit(TCriticalSection &cs) : cs_(&cs.cs) {EnterCriticalSection(cs_);}
+ ~TAutoCrit() {LeaveCriticalSection(cs_);}
+};
+
+struct TAutoResetEvent : boost::noncopyable {
+ HANDLE h;
+
+ TAutoResetEvent() {
+ h = CreateEvent( NULL, FALSE, FALSE, NULL);
+ if(h == NULL) {
+ GlobalOutput.perror("TAutoResetEvent unable to create event, GLE=", GetLastError());
+ throw apache::thrift::concurrency::SystemResourceException("CreateEvent failed");
+ }
+ }
+ ~TAutoResetEvent() {CloseHandle(h);}
+};
+
+struct TManualResetEvent : boost::noncopyable {
+ HANDLE h;
+
+ TManualResetEvent() {
+ h = CreateEvent( NULL, TRUE, FALSE, NULL);
+ if(h == NULL) {
+ GlobalOutput.perror("TManualResetEvent unable to create event, GLE=", GetLastError());
+ throw apache::thrift::concurrency::SystemResourceException("CreateEvent failed");
+ }
+ }
+ ~TManualResetEvent() {CloseHandle(h);}
+};
+
+struct TAutoHandle : boost::noncopyable {
+ HANDLE h;
+ explicit TAutoHandle(HANDLE h_ = INVALID_HANDLE_VALUE) : h(h_) {}
+ ~TAutoHandle() {
+ if(h != INVALID_HANDLE_VALUE)
+ CloseHandle(h);
+ }
+
+ HANDLE release() {
+ HANDLE retval = h;
+ h = INVALID_HANDLE_VALUE;
+ return retval;
+ }
+ void reset(HANDLE h_ = INVALID_HANDLE_VALUE) {
+ if(h_ == h)
+ return;
+ if(h != INVALID_HANDLE_VALUE)
+ CloseHandle(h);
+ h = h_;
+ }
+};
+
+}} //apache::thrift
+
+#endif