blob: 38436ee381e5e9d9d8ddc923bcdb948ecbdf287a [file] [log] [blame]
Jake Farrellb95b0ff2012-03-22 21:49:10 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19module thrift.util.awaitable;
20
21import core.sync.condition;
22import core.sync.mutex;
23import core.time : Duration;
24import std.exception : enforce;
25import std.socket/+ : Socket, socketPair+/; // DMD @@BUG314@@
26import thrift.base;
27
28// To avoid DMD @@BUG6395@@.
29import thrift.internal.algorithm;
30
/**
 * Represents something that will happen at a later point in time.
 *
 * Clients can either block until the event has happened (optionally with a
 * timeout), or register a delegate to be called back once it does.
 */
interface TAwaitable {
  /**
   * Blocks the calling thread until the event has occurred.
   *
   * If the event has already occurred, this returns without waiting.
   */
  void wait();

  /**
   * Blocks the calling thread until the event has occurred, but gives up
   * once the given timeout has elapsed.
   *
   * If the event has already occurred, this returns without waiting.
   *
   * Params:
   *   timeout = The maximum amount of time to wait for the event.
   *
   * Returns: true if the event was triggered before the timeout expired,
   *   false otherwise.
   */
  bool wait(Duration timeout);

  /**
   * Adds a delegate to be called when the event occurs.
   *
   * Callbacks are usually run synchronously by whichever thread signals the
   * event, which is likely not the thread that registered them, so they
   * should be cheap. No guarantee is made about the order in which several
   * registered callbacks are run.
   *
   * A callback must never throw. Since nothrow semantics are difficult to
   * enforce, TAwaitable implementations currently just swallow exceptions
   * escaping from a callback.
   *
   * If the event has already occurred at the time of registration, the
   * delegate is executed right away in the calling thread.
   *
   * Params:
   *   dg = The delegate to invoke once the event has occurred.
   */
  void addCallback(void delegate() dg);

  /**
   * Unregisters a callback that was added before.
   *
   * Params:
   *   dg = The delegate to remove.
   *
   * Returns: true if the delegate was found in the callback list (i.e. it
   *   had been added previously), false otherwise.
   */
  bool removeCallback(void delegate() dg);
}
78
/**
 * A simple TAwaitable event triggered by just calling a trigger() method.
 *
 * Once triggered, the event stays triggered; further trigger() calls have no
 * effect. All state is guarded by an internal mutex.
 */
class TOneshotEvent : TAwaitable {
  /// Creates a new event in the non-triggered state.
  this() {
    mutex_ = new Mutex;
    condition_ = new Condition(mutex_);
  }

  /**
   * Blocks until trigger() has been called. Returns immediately if the event
   * has already been triggered.
   */
  override void wait() {
    synchronized (mutex_) {
      // Re-check the flag after every wakeup so that a wakeup which was not
      // caused by trigger() does not end the wait early.
      while (!triggered_) condition_.wait();
    }
  }

  /**
   * Blocks until trigger() has been called or the timeout has expired.
   *
   * Params:
   *   timeout = The maximum amount of time to wait.
   *
   * Returns: Whether the event has been triggered by the time the wait ended.
   */
  override bool wait(Duration timeout) {
    synchronized (mutex_) {
      if (triggered_) return true;
      // NOTE(review): only a single timed wait is performed, so a wakeup not
      // caused by trigger() makes this return false before the timeout has
      // fully elapsed. Re-waiting for the remaining time would need a
      // monotonic clock - confirm whether callers rely on the full timeout.
      condition_.wait(timeout);
      return triggered_;
    }
  }

  /**
   * Registers a callback to be invoked when the event is triggered.
   *
   * If the event has already been triggered, the delegate is still added to
   * the callback list (so that a later removeCallback() finds it) and is then
   * invoked immediately in the calling thread, outside of the internal lock.
   * Exceptions thrown by the delegate are swallowed.
   *
   * Params:
   *   dg = The delegate to register.
   */
  override void addCallback(void delegate() dg) {
    bool alreadyTriggered;
    synchronized (mutex_) {
      callbacks_ ~= dg;
      alreadyTriggered = triggered_;
    }

    // The lock has been released by the synchronized block above, so a
    // throwing delegate can no longer lead to the mutex being unlocked twice.
    if (alreadyTriggered) invokeSwallowing(dg);
  }

  /**
   * Removes a previously added callback.
   *
   * Params:
   *   dg = The delegate to remove.
   *
   * Returns: Whether the callback list became shorter, i.e. whether the
   *   delegate had been registered.
   */
  override bool removeCallback(void delegate() dg) {
    synchronized (mutex_) {
      auto oldLength = callbacks_.length;
      callbacks_ = removeEqual(callbacks_, dg);
      return callbacks_.length < oldLength;
    }
  }

  /**
   * Triggers the event.
   *
   * Any registered event callbacks are executed synchronously before the
   * function returns. Exceptions thrown by a callback are swallowed, so one
   * misbehaving callback does not prevent the remaining ones from running.
   * Calling trigger() on an already triggered event does nothing.
   */
  void trigger() {
    synchronized (mutex_) {
      if (triggered_) return;

      triggered_ = true;
      condition_.notifyAll();
      // Callbacks run while the mutex is held, so a removeCallback() issued
      // from another thread cannot return while callbacks are still running.
      foreach (c; callbacks_) invokeSwallowing(c);
    }
  }

private:
  /// Invokes dg, discarding any Exception it throws. Callbacks are required
  /// not to throw, but this cannot be enforced at compile time.
  static void invokeSwallowing(void delegate() dg) {
    try {
      dg();
    } catch (Exception e) {
      // Deliberately ignored, see the TAwaitable.addCallback() contract.
    }
  }

  bool triggered_;
  Mutex mutex_;
  Condition condition_;
  void delegate()[] callbacks_;
}
147
/**
 * Translates TAwaitable events into dummy messages on a socket that can be
 * used e.g. to wake up from a select() call.
 *
 * Internally, a pair of connected non-blocking sockets is used: one end is
 * written to when the awaitable fires, the other end is handed out to the
 * client via the socket property.
 */
final class TSocketNotifier {
  /// Creates the internal socket pair and switches both ends to non-blocking
  /// mode.
  this() {
    auto socks = socketPair();
    foreach (s; socks) s.blocking = false;
    sendSocket_ = socks[0];
    recvSocket_ = socks[1];
  }

  /**
   * The socket the messages will be sent to, i.e. the receiving end of the
   * internal socket pair.
   */
  Socket socket() @property {
    return recvSocket_;
  }

  /**
   * Attaches the socket notifier to the specified awaitable, causing it to
   * write a byte to the notification socket when the awaitable callbacks are
   * invoked.
   *
   * If the event has already been triggered, the dummy byte is written
   * immediately to the socket.
   *
   * A socket notifier can only be attached to a single awaitable at a time.
   *
   * Params:
   *   awaitable = The awaitable to attach to.
   *
   * Throws: TException if the socket notifier is already attached.
   */
  void attach(TAwaitable awaitable) {
    enforce(!awaitable_, new TException("Already attached."));
    // Only remember the awaitable once the callback has been registered, so
    // that a failing addCallback() leaves the notifier detached.
    awaitable.addCallback(&notify);
    awaitable_ = awaitable;
  }

  /**
   * Detaches the socket notifier from the awaitable it is currently attached
   * to and discards any notification bytes that have not been read yet.
   *
   * Throws: TException if the socket notifier is not currently attached.
   */
  void detach() {
    enforce(awaitable_, new TException("Not attached."));

    // Unregister first, so that a notification arriving while we are
    // detaching cannot leave a stale byte in the socket after the drain below.
    auto couldRemove = awaitable_.removeCallback(&notify);
    assert(couldRemove);
    awaitable_ = null;

    // Soak up any not currently read notification bytes. Stop on an error
    // (which includes "nothing left to read" on the non-blocking socket) as
    // well as on a zero return, which signals a closed connection and would
    // otherwise make this loop spin forever.
    ubyte[1] dummy = void;
    while (recvSocket_.receive(dummy) > 0) {}
  }

private:
  /// Callback registered with the awaitable: writes a single zero byte to the
  /// sending end of the socket pair. The result of send() is not checked.
  void notify() {
    ubyte[1] zero;
    sendSocket_.send(zero);
  }

  TAwaitable awaitable_;
  Socket sendSocket_;
  Socket recvSocket_;
}