| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <thrift/concurrency/TimerManager.h> |
| #include <thrift/concurrency/Exception.h> |
| |
| #include <assert.h> |
| #include <iostream> |
| #include <memory> |
| #include <set> |
| |
| namespace apache { |
| namespace thrift { |
| namespace concurrency { |
| |
| using std::shared_ptr; |
| using std::weak_ptr; |
| |
| /** |
| * TimerManager class |
| * |
| * @version $Id:$ |
| */ |
| class TimerManager::Task : public Runnable { |
| |
| public: |
| enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE }; |
| |
| Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {} |
| |
| ~Task() override = default; |
| |
| void run() override { |
| if (state_ == EXECUTING) { |
| runnable_->run(); |
| state_ = COMPLETE; |
| } |
| } |
| |
| bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; } |
| |
| task_iterator it_; |
| |
| private: |
| shared_ptr<Runnable> runnable_; |
| friend class TimerManager::Dispatcher; |
| STATE state_; |
| }; |
| |
| class TimerManager::Dispatcher : public Runnable { |
| |
| public: |
| Dispatcher(TimerManager* manager) : manager_(manager) {} |
| |
| ~Dispatcher() override = default; |
| |
| /** |
| * Dispatcher entry point |
| * |
| * As long as dispatcher thread is running, pull tasks off the task taskMap_ |
| * and execute. |
| */ |
| void run() override { |
| { |
| Synchronized s(manager_->monitor_); |
| if (manager_->state_ == TimerManager::STARTING) { |
| manager_->state_ = TimerManager::STARTED; |
| manager_->monitor_.notifyAll(); |
| } |
| } |
| |
| do { |
| std::set<shared_ptr<TimerManager::Task> > expiredTasks; |
| { |
| Synchronized s(manager_->monitor_); |
| task_iterator expiredTaskEnd; |
| auto now = std::chrono::steady_clock::now(); |
| while (manager_->state_ == TimerManager::STARTED |
| && (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) |
| == manager_->taskMap_.begin()) { |
| std::chrono::milliseconds timeout(0); |
| if (!manager_->taskMap_.empty()) { |
| timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now); |
| //because the unit of steady_clock is smaller than millisecond,timeout may be 0. |
| if (timeout.count() == 0) { |
| timeout = std::chrono::milliseconds(1); |
| } |
| manager_->monitor_.waitForTimeRelative(timeout); |
| } else { |
| manager_->monitor_.waitForTimeRelative(0); |
| } |
| now = std::chrono::steady_clock::now(); |
| } |
| |
| if (manager_->state_ == TimerManager::STARTED) { |
| for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { |
| shared_ptr<TimerManager::Task> task = ix->second; |
| expiredTasks.insert(task); |
| task->it_ = manager_->taskMap_.end(); |
| if (task->state_ == TimerManager::Task::WAITING) { |
| task->state_ = TimerManager::Task::EXECUTING; |
| } |
| manager_->taskCount_--; |
| } |
| manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); |
| } |
| } |
| |
| for (const auto & expiredTask : expiredTasks) { |
| expiredTask->run(); |
| } |
| |
| } while (manager_->state_ == TimerManager::STARTED); |
| |
| { |
| Synchronized s(manager_->monitor_); |
| if (manager_->state_ == TimerManager::STOPPING) { |
| manager_->state_ = TimerManager::STOPPED; |
| manager_->monitor_.notifyAll(); |
| } |
| } |
| return; |
| } |
| |
| private: |
| TimerManager* manager_; |
| friend class TimerManager; |
| }; |
| |
| #if defined(_MSC_VER) |
| #pragma warning(push) |
| #pragma warning(disable : 4355) // 'this' used in base member initializer list |
| #endif |
| |
| TimerManager::TimerManager() |
| : taskCount_(0), |
| state_(TimerManager::UNINITIALIZED), |
| dispatcher_(std::make_shared<Dispatcher>(this)) { |
| } |
| |
| #if defined(_MSC_VER) |
| #pragma warning(pop) |
| #endif |
| |
| TimerManager::~TimerManager() { |
| |
| // If we haven't been explicitly stopped, do so now. We don't need to grab |
| // the monitor here, since stop already takes care of reentrancy. |
| |
| if (state_ != STOPPED) { |
| try { |
| stop(); |
| } catch (...) { |
| // We're really hosed. |
| } |
| } |
| } |
| |
| void TimerManager::start() { |
| bool doStart = false; |
| { |
| Synchronized s(monitor_); |
| if (!threadFactory_) { |
| throw InvalidArgumentException(); |
| } |
| if (state_ == TimerManager::UNINITIALIZED) { |
| state_ = TimerManager::STARTING; |
| doStart = true; |
| } |
| } |
| |
| if (doStart) { |
| dispatcherThread_ = threadFactory_->newThread(dispatcher_); |
| dispatcherThread_->start(); |
| } |
| |
| { |
| Synchronized s(monitor_); |
| while (state_ == TimerManager::STARTING) { |
| monitor_.wait(); |
| } |
| assert(state_ != TimerManager::STARTING); |
| } |
| } |
| |
| void TimerManager::stop() { |
| bool doStop = false; |
| { |
| Synchronized s(monitor_); |
| if (state_ == TimerManager::UNINITIALIZED) { |
| state_ = TimerManager::STOPPED; |
| } else if (state_ != STOPPING && state_ != STOPPED) { |
| doStop = true; |
| state_ = STOPPING; |
| monitor_.notifyAll(); |
| } |
| while (state_ != STOPPED) { |
| monitor_.wait(); |
| } |
| } |
| |
| if (doStop) { |
| // Clean up any outstanding tasks |
| taskMap_.clear(); |
| |
| // Remove dispatcher's reference to us. |
| dispatcher_->manager_ = nullptr; |
| } |
| } |
| |
| shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { |
| Synchronized s(monitor_); |
| return threadFactory_; |
| } |
| |
| void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) { |
| Synchronized s(monitor_); |
| threadFactory_ = value; |
| } |
| |
| size_t TimerManager::taskCount() const { |
| return taskCount_; |
| } |
| |
| TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) { |
| return add(task, std::chrono::steady_clock::now() + timeout); |
| } |
| |
| TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, |
| const std::chrono::time_point<std::chrono::steady_clock>& abstime) { |
| auto now = std::chrono::steady_clock::now(); |
| |
| if (abstime < now) { |
| throw InvalidArgumentException(); |
| } |
| Synchronized s(monitor_); |
| if (state_ != TimerManager::STARTED) { |
| throw IllegalStateException(); |
| } |
| |
| // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him |
| // if the expiration time is shorter than the current value. Need to test before we insert, |
| // because the new task might insert at the front. |
| bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first; |
| |
| shared_ptr<Task> timer(new Task(task)); |
| taskCount_++; |
| timer->it_ = taskMap_.emplace(abstime, timer); |
| |
| // If the task map was empty, or if we have an expiration that is earlier |
| // than any previously seen, kick the dispatcher so it can update its |
| // timeout |
| if (notifyRequired) { |
| monitor_.notify(); |
| } |
| |
| return timer; |
| } |
| |
| void TimerManager::remove(shared_ptr<Runnable> task) { |
| Synchronized s(monitor_); |
| if (state_ != TimerManager::STARTED) { |
| throw IllegalStateException(); |
| } |
| bool found = false; |
| for (auto ix = taskMap_.begin(); ix != taskMap_.end();) { |
| if (*ix->second == task) { |
| found = true; |
| taskCount_--; |
| taskMap_.erase(ix++); |
| } else { |
| ++ix; |
| } |
| } |
| if (!found) { |
| throw NoSuchTaskException(); |
| } |
| } |
| |
| void TimerManager::remove(Timer handle) { |
| Synchronized s(monitor_); |
| if (state_ != TimerManager::STARTED) { |
| throw IllegalStateException(); |
| } |
| |
| shared_ptr<Task> task = handle.lock(); |
| if (!task) { |
| throw NoSuchTaskException(); |
| } |
| |
| if (task->it_ == taskMap_.end()) { |
| // Task is being executed |
| throw UncancellableTaskException(); |
| } |
| |
| taskMap_.erase(task->it_); |
| taskCount_--; |
| } |
| |
| TimerManager::STATE TimerManager::state() const { |
| return state_; |
| } |
| } |
| } |
| } // apache::thrift::concurrency |