blob: 5ac6fe00b02d74330c925e5007b790e2546a29a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/windows/OverlappedSubmissionThread.h>
#include <thrift/transport/TTransportException.h>
#include <boost/noncopyable.hpp>
#include <boost/scope_exit.hpp>
#include <process.h>
namespace apache {
namespace thrift {
namespace transport {
TOverlappedWorkItem::TOverlappedWorkItem()
: SLIST_ENTRY(),
action(UNKNOWN),
h(INVALID_HANDLE_VALUE),
buffer(NULL),
buffer_len(0),
overlap(),
last_error(0),
success(TRUE) {
}
void TOverlappedWorkItem::reset(uint8_t* buf, uint32_t len, HANDLE event) {
memset(&overlap, 0, sizeof(overlap));
overlap.hEvent = event;
buffer = buf;
buffer_len = len;
last_error = 0;
success = FALSE;
}
uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) {
DWORD bytes = 0;
BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE);
if (signal_failure && !result) // get overlapped error case
{
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
return bytes;
}
bool TOverlappedWorkItem::process() {
BOOST_SCOPE_EXIT((&doneSubmittingEvent)) { SetEvent(doneSubmittingEvent.h); }
BOOST_SCOPE_EXIT_END
switch (action) {
case (CONNECT):
success = ::ConnectNamedPipe(h, &overlap);
if (success == FALSE)
last_error = ::GetLastError();
return true;
case (READ):
success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap);
if (success == FALSE)
last_error = ::GetLastError();
return true;
case (CANCELIO):
success = ::CancelIo(h);
if (success == FALSE)
last_error = ::GetLastError();
return true;
case (STOP):
default:
return false;
}
}
void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem* item) {
InterlockedPushEntrySList(&workList_, item);
SetEvent(workAvailableEvent_.h);
WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE);
}
TOverlappedSubmissionThread* TOverlappedSubmissionThread::acquire_instance() {
TAutoCrit lock(instanceGuard_);
if (instance_ == NULL) {
assert(instanceRefCount_ == 0);
instance_ = new TOverlappedSubmissionThread;
}
++instanceRefCount_;
return instance_;
}
void TOverlappedSubmissionThread::release_instance() {
TAutoCrit lock(instanceGuard_);
if (--instanceRefCount_ == 0) {
delete instance_;
instance_ = NULL;
}
}
TOverlappedSubmissionThread::TOverlappedSubmissionThread() {
stopItem_.action = TOverlappedWorkItem::STOP;
InitializeSListHead(&workList_);
thread_ = (HANDLE)_beginthreadex(NULL, 0, thread_proc, this, 0, NULL);
if (thread_ == 0) {
GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno);
throw TTransportException(TTransportException::NOT_OPEN,
" TOverlappedSubmissionThread unable to create thread");
}
}
TOverlappedSubmissionThread::~TOverlappedSubmissionThread() {
addWorkItem(&stopItem_);
::WaitForSingleObject(thread_, INFINITE);
CloseHandle(thread_);
}
void TOverlappedSubmissionThread::run() {
for (;;) {
WaitForSingleObject(workAvailableEvent_.h, INFINITE);
// todo check result
SLIST_ENTRY* entry = NULL;
while ((entry = InterlockedPopEntrySList(&workList_)) != NULL) {
TOverlappedWorkItem& item = *static_cast<TOverlappedWorkItem*>(entry);
if (!item.process())
return;
}
}
}
unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void* addr) {
static_cast<TOverlappedSubmissionThread*>(addr)->run();
return 0;
}
TCriticalSection TOverlappedSubmissionThread::instanceGuard_;
TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_;
uint32_t TOverlappedSubmissionThread::instanceRefCount_ = 0;
}
}
} // apach::thrift::transport