blob: 2adf7d7356cf0ac8c5130d7eb4362a99ba0bec2b [file] [log] [blame]
Roger Meier3faaedf2011-10-02 10:51:45 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23#include "Monitor.h"
24#include "Exception.h"
25#include "Util.h"
26
27#include <assert.h>
28#include <errno.h>
29
30#include <boost/scoped_ptr.hpp>
31#include <boost/thread.hpp>
32#include <boost/date_time/posix_time/posix_time.hpp>
Roger Meier3faaedf2011-10-02 10:51:45 +000033
34namespace apache { namespace thrift { namespace concurrency {
35
Roger Meier3faaedf2011-10-02 10:51:45 +000036/**
Roger Meier38315782011-11-06 11:29:41 +000037 * Monitor implementation using the boost thread library
Roger Meier3faaedf2011-10-02 10:51:45 +000038 *
39 * @version $Id:$
40 */
Roger Meier38315782011-11-06 11:29:41 +000041class Monitor::Impl : public boost::condition_variable_any {
Roger Meier3faaedf2011-10-02 10:51:45 +000042
43 public:
44
45 Impl()
46 : ownedMutex_(new Mutex()),
47 mutex_(NULL) {
48 init(ownedMutex_.get());
49 }
50
51 Impl(Mutex* mutex)
52 : mutex_(NULL) {
53 init(mutex);
54 }
55
56 Impl(Monitor* monitor)
57 : mutex_(NULL) {
58 init(&(monitor->mutex()));
59 }
60
61 Mutex& mutex() { return *mutex_; }
62 void lock() { mutex().lock(); }
63 void unlock() { mutex().unlock(); }
64
65 /**
66 * Exception-throwing version of waitForTimeRelative(), called simply
67 * wait(int64) for historical reasons. Timeout is in milliseconds.
68 *
69 * If the condition occurs, this function returns cleanly; on timeout or
70 * error an exception is thrown.
71 */
72 void wait(int64_t timeout_ms) {
73 int result = waitForTimeRelative(timeout_ms);
74 if (result == ETIMEDOUT) {
75 throw TimedOutException();
76 } else if (result != 0) {
77 throw TException(
78 "Monitor::wait() failed");
79 }
80 }
81
82 /**
83 * Waits until the specified timeout in milliseconds for the condition to
84 * occur, or waits forever if timeout_ms == 0.
85 *
86 * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
87 */
88 int waitForTimeRelative(int64_t timeout_ms) {
89 if (timeout_ms == 0LL) {
90 return waitForever();
91 }
92
93 assert(mutex_);
Roger Meier38315782011-11-06 11:29:41 +000094 boost::timed_mutex* mutexImpl =
95 reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl());
Roger Meier3faaedf2011-10-02 10:51:45 +000096 assert(mutexImpl);
97
Roger Meier38315782011-11-06 11:29:41 +000098 boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock);
Roger Meier3faaedf2011-10-02 10:51:45 +000099 int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT;
100 lock.release();
101 return res;
102 }
103
104 /**
105 * Waits until the absolute time specified using struct timespec.
106 * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
107 */
108 int waitForTime(const timespec* abstime) {
109 assert(mutex_);
Roger Meier38315782011-11-06 11:29:41 +0000110 boost::timed_mutex* mutexImpl =
111 reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl());
Roger Meier3faaedf2011-10-02 10:51:45 +0000112 assert(mutexImpl);
113
114 struct timespec currenttime;
115 Util::toTimespec(currenttime, Util::currentTime());
116
117 long tv_sec = abstime->tv_sec - currenttime.tv_sec;
118 long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec;
119 if(tv_sec < 0)
120 tv_sec = 0;
121 if(tv_nsec < 0)
122 tv_nsec = 0;
123
Roger Meier38315782011-11-06 11:29:41 +0000124 boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock);
Roger Meier3faaedf2011-10-02 10:51:45 +0000125 int res = timed_wait(lock, boost::get_system_time() +
126 boost::posix_time::seconds(tv_sec) +
127 boost::posix_time::microseconds(tv_nsec / 1000)
128 ) ? 0 : ETIMEDOUT;
129 lock.release();
130 return res;
131 }
132
133 /**
134 * Waits forever until the condition occurs.
135 * Returns 0 if condition occurs, or an error code otherwise.
136 */
137 int waitForever() {
138 assert(mutex_);
Roger Meier38315782011-11-06 11:29:41 +0000139 boost::timed_mutex* mutexImpl =
140 reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl());
Roger Meier3faaedf2011-10-02 10:51:45 +0000141 assert(mutexImpl);
142
Roger Meier38315782011-11-06 11:29:41 +0000143 boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock);
144 ((boost::condition_variable_any*)this)->wait(lock);
Roger Meier3faaedf2011-10-02 10:51:45 +0000145 lock.release();
146 return 0;
147 }
148
149
150 void notify() {
151 notify_one();
152 }
153
154 void notifyAll() {
155 notify_all();
156 }
157
158 private:
159
160 void init(Mutex* mutex) {
161 mutex_ = mutex;
162 }
163
164 boost::scoped_ptr<Mutex> ownedMutex_;
165 Mutex* mutex_;
166};
167
168Monitor::Monitor() : impl_(new Monitor::Impl()) {}
169Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
170Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
171
172Monitor::~Monitor() { delete impl_; }
173
174Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); }
175
176void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); }
177
178void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); }
179
180void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); }
181
182int Monitor::waitForTime(const timespec* abstime) const {
183 return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
184}
185
186int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
187 return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
188}
189
190int Monitor::waitForever() const {
191 return const_cast<Monitor::Impl*>(impl_)->waitForever();
192}
193
194void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); }
195
196void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); }
197
198}}} // apache::thrift::concurrency