blob: 3d6440c5e6faa010de3620a899cf5cd945039dad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Monitor.h"
#include "Exception.h"
#include "Util.h"
#include <boost/scoped_ptr.hpp>
#include <assert.h>
#include <errno.h>
#include <iostream>
#include <pthread.h>
namespace apache { namespace thrift { namespace concurrency {
using boost::scoped_ptr;
/**
* Monitor implementation using the POSIX pthread library
*
* @version $Id:$
*/
class Monitor::Impl {
public:
Impl()
: ownedMutex_(new Mutex()),
mutex_(NULL),
condInitialized_(false) {
init(ownedMutex_.get());
}
Impl(Mutex* mutex)
: mutex_(NULL),
condInitialized_(false) {
init(mutex);
}
Impl(Monitor* monitor)
: mutex_(NULL),
condInitialized_(false) {
init(&(monitor->mutex()));
}
~Impl() { cleanup(); }
Mutex& mutex() { return *mutex_; }
void lock() { mutex().lock(); }
void unlock() { mutex().unlock(); }
/**
* Exception-throwing version of waitForTimeRelative(), called simply
* wait(int64) for historical reasons. Timeout is in milliseconds.
*
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
void wait(int64_t timeout_ms) const {
int result = waitForTimeRelative(timeout_ms);
if (result == ETIMEDOUT) {
// pthread_cond_timedwait has been observed to return early on
// various platforms, so comment out this assert.
//assert(Util::currentTime() >= (now + timeout));
throw TimedOutException();
} else if (result != 0) {
throw TException(
"pthread_cond_wait() or pthread_cond_timedwait() failed");
}
}
/**
* Waits until the specified timeout in milliseconds for the condition to
* occur, or waits forever if timeout_ms == 0.
*
* Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
*/
int waitForTimeRelative(int64_t timeout_ms) const {
if (timeout_ms == 0LL) {
return waitForever();
}
struct timespec abstime;
Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
return waitForTime(&abstime);
}
/**
* Waits until the absolute time specified using struct timespec.
* Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
*/
int waitForTime(const timespec* abstime) const {
assert(mutex_);
pthread_mutex_t* mutexImpl =
reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
// XXX Need to assert that caller owns mutex
return pthread_cond_timedwait(&pthread_cond_,
mutexImpl,
abstime);
}
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
int waitForever() const {
assert(mutex_);
pthread_mutex_t* mutexImpl =
reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
return pthread_cond_wait(&pthread_cond_, mutexImpl);
}
void notify() {
// XXX Need to assert that caller owns mutex
int iret = pthread_cond_signal(&pthread_cond_);
assert(iret == 0);
}
void notifyAll() {
// XXX Need to assert that caller owns mutex
int iret = pthread_cond_broadcast(&pthread_cond_);
assert(iret == 0);
}
private:
void init(Mutex* mutex) {
mutex_ = mutex;
if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
condInitialized_ = true;
}
if (!condInitialized_) {
cleanup();
throw SystemResourceException();
}
}
void cleanup() {
if (condInitialized_) {
condInitialized_ = false;
int iret = pthread_cond_destroy(&pthread_cond_);
assert(iret == 0);
}
}
scoped_ptr<Mutex> ownedMutex_;
Mutex* mutex_;
mutable pthread_cond_t pthread_cond_;
mutable bool condInitialized_;
};
Monitor::Monitor() : impl_(new Monitor::Impl()) {}
Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
Monitor::~Monitor() { delete impl_; }
Mutex& Monitor::mutex() const { return impl_->mutex(); }
void Monitor::lock() const { impl_->lock(); }
void Monitor::unlock() const { impl_->unlock(); }
void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); }
int Monitor::waitForTime(const timespec* abstime) const {
return impl_->waitForTime(abstime);
}
int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
return impl_->waitForTimeRelative(timeout_ms);
}
int Monitor::waitForever() const {
return impl_->waitForever();
}
void Monitor::notify() const { impl_->notify(); }
void Monitor::notifyAll() const { impl_->notifyAll(); }
}}} // apache::thrift::concurrency