/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| 19 | |
| 20 | #ifdef HAVE_CONFIG_H |
| 21 | #include <config.h> |
| 22 | #endif |
| 23 | #include "Monitor.h" |
| 24 | #include "Exception.h" |
| 25 | #include "Util.h" |
| 26 | |
| 27 | #include <assert.h> |
| 28 | #include <errno.h> |
| 29 | |
| 30 | #include <boost/scoped_ptr.hpp> |
| 31 | #include <boost/thread.hpp> |
| 32 | #include <boost/date_time/posix_time/posix_time.hpp> |
| 33 | #include <boost/interprocess/sync/interprocess_mutex.hpp> |
| 34 | #include <boost/interprocess/sync/interprocess_condition.hpp> |
| 35 | #include <boost/interprocess/sync/scoped_lock.hpp> |
| 36 | |
| 37 | namespace apache { namespace thrift { namespace concurrency { |
| 38 | |
| 39 | using namespace boost::interprocess; |
| 40 | |
| 41 | /** |
| 42 | * Monitor implementation using the boost interprocess library |
| 43 | * |
| 44 | * @version $Id:$ |
| 45 | */ |
| 46 | class Monitor::Impl : public interprocess_condition { |
| 47 | |
| 48 | public: |
| 49 | |
| 50 | Impl() |
| 51 | : ownedMutex_(new Mutex()), |
| 52 | mutex_(NULL) { |
| 53 | init(ownedMutex_.get()); |
| 54 | } |
| 55 | |
| 56 | Impl(Mutex* mutex) |
| 57 | : mutex_(NULL) { |
| 58 | init(mutex); |
| 59 | } |
| 60 | |
| 61 | Impl(Monitor* monitor) |
| 62 | : mutex_(NULL) { |
| 63 | init(&(monitor->mutex())); |
| 64 | } |
| 65 | |
| 66 | Mutex& mutex() { return *mutex_; } |
| 67 | void lock() { mutex().lock(); } |
| 68 | void unlock() { mutex().unlock(); } |
| 69 | |
| 70 | /** |
| 71 | * Exception-throwing version of waitForTimeRelative(), called simply |
| 72 | * wait(int64) for historical reasons. Timeout is in milliseconds. |
| 73 | * |
| 74 | * If the condition occurs, this function returns cleanly; on timeout or |
| 75 | * error an exception is thrown. |
| 76 | */ |
| 77 | void wait(int64_t timeout_ms) { |
| 78 | int result = waitForTimeRelative(timeout_ms); |
| 79 | if (result == ETIMEDOUT) { |
| 80 | throw TimedOutException(); |
| 81 | } else if (result != 0) { |
| 82 | throw TException( |
| 83 | "Monitor::wait() failed"); |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | /** |
| 88 | * Waits until the specified timeout in milliseconds for the condition to |
| 89 | * occur, or waits forever if timeout_ms == 0. |
| 90 | * |
| 91 | * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. |
| 92 | */ |
| 93 | int waitForTimeRelative(int64_t timeout_ms) { |
| 94 | if (timeout_ms == 0LL) { |
| 95 | return waitForever(); |
| 96 | } |
| 97 | |
| 98 | assert(mutex_); |
| 99 | interprocess_mutex* mutexImpl = |
| 100 | reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl()); |
| 101 | assert(mutexImpl); |
| 102 | |
| 103 | scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type()); |
| 104 | int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT; |
| 105 | lock.release(); |
| 106 | return res; |
| 107 | } |
| 108 | |
| 109 | /** |
| 110 | * Waits until the absolute time specified using struct timespec. |
| 111 | * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. |
| 112 | */ |
| 113 | int waitForTime(const timespec* abstime) { |
| 114 | assert(mutex_); |
| 115 | interprocess_mutex* mutexImpl = |
| 116 | reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl()); |
| 117 | assert(mutexImpl); |
| 118 | |
| 119 | struct timespec currenttime; |
| 120 | Util::toTimespec(currenttime, Util::currentTime()); |
| 121 | |
| 122 | long tv_sec = abstime->tv_sec - currenttime.tv_sec; |
| 123 | long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec; |
| 124 | if(tv_sec < 0) |
| 125 | tv_sec = 0; |
| 126 | if(tv_nsec < 0) |
| 127 | tv_nsec = 0; |
| 128 | |
| 129 | scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type()); |
| 130 | int res = timed_wait(lock, boost::get_system_time() + |
| 131 | boost::posix_time::seconds(tv_sec) + |
| 132 | boost::posix_time::microseconds(tv_nsec / 1000) |
| 133 | ) ? 0 : ETIMEDOUT; |
| 134 | lock.release(); |
| 135 | return res; |
| 136 | } |
| 137 | |
| 138 | /** |
| 139 | * Waits forever until the condition occurs. |
| 140 | * Returns 0 if condition occurs, or an error code otherwise. |
| 141 | */ |
| 142 | int waitForever() { |
| 143 | assert(mutex_); |
| 144 | interprocess_mutex* mutexImpl = |
| 145 | reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl()); |
| 146 | assert(mutexImpl); |
| 147 | |
| 148 | scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type()); |
| 149 | ((interprocess_condition*)this)->wait(lock); |
| 150 | lock.release(); |
| 151 | return 0; |
| 152 | } |
| 153 | |
| 154 | |
| 155 | void notify() { |
| 156 | notify_one(); |
| 157 | } |
| 158 | |
| 159 | void notifyAll() { |
| 160 | notify_all(); |
| 161 | } |
| 162 | |
| 163 | private: |
| 164 | |
| 165 | void init(Mutex* mutex) { |
| 166 | mutex_ = mutex; |
| 167 | } |
| 168 | |
| 169 | boost::scoped_ptr<Mutex> ownedMutex_; |
| 170 | Mutex* mutex_; |
| 171 | }; |
| 172 | |
| 173 | Monitor::Monitor() : impl_(new Monitor::Impl()) {} |
| 174 | Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {} |
| 175 | Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {} |
| 176 | |
| 177 | Monitor::~Monitor() { delete impl_; } |
| 178 | |
| 179 | Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); } |
| 180 | |
| 181 | void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); } |
| 182 | |
| 183 | void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); } |
| 184 | |
| 185 | void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); } |
| 186 | |
| 187 | int Monitor::waitForTime(const timespec* abstime) const { |
| 188 | return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); |
| 189 | } |
| 190 | |
| 191 | int Monitor::waitForTimeRelative(int64_t timeout_ms) const { |
| 192 | return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms); |
| 193 | } |
| 194 | |
| 195 | int Monitor::waitForever() const { |
| 196 | return const_cast<Monitor::Impl*>(impl_)->waitForever(); |
| 197 | } |
| 198 | |
| 199 | void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); } |
| 200 | |
| 201 | void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); } |
| 202 | |
| 203 | }}} // apache::thrift::concurrency |