blob: 5dec390f02609cd413aa34123f66c0c75c7e4b2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/windows/OverlappedSubmissionThread.h>
#include <thrift/transport/TTransportException.h>
#include <boost/noncopyable.hpp>
#include <boost/scope_exit.hpp>
#include <process.h>
namespace apache { namespace thrift { namespace transport {
TOverlappedWorkItem::TOverlappedWorkItem() :
SLIST_ENTRY(),
action(UNKNOWN),
h(INVALID_HANDLE_VALUE),
buffer(NULL),
buffer_len(0),
overlap(),
last_error(0),
success(TRUE)
{}
void TOverlappedWorkItem::reset(uint8_t *buf, uint32_t len, HANDLE event) {
memset( &overlap, 0, sizeof(overlap));
overlap.hEvent = event;
buffer = buf;
buffer_len = len;
last_error = 0;
success = FALSE;
}
uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) {
DWORD bytes = 0;
BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE);
if(signal_failure && !result) //get overlapped error case
{
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
return bytes;
}
bool TOverlappedWorkItem::process() {
BOOST_SCOPE_EXIT( (&doneSubmittingEvent) ) {
SetEvent(doneSubmittingEvent.h);
} BOOST_SCOPE_EXIT_END
switch(action) {
case(CONNECT):
success = ::ConnectNamedPipe(h, &overlap);
if(success == FALSE)
last_error = ::GetLastError();
return true;
case(READ):
success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap);
if(success == FALSE)
last_error = ::GetLastError();
return true;
case(CANCELIO):
success = ::CancelIo(h);
if(success == FALSE)
last_error = ::GetLastError();
return true;
case(STOP):
default:
return false;
}
}
void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem *item) {
InterlockedPushEntrySList(&workList_, item);
SetEvent(workAvailableEvent_.h);
WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE);
}
TOverlappedSubmissionThread *TOverlappedSubmissionThread::acquire_instance() {
TAutoCrit lock(instanceGuard_);
if(instance_ == NULL)
{
assert(instanceRefCount_ == 0);
instance_ = new TOverlappedSubmissionThread;
}
++instanceRefCount_;
return instance_;
}
void TOverlappedSubmissionThread::release_instance() {
TAutoCrit lock(instanceGuard_);
if(--instanceRefCount_ == 0)
{
delete instance_;
instance_ = NULL;
}
}
TOverlappedSubmissionThread::TOverlappedSubmissionThread() {
stopItem_.action = TOverlappedWorkItem::STOP;
InitializeSListHead(&workList_);
thread_ = (HANDLE)_beginthreadex(
NULL,
0,
thread_proc,
this,
0,
NULL);
if(thread_ == 0) {
GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno);
throw TTransportException(TTransportException::NOT_OPEN, " TOverlappedSubmissionThread unable to create thread");
}
}
TOverlappedSubmissionThread::~TOverlappedSubmissionThread() {
addWorkItem(&stopItem_);
::WaitForSingleObject(thread_, INFINITE);
CloseHandle(thread_);
}
void TOverlappedSubmissionThread::run() {
for(;;) {
WaitForSingleObject(workAvailableEvent_.h, INFINITE);
//todo check result
SLIST_ENTRY *entry = NULL;
while( (entry = InterlockedPopEntrySList(&workList_)) != NULL) {
TOverlappedWorkItem &item = *static_cast<TOverlappedWorkItem *>(entry);
if(!item.process())
return;
}
}
}
unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void *addr) {
static_cast<TOverlappedSubmissionThread *>(addr)->run();
return 0;
}
TCriticalSection TOverlappedSubmissionThread::instanceGuard_;
TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_;
uint32_t TOverlappedSubmissionThread::instanceRefCount_=0;
}}} //apach::thrift::transport