Thrift now a TLP - INFRA-3116
git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp
new file mode 100644
index 0000000..25515dc
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.cpp
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TimerManager.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+
+typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator; // iterator over taskMap_ entries (int64_t key -> Task)
+typedef std::pair<task_iterator, task_iterator> task_range; // NOTE(review): not referenced in the visible part of this file
+
+/**
+ * A scheduled unit of work: wraps the caller's Runnable together with a
+ * lifecycle state. run() invokes the wrapped Runnable only while the state
+ * is EXECUTING (set by the Dispatcher) and then marks the task COMPLETE.
+ */
+class TimerManager::Task : public Runnable {
+
+ public:
+  enum STATE {
+    WAITING,    // initial state, set by the constructor
+    EXECUTING,  // set by the Dispatcher when the task has expired
+    CANCELLED,  // never assigned in this file
+    COMPLETE    // set by run() after the wrapped Runnable returns
+  };
+
+  Task(shared_ptr<Runnable> runnable) :
+    runnable_(runnable),
+    state_(WAITING) {}
+
+  ~Task() {
+  }
+
+  void run() {
+    if (state_ == EXECUTING) {
+      runnable_->run();
+      state_ = COMPLETE;
+    }
+  }
+
+ private:
+  shared_ptr<Runnable> runnable_;
+  // No separate forward declaration: a qualified one is ill-formed, and the friend declaration names the nested class.
+  friend class TimerManager::Dispatcher;
+  STATE state_;
+};
+
+class TimerManager::Dispatcher: public Runnable {
+ // Runnable handed to the dispatcher thread by TimerManager::start(); it waits for tasks to expire and runs them.
+ public:
+ Dispatcher(TimerManager* manager) :
+ manager_(manager) {} // raw back-pointer; TimerManager::stop() resets it to NULL
+
+ ~Dispatcher() {}
+
+ /**
+ * Dispatcher entry point.
+ *
+ * Marks the manager STARTED, then repeatedly removes the entries of taskMap_
+ * whose key is <= the current time and runs them, until state_ leaves STARTED.
+ */
+ void run() {
+ {
+ Synchronized s(manager_->monitor_);
+ if (manager_->state_ == TimerManager::STARTING) {
+ manager_->state_ = TimerManager::STARTED;
+ manager_->monitor_.notifyAll(); // wakes start(), which waits while state_ is STARTING
+ }
+ }
+ // Main loop: each pass collects one batch of expired tasks under the monitor, then runs it.
+ do {
+ std::set<shared_ptr<TimerManager::Task> > expiredTasks;
+ {
+ Synchronized s(manager_->monitor_);
+ task_iterator expiredTaskEnd;
+ int64_t now = Util::currentTime();
+ while (manager_->state_ == TimerManager::STARTED &&
+ (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
+ int64_t timeout = 0LL; // empty map: wait(0) presumably waits without a time limit — confirm against Monitor::wait
+ if (!manager_->taskMap_.empty()) {
+ timeout = manager_->taskMap_.begin()->first - now; // time remaining until the earliest key
+ }
+ assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
+ try {
+ manager_->monitor_.wait(timeout);
+ } catch (TimedOutException &e) {} // timeouts are ignored; the loop condition re-checks the map
+ now = Util::currentTime();
+ }
+ // Still STARTED: every entry in [begin, expiredTaskEnd) has a key <= now, so move it out of the map.
+ if (manager_->state_ == TimerManager::STARTED) {
+ for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
+ shared_ptr<TimerManager::Task> task = ix->second;
+ expiredTasks.insert(task);
+ if (task->state_ == TimerManager::Task::WAITING) {
+ task->state_ = TimerManager::Task::EXECUTING; // Task::run() only runs tasks in this state
+ }
+ manager_->taskCount_--;
+ }
+ manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
+ }
+ }
+ // Run the batch with the monitor released; iteration follows the std::set ordering of the shared_ptrs, not key order.
+ for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+ (*ix)->run();
+ }
+ // NOTE(review): the loop condition below reads state_ without holding the monitor.
+ } while (manager_->state_ == TimerManager::STARTED);
+ // Shutdown handshake: stop() waits on the monitor until state_ becomes STOPPED.
+ {
+ Synchronized s(manager_->monitor_);
+ if (manager_->state_ == TimerManager::STOPPING) {
+ manager_->state_ = TimerManager::STOPPED;
+ manager_->monitor_.notify();
+ }
+ }
+ return;
+ }
+
+ private:
+ TimerManager* manager_;
+ friend class TimerManager;
+};
+
+// Creates a manager in the UNINITIALIZED state with no tasks; the dispatcher thread is only created by start().
+TimerManager::TimerManager()
+  : taskCount_(0),
+    state_(UNINITIALIZED),
+    dispatcher_(new Dispatcher(this)) {}
+
+
+TimerManager::~TimerManager() {
+
+  // If we haven't been explicitly stopped, do so now. We don't need to grab
+  // the monitor here, since stop already takes care of reentrancy.
+
+  if (state_ != STOPPED) {
+    try {
+      stop();
+    } catch(...) {
+      // Deliberately swallowed: an exception escaping a destructor ends in
+      // std::terminate(), and there is no caller left to report it to.
+    }
+  }
+}
+
+void TimerManager::start() {
+  // Claim the UNINITIALIZED -> STARTING transition under the monitor so a single caller launches the thread.
+  bool launchDispatcher = false;
+  {
+    Synchronized guard(monitor_);
+    if (threadFactory_ == NULL) {
+      throw InvalidArgumentException();
+    }
+    launchDispatcher = (state_ == TimerManager::UNINITIALIZED);
+    if (launchDispatcher) {
+      state_ = TimerManager::STARTING;
+    }
+  }
+
+  if (launchDispatcher) {
+    dispatcherThread_ = threadFactory_->newThread(dispatcher_);
+    dispatcherThread_->start();
+  }
+
+  // Block until the dispatcher has moved the state out of STARTING (it calls notifyAll when it does).
+  Synchronized guard(monitor_);
+  while (state_ == TimerManager::STARTING) {
+    monitor_.wait();
+  }
+  assert(state_ != TimerManager::STARTING);
+}
+
+void TimerManager::stop() {
+  // Moves the manager to STOPPED, waiting for the dispatcher (if one was started) to acknowledge.
+  bool doStop = false;
+  {
+    Synchronized s(monitor_);
+    if (state_ == TimerManager::UNINITIALIZED) {
+      state_ = TimerManager::STOPPED;  // never started: nothing to wait for
+    } else if (state_ != STOPPING && state_ != STOPPED) {
+      doStop = true;
+      state_ = STOPPING;
+      monitor_.notifyAll();  // wake the dispatcher so it observes STOPPING
+    }
+    while (state_ != STOPPED) {
+      monitor_.wait();
+    }
+  }
+
+  if (doStop) {
+    // Drop outstanding tasks. clear() is used instead of erasing inside an
+    // iterator loop, because erase() invalidates the iterator being advanced.
+    taskMap_.clear();
+    taskCount_ = 0;  // keep the counter consistent with the now-empty map
+    // Remove dispatcher's reference to us.
+    dispatcher_->manager_ = NULL;
+  }
+}
+
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
+  Synchronized guard(monitor_);  // copy the pointer while holding the monitor, as the setter does
+  return threadFactory_;
+}
+
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
+  Synchronized guard(monitor_);  // start() null-checks threadFactory_ under this same monitor
+  threadFactory_ = value;
+}
+
+size_t TimerManager::taskCount() const {
+ return taskCount_; // read without taking monitor_, unlike the threadFactory accessors
+}
+
+void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
+  // `timeout` is a delay relative to now; tasks are keyed by absolute expiration time.
+  const int64_t expiration = Util::currentTime() + timeout;
+
+  {
+    Synchronized s(monitor_);
+    if (state_ != TimerManager::STARTED) {
+      throw IllegalStateException();
+    }
+
+    // Decide before inserting whether the dispatcher must recompute its wait:
+    // the map is empty, or this task expires ahead of the current head. Once
+    // inserted, the head is never later than `expiration`, so testing then would always fail.
+    const bool wakeDispatcher = taskMap_.empty() || expiration < taskMap_.begin()->first;
+    taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(expiration, shared_ptr<Task>(new Task(task))));
+    taskCount_++;
+    if (wakeDispatcher) {
+      monitor_.notify();
+    }
+  }
+}
+
+void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
+  // `value` is an absolute time: it is converted with Util::toMilliseconds, checked against
+  // Util::currentTime(), and the remaining delay is handed to the relative-timeout overload.
+  int64_t deadline;
+  Util::toMilliseconds(deadline, value);
+
+  const int64_t current = Util::currentTime();
+  if (deadline < current) {
+    throw InvalidArgumentException();  // the requested time has already passed
+  }
+
+  add(task, deadline - current);
+}
+
+
+void TimerManager::remove(shared_ptr<Runnable> task) {
+  // NOTE: removal is not implemented. The call only validates the manager
+  // state; `task` is never looked up and taskMap_ is left untouched.
+  Synchronized guard(monitor_);
+  if (state_ != TimerManager::STARTED) throw IllegalStateException();
+}
+
+const TimerManager::STATE TimerManager::state() const { return state_; } // returns state_ without taking monitor_
+
+}}} // apache::thrift::concurrency
+