blob: c7e27c07873591865c6b5af3bd2239c5678ed2c2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#include <thrift/async/TConcurrentClientSyncInfo.h>
21#include <thrift/TApplicationException.h>
22#include <thrift/transport/TTransportException.h>
23#include <limits>
24
25namespace apache { namespace thrift { namespace async {
26
27using namespace ::apache::thrift::concurrency;
28
29TConcurrentClientSyncInfo::TConcurrentClientSyncInfo() :
30 stop_(false),
31 seqidMutex_(),
32 // test rollover all the time
33 nextseqid_((std::numeric_limits<int32_t>::max)()-10),
34 seqidToMonitorMap_(),
35 freeMonitors_(),
36 writeMutex_(),
37 readMutex_(),
38 recvPending_(false),
39 wakeupSomeone_(false),
40 seqidPending_(0),
41 fnamePending_(),
42 mtypePending_(::apache::thrift::protocol::T_CALL)
43{
44 freeMonitors_.reserve(MONITOR_CACHE_SIZE);
45}
46
47bool TConcurrentClientSyncInfo::getPending(
48 std::string &fname,
49 ::apache::thrift::protocol::TMessageType &mtype,
50 int32_t &rseqid)
51{
52 if(stop_)
53 throwDeadConnection_();
54 wakeupSomeone_ = false;
55 if(recvPending_)
56 {
57 recvPending_ = false;
58 rseqid = seqidPending_;
59 fname = fnamePending_;
60 mtype = mtypePending_;
61 return true;
62 }
63 return false;
64}
65
66void TConcurrentClientSyncInfo::updatePending(
67 const std::string &fname,
68 ::apache::thrift::protocol::TMessageType mtype,
69 int32_t rseqid)
70{
71 recvPending_ = true;
72 seqidPending_ = rseqid;
73 fnamePending_ = fname;
74 mtypePending_ = mtype;
75 MonitorPtr monitor;
76 {
77 Guard seqidGuard(seqidMutex_);
78 MonitorMap::iterator i = seqidToMonitorMap_.find(rseqid);
79 if(i == seqidToMonitorMap_.end())
80 throwBadSeqId_();
81 monitor = i->second;
82 }
83 monitor->notify();
84}
85
86void TConcurrentClientSyncInfo::waitForWork(int32_t seqid)
87{
88 MonitorPtr m;
89 {
90 Guard seqidGuard(seqidMutex_);
91 m = seqidToMonitorMap_[seqid];
92 }
93 while(true)
94 {
95 // be very careful about setting state in this loop that affects waking up. You may exit
96 // this function, attempt to grab some work, and someone else could have beaten you (or not
97 // left) the read mutex, and that will put you right back in this loop, with the mangled
98 // state you left behind.
99 if(stop_)
100 throwDeadConnection_();
101 if(wakeupSomeone_)
102 return;
103 if(recvPending_ && seqidPending_ == seqid)
104 return;
105 m->waitForever();
106 }
107}
108
109void TConcurrentClientSyncInfo::throwBadSeqId_()
110{
111 throw apache::thrift::TApplicationException(
112 TApplicationException::BAD_SEQUENCE_ID,
113 "server sent a bad seqid");
114}
115
116void TConcurrentClientSyncInfo::throwDeadConnection_()
117{
118 throw apache::thrift::transport::TTransportException(
119 apache::thrift::transport::TTransportException::NOT_OPEN,
120 "this client died on another thread, and is now in an unusable state");
121}
122
123void TConcurrentClientSyncInfo::wakeupAnyone_(const Guard &)
124{
125 wakeupSomeone_ = true;
126 if(!seqidToMonitorMap_.empty())
127 {
128 // The monitor map maps integers to monitors. Larger integers are more recent
129 // messages. Since this is ordered, it means that the last element is the most recent.
130 // We are trying to guess which thread will have its message complete next, so we are picking
131 // the most recent. The oldest message is likely to be some polling, long lived message.
132 // If we guess right, the thread we wake up will handle the message that comes in.
133 // If we guess wrong, the thread we wake up will hand off the work to the correct thread,
134 // costing us an extra context switch.
135 seqidToMonitorMap_.rbegin()->second->notify();
136 }
137}
138
139void TConcurrentClientSyncInfo::markBad_(const Guard &)
140{
141 wakeupSomeone_ = true;
142 stop_ = true;
143 for(MonitorMap::iterator i = seqidToMonitorMap_.begin(); i != seqidToMonitorMap_.end(); ++i)
144 i->second->notify();
145}
146
147TConcurrentClientSyncInfo::MonitorPtr
148TConcurrentClientSyncInfo::newMonitor_(const Guard &)
149{
150 if(freeMonitors_.empty())
151 return MonitorPtr(new Monitor(&readMutex_));
152 MonitorPtr retval;
153 //swapping to avoid an atomic operation
154 retval.swap(freeMonitors_.back());
155 freeMonitors_.pop_back();
156 return retval;
157}
158
159void TConcurrentClientSyncInfo::deleteMonitor_(
160 const Guard &,
161 TConcurrentClientSyncInfo::MonitorPtr &m) /*noexcept*/
162{
163 if(freeMonitors_.size() > MONITOR_CACHE_SIZE)
164 {
165 m.reset();
166 return;
167 }
168 //freeMonitors_ was reserved up to MONITOR_CACHE_SIZE in the ctor,
169 //so this shouldn't throw
170 freeMonitors_.push_back(TConcurrentClientSyncInfo::MonitorPtr());
171 //swapping to avoid an atomic operation
172 m.swap(freeMonitors_.back());
173}
174
175int32_t TConcurrentClientSyncInfo::generateSeqId()
176{
177 Guard seqidGuard(seqidMutex_);
178 if(stop_)
179 throwDeadConnection_();
180
181 if(!seqidToMonitorMap_.empty())
182 if(nextseqid_ == seqidToMonitorMap_.begin()->first)
183 throw apache::thrift::TApplicationException(
184 TApplicationException::BAD_SEQUENCE_ID,
185 "about to repeat a seqid");
186 int32_t newSeqId = nextseqid_++;
187 seqidToMonitorMap_[newSeqId] = newMonitor_(seqidGuard);
188 return newSeqId;
189}
190
191TConcurrentRecvSentry::TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid) :
192 sync_(*sync),
193 seqid_(seqid),
194 committed_(false)
195{
196 sync_.getReadMutex().lock();
197}
198
199TConcurrentRecvSentry::~TConcurrentRecvSentry()
200{
201 {
202 Guard seqidGuard(sync_.seqidMutex_);
203 sync_.deleteMonitor_(seqidGuard, sync_.seqidToMonitorMap_[seqid_]);
204
205 sync_.seqidToMonitorMap_.erase(seqid_);
206 if(committed_)
207 sync_.wakeupAnyone_(seqidGuard);
208 else
209 sync_.markBad_(seqidGuard);
210 }
211 sync_.getReadMutex().unlock();
212}
213
214void TConcurrentRecvSentry::commit()
215{
216 committed_ = true;
217}
218
219TConcurrentSendSentry::TConcurrentSendSentry(TConcurrentClientSyncInfo *sync) :
220 sync_(*sync),
221 committed_(false)
222{
223 sync_.getWriteMutex().lock();
224}
225
226TConcurrentSendSentry::~TConcurrentSendSentry()
227{
228 if(!committed_)
229 {
230 Guard seqidGuard(sync_.seqidMutex_);
231 sync_.markBad_(seqidGuard);
232 }
233 sync_.getWriteMutex().unlock();
234}
235
236void TConcurrentSendSentry::commit()
237{
238 committed_ = true;
239}
240
241
242}}} // apache::thrift::async