| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <thrift/windows/OverlappedSubmissionThread.h> |
| #include <thrift/transport/TTransportException.h> |
| #include <boost/noncopyable.hpp> |
| #include <boost/scope_exit.hpp> |
| #include <process.h> |
| |
| namespace apache { |
| namespace thrift { |
| namespace transport { |
| |
| TOverlappedWorkItem::TOverlappedWorkItem() |
| : SLIST_ENTRY(), |
| action(UNKNOWN), |
| h(INVALID_HANDLE_VALUE), |
| buffer(NULL), |
| buffer_len(0), |
| overlap(), |
| last_error(0), |
| success(TRUE) { |
| } |
| |
| void TOverlappedWorkItem::reset(uint8_t* buf, uint32_t len, HANDLE event) { |
| memset(&overlap, 0, sizeof(overlap)); |
| overlap.hEvent = event; |
| buffer = buf; |
| buffer_len = len; |
| last_error = 0; |
| success = FALSE; |
| } |
| |
| uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) { |
| DWORD bytes = 0; |
| BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE); |
| if (signal_failure && !result) // get overlapped error case |
| { |
| GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError()); |
| throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed"); |
| } |
| return bytes; |
| } |
| |
| bool TOverlappedWorkItem::process() { |
| BOOST_SCOPE_EXIT((&doneSubmittingEvent)) { SetEvent(doneSubmittingEvent.h); } |
| BOOST_SCOPE_EXIT_END |
| |
| switch (action) { |
| case (CONNECT): |
| success = ::ConnectNamedPipe(h, &overlap); |
| if (success == FALSE) |
| last_error = ::GetLastError(); |
| return true; |
| case (READ): |
| success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap); |
| if (success == FALSE) |
| last_error = ::GetLastError(); |
| return true; |
| case (CANCELIO): |
| success = ::CancelIo(h); |
| if (success == FALSE) |
| last_error = ::GetLastError(); |
| return true; |
| case (STOP): |
| default: |
| return false; |
| } |
| } |
| |
| void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem* item) { |
| InterlockedPushEntrySList(&workList_, item); |
| SetEvent(workAvailableEvent_.h); |
| WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE); |
| } |
| |
| TOverlappedSubmissionThread* TOverlappedSubmissionThread::acquire_instance() { |
| TAutoCrit lock(instanceGuard_); |
| if (instance_ == NULL) { |
| assert(instanceRefCount_ == 0); |
| instance_ = new TOverlappedSubmissionThread; |
| } |
| ++instanceRefCount_; |
| return instance_; |
| } |
| void TOverlappedSubmissionThread::release_instance() { |
| TAutoCrit lock(instanceGuard_); |
| if (--instanceRefCount_ == 0) { |
| delete instance_; |
| instance_ = NULL; |
| } |
| } |
| |
| TOverlappedSubmissionThread::TOverlappedSubmissionThread() { |
| stopItem_.action = TOverlappedWorkItem::STOP; |
| |
| InitializeSListHead(&workList_); |
| thread_ = (HANDLE)_beginthreadex(NULL, 0, thread_proc, this, 0, NULL); |
| if (thread_ == 0) { |
| GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno); |
| throw TTransportException(TTransportException::NOT_OPEN, |
| " TOverlappedSubmissionThread unable to create thread"); |
| } |
| } |
| |
| TOverlappedSubmissionThread::~TOverlappedSubmissionThread() { |
| addWorkItem(&stopItem_); |
| ::WaitForSingleObject(thread_, INFINITE); |
| CloseHandle(thread_); |
| } |
| |
| void TOverlappedSubmissionThread::run() { |
| for (;;) { |
| WaitForSingleObject(workAvailableEvent_.h, INFINITE); |
| // todo check result |
| SLIST_ENTRY* entry = NULL; |
| while ((entry = InterlockedPopEntrySList(&workList_)) != NULL) { |
| TOverlappedWorkItem& item = *static_cast<TOverlappedWorkItem*>(entry); |
| if (!item.process()) |
| return; |
| } |
| } |
| } |
| |
| unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void* addr) { |
| static_cast<TOverlappedSubmissionThread*>(addr)->run(); |
| return 0; |
| } |
| |
| TCriticalSection TOverlappedSubmissionThread::instanceGuard_; |
| TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_; |
| uint32_t TOverlappedSubmissionThread::instanceRefCount_ = 0; |
| } |
| } |
| } // apache::thrift::transport |