blob: 7a9b589bd7c76f642ce1b28670ed3811286ef005 [file] [log] [blame]
Roger Meier3faaedf2011-10-02 10:51:45 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23#include "Monitor.h"
24#include "Exception.h"
25#include "Util.h"
26
27#include <assert.h>
28#include <errno.h>
29
30#include <boost/scoped_ptr.hpp>
31#include <boost/thread.hpp>
32#include <boost/date_time/posix_time/posix_time.hpp>
33#include <boost/interprocess/sync/interprocess_mutex.hpp>
34#include <boost/interprocess/sync/interprocess_condition.hpp>
35#include <boost/interprocess/sync/scoped_lock.hpp>
36
37namespace apache { namespace thrift { namespace concurrency {
38
39using namespace boost::interprocess;
40
41/**
42 * Monitor implementation using the boost interprocess library
43 *
44 * @version $Id:$
45 */
46class Monitor::Impl : public interprocess_condition {
47
48 public:
49
50 Impl()
51 : ownedMutex_(new Mutex()),
52 mutex_(NULL) {
53 init(ownedMutex_.get());
54 }
55
56 Impl(Mutex* mutex)
57 : mutex_(NULL) {
58 init(mutex);
59 }
60
61 Impl(Monitor* monitor)
62 : mutex_(NULL) {
63 init(&(monitor->mutex()));
64 }
65
66 Mutex& mutex() { return *mutex_; }
67 void lock() { mutex().lock(); }
68 void unlock() { mutex().unlock(); }
69
70 /**
71 * Exception-throwing version of waitForTimeRelative(), called simply
72 * wait(int64) for historical reasons. Timeout is in milliseconds.
73 *
74 * If the condition occurs, this function returns cleanly; on timeout or
75 * error an exception is thrown.
76 */
77 void wait(int64_t timeout_ms) {
78 int result = waitForTimeRelative(timeout_ms);
79 if (result == ETIMEDOUT) {
80 throw TimedOutException();
81 } else if (result != 0) {
82 throw TException(
83 "Monitor::wait() failed");
84 }
85 }
86
87 /**
88 * Waits until the specified timeout in milliseconds for the condition to
89 * occur, or waits forever if timeout_ms == 0.
90 *
91 * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
92 */
93 int waitForTimeRelative(int64_t timeout_ms) {
94 if (timeout_ms == 0LL) {
95 return waitForever();
96 }
97
98 assert(mutex_);
99 interprocess_mutex* mutexImpl =
100 reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
101 assert(mutexImpl);
102
103 scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
104 int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT;
105 lock.release();
106 return res;
107 }
108
109 /**
110 * Waits until the absolute time specified using struct timespec.
111 * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
112 */
113 int waitForTime(const timespec* abstime) {
114 assert(mutex_);
115 interprocess_mutex* mutexImpl =
116 reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
117 assert(mutexImpl);
118
119 struct timespec currenttime;
120 Util::toTimespec(currenttime, Util::currentTime());
121
122 long tv_sec = abstime->tv_sec - currenttime.tv_sec;
123 long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec;
124 if(tv_sec < 0)
125 tv_sec = 0;
126 if(tv_nsec < 0)
127 tv_nsec = 0;
128
129 scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
130 int res = timed_wait(lock, boost::get_system_time() +
131 boost::posix_time::seconds(tv_sec) +
132 boost::posix_time::microseconds(tv_nsec / 1000)
133 ) ? 0 : ETIMEDOUT;
134 lock.release();
135 return res;
136 }
137
138 /**
139 * Waits forever until the condition occurs.
140 * Returns 0 if condition occurs, or an error code otherwise.
141 */
142 int waitForever() {
143 assert(mutex_);
144 interprocess_mutex* mutexImpl =
145 reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
146 assert(mutexImpl);
147
148 scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
149 ((interprocess_condition*)this)->wait(lock);
150 lock.release();
151 return 0;
152 }
153
154
155 void notify() {
156 notify_one();
157 }
158
159 void notifyAll() {
160 notify_all();
161 }
162
163 private:
164
165 void init(Mutex* mutex) {
166 mutex_ = mutex;
167 }
168
169 boost::scoped_ptr<Mutex> ownedMutex_;
170 Mutex* mutex_;
171};
172
173Monitor::Monitor() : impl_(new Monitor::Impl()) {}
174Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
175Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
176
177Monitor::~Monitor() { delete impl_; }
178
179Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); }
180
181void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); }
182
183void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); }
184
185void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); }
186
187int Monitor::waitForTime(const timespec* abstime) const {
188 return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
189}
190
191int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
192 return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
193}
194
195int Monitor::waitForever() const {
196 return const_cast<Monitor::Impl*>(impl_)->waitForever();
197}
198
199void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); }
200
201void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); }
202
203}}} // apache::thrift::concurrency