blob: 703c19ed1dbd9d4f343ca2b0b048777f8b2ba702 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/concurrency/TimerManager.h>
#include <thrift/concurrency/Exception.h>
#include <assert.h>
#include <iostream>
#include <memory>
#include <set>
namespace apache {
namespace thrift {
namespace concurrency {
using std::shared_ptr;
using std::weak_ptr;
/**
* TimerManager class
*
* @version $Id:$
*/
class TimerManager::Task : public Runnable {
public:
enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
~Task() override = default;
void run() override {
if (state_ == EXECUTING) {
runnable_->run();
state_ = COMPLETE;
}
}
bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; }
task_iterator it_;
private:
shared_ptr<Runnable> runnable_;
friend class TimerManager::Dispatcher;
STATE state_;
};
class TimerManager::Dispatcher : public Runnable {
public:
Dispatcher(TimerManager* manager) : manager_(manager) {}
~Dispatcher() override = default;
/**
* Dispatcher entry point
*
* As long as dispatcher thread is running, pull tasks off the task taskMap_
* and execute.
*/
void run() override {
{
Synchronized s(manager_->monitor_);
if (manager_->state_ == TimerManager::STARTING) {
manager_->state_ = TimerManager::STARTED;
manager_->monitor_.notifyAll();
}
}
do {
std::set<shared_ptr<TimerManager::Task> > expiredTasks;
{
Synchronized s(manager_->monitor_);
task_iterator expiredTaskEnd;
auto now = std::chrono::steady_clock::now();
while (manager_->state_ == TimerManager::STARTED
&& (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
== manager_->taskMap_.begin()) {
std::chrono::milliseconds timeout(0);
if (!manager_->taskMap_.empty()) {
timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now);
//because the unit of steady_clock is smaller than millisecond,timeout may be 0.
if (timeout.count() == 0) {
timeout = std::chrono::milliseconds(1);
}
manager_->monitor_.waitForTimeRelative(timeout);
} else {
manager_->monitor_.waitForTimeRelative(0);
}
now = std::chrono::steady_clock::now();
}
if (manager_->state_ == TimerManager::STARTED) {
for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
shared_ptr<TimerManager::Task> task = ix->second;
expiredTasks.insert(task);
task->it_ = manager_->taskMap_.end();
if (task->state_ == TimerManager::Task::WAITING) {
task->state_ = TimerManager::Task::EXECUTING;
}
manager_->taskCount_--;
}
manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
}
}
for (const auto & expiredTask : expiredTasks) {
expiredTask->run();
}
} while (manager_->state_ == TimerManager::STARTED);
{
Synchronized s(manager_->monitor_);
if (manager_->state_ == TimerManager::STOPPING) {
manager_->state_ = TimerManager::STOPPED;
manager_->monitor_.notifyAll();
}
}
return;
}
private:
TimerManager* manager_;
friend class TimerManager;
};
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4355) // 'this' used in base member initializer list
#endif
TimerManager::TimerManager()
: taskCount_(0),
state_(TimerManager::UNINITIALIZED),
dispatcher_(std::make_shared<Dispatcher>(this)) {
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
TimerManager::~TimerManager() {
// If we haven't been explicitly stopped, do so now. We don't need to grab
// the monitor here, since stop already takes care of reentrancy.
if (state_ != STOPPED) {
try {
stop();
} catch (...) {
// We're really hosed.
}
}
}
void TimerManager::start() {
bool doStart = false;
{
Synchronized s(monitor_);
if (!threadFactory_) {
throw InvalidArgumentException();
}
if (state_ == TimerManager::UNINITIALIZED) {
state_ = TimerManager::STARTING;
doStart = true;
}
}
if (doStart) {
dispatcherThread_ = threadFactory_->newThread(dispatcher_);
dispatcherThread_->start();
}
{
Synchronized s(monitor_);
while (state_ == TimerManager::STARTING) {
monitor_.wait();
}
assert(state_ != TimerManager::STARTING);
}
}
void TimerManager::stop() {
bool doStop = false;
{
Synchronized s(monitor_);
if (state_ == TimerManager::UNINITIALIZED) {
state_ = TimerManager::STOPPED;
} else if (state_ != STOPPING && state_ != STOPPED) {
doStop = true;
state_ = STOPPING;
monitor_.notifyAll();
}
while (state_ != STOPPED) {
monitor_.wait();
}
}
if (doStop) {
// Clean up any outstanding tasks
taskMap_.clear();
// Remove dispatcher's reference to us.
dispatcher_->manager_ = nullptr;
}
}
shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Synchronized s(monitor_);
return threadFactory_;
}
void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Synchronized s(monitor_);
threadFactory_ = value;
}
size_t TimerManager::taskCount() const {
return taskCount_;
}
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) {
return add(task, std::chrono::steady_clock::now() + timeout);
}
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
auto now = std::chrono::steady_clock::now();
if (abstime < now) {
throw InvalidArgumentException();
}
Synchronized s(monitor_);
if (state_ != TimerManager::STARTED) {
throw IllegalStateException();
}
// If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
// if the expiration time is shorter than the current value. Need to test before we insert,
// because the new task might insert at the front.
bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first;
shared_ptr<Task> timer(new Task(task));
taskCount_++;
timer->it_ = taskMap_.emplace(abstime, timer);
// If the task map was empty, or if we have an expiration that is earlier
// than any previously seen, kick the dispatcher so it can update its
// timeout
if (notifyRequired) {
monitor_.notify();
}
return timer;
}
void TimerManager::remove(shared_ptr<Runnable> task) {
Synchronized s(monitor_);
if (state_ != TimerManager::STARTED) {
throw IllegalStateException();
}
bool found = false;
for (auto ix = taskMap_.begin(); ix != taskMap_.end();) {
if (*ix->second == task) {
found = true;
taskCount_--;
taskMap_.erase(ix++);
} else {
++ix;
}
}
if (!found) {
throw NoSuchTaskException();
}
}
void TimerManager::remove(Timer handle) {
Synchronized s(monitor_);
if (state_ != TimerManager::STARTED) {
throw IllegalStateException();
}
shared_ptr<Task> task = handle.lock();
if (!task) {
throw NoSuchTaskException();
}
if (task->it_ == taskMap_.end()) {
// Task is being executed
throw UncancellableTaskException();
}
taskMap_.erase(task->it_);
taskCount_--;
}
TimerManager::STATE TimerManager::state() const {
return state_;
}
}
}
} // apache::thrift::concurrency