blob: 7f1b1ccd2f499ba4c6537320b1e285901aecfe84 [file] [log] [blame]
ben-craig02bade12015-07-17 08:40:48 -05001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
ben-craig02bade12015-07-17 08:40:48 -050020#include <limits>
cyy64750162019-02-08 13:40:59 +080021#include <memory>
22#include <thrift/TApplicationException.h>
23#include <thrift/async/TConcurrentClientSyncInfo.h>
24#include <thrift/transport/TTransportException.h>
ben-craig02bade12015-07-17 08:40:48 -050025
26namespace apache { namespace thrift { namespace async {
27
28using namespace ::apache::thrift::concurrency;
29
30TConcurrentClientSyncInfo::TConcurrentClientSyncInfo() :
31 stop_(false),
32 seqidMutex_(),
33 // test rollover all the time
34 nextseqid_((std::numeric_limits<int32_t>::max)()-10),
35 seqidToMonitorMap_(),
36 freeMonitors_(),
37 writeMutex_(),
38 readMutex_(),
39 recvPending_(false),
40 wakeupSomeone_(false),
41 seqidPending_(0),
42 fnamePending_(),
43 mtypePending_(::apache::thrift::protocol::T_CALL)
44{
45 freeMonitors_.reserve(MONITOR_CACHE_SIZE);
46}
47
48bool TConcurrentClientSyncInfo::getPending(
49 std::string &fname,
50 ::apache::thrift::protocol::TMessageType &mtype,
51 int32_t &rseqid)
52{
53 if(stop_)
54 throwDeadConnection_();
55 wakeupSomeone_ = false;
56 if(recvPending_)
57 {
58 recvPending_ = false;
59 rseqid = seqidPending_;
60 fname = fnamePending_;
61 mtype = mtypePending_;
62 return true;
63 }
64 return false;
65}
66
67void TConcurrentClientSyncInfo::updatePending(
68 const std::string &fname,
69 ::apache::thrift::protocol::TMessageType mtype,
70 int32_t rseqid)
71{
72 recvPending_ = true;
73 seqidPending_ = rseqid;
74 fnamePending_ = fname;
75 mtypePending_ = mtype;
76 MonitorPtr monitor;
77 {
78 Guard seqidGuard(seqidMutex_);
Sebastian Zenker042580f2019-01-29 15:48:12 +010079 auto i = seqidToMonitorMap_.find(rseqid);
ben-craig02bade12015-07-17 08:40:48 -050080 if(i == seqidToMonitorMap_.end())
81 throwBadSeqId_();
82 monitor = i->second;
83 }
84 monitor->notify();
85}
86
87void TConcurrentClientSyncInfo::waitForWork(int32_t seqid)
88{
89 MonitorPtr m;
90 {
91 Guard seqidGuard(seqidMutex_);
92 m = seqidToMonitorMap_[seqid];
93 }
94 while(true)
95 {
96 // be very careful about setting state in this loop that affects waking up. You may exit
97 // this function, attempt to grab some work, and someone else could have beaten you (or not
98 // left) the read mutex, and that will put you right back in this loop, with the mangled
99 // state you left behind.
100 if(stop_)
101 throwDeadConnection_();
102 if(wakeupSomeone_)
103 return;
104 if(recvPending_ && seqidPending_ == seqid)
105 return;
106 m->waitForever();
107 }
108}
109
110void TConcurrentClientSyncInfo::throwBadSeqId_()
111{
112 throw apache::thrift::TApplicationException(
113 TApplicationException::BAD_SEQUENCE_ID,
114 "server sent a bad seqid");
115}
116
117void TConcurrentClientSyncInfo::throwDeadConnection_()
118{
119 throw apache::thrift::transport::TTransportException(
120 apache::thrift::transport::TTransportException::NOT_OPEN,
121 "this client died on another thread, and is now in an unusable state");
122}
123
124void TConcurrentClientSyncInfo::wakeupAnyone_(const Guard &)
125{
126 wakeupSomeone_ = true;
127 if(!seqidToMonitorMap_.empty())
128 {
129 // The monitor map maps integers to monitors. Larger integers are more recent
130 // messages. Since this is ordered, it means that the last element is the most recent.
131 // We are trying to guess which thread will have its message complete next, so we are picking
132 // the most recent. The oldest message is likely to be some polling, long lived message.
133 // If we guess right, the thread we wake up will handle the message that comes in.
134 // If we guess wrong, the thread we wake up will hand off the work to the correct thread,
135 // costing us an extra context switch.
136 seqidToMonitorMap_.rbegin()->second->notify();
137 }
138}
139
140void TConcurrentClientSyncInfo::markBad_(const Guard &)
141{
142 wakeupSomeone_ = true;
143 stop_ = true;
cyy64750162019-02-08 13:40:59 +0800144 for(auto & i : seqidToMonitorMap_)
145 i.second->notify();
ben-craig02bade12015-07-17 08:40:48 -0500146}
147
148TConcurrentClientSyncInfo::MonitorPtr
149TConcurrentClientSyncInfo::newMonitor_(const Guard &)
150{
151 if(freeMonitors_.empty())
cyy64750162019-02-08 13:40:59 +0800152 return std::make_shared<Monitor>(&readMutex_);
ben-craig02bade12015-07-17 08:40:48 -0500153 MonitorPtr retval;
154 //swapping to avoid an atomic operation
155 retval.swap(freeMonitors_.back());
156 freeMonitors_.pop_back();
157 return retval;
158}
159
160void TConcurrentClientSyncInfo::deleteMonitor_(
161 const Guard &,
162 TConcurrentClientSyncInfo::MonitorPtr &m) /*noexcept*/
163{
164 if(freeMonitors_.size() > MONITOR_CACHE_SIZE)
165 {
166 m.reset();
167 return;
168 }
169 //freeMonitors_ was reserved up to MONITOR_CACHE_SIZE in the ctor,
170 //so this shouldn't throw
171 freeMonitors_.push_back(TConcurrentClientSyncInfo::MonitorPtr());
172 //swapping to avoid an atomic operation
173 m.swap(freeMonitors_.back());
174}
175
176int32_t TConcurrentClientSyncInfo::generateSeqId()
177{
178 Guard seqidGuard(seqidMutex_);
179 if(stop_)
180 throwDeadConnection_();
181
182 if(!seqidToMonitorMap_.empty())
183 if(nextseqid_ == seqidToMonitorMap_.begin()->first)
184 throw apache::thrift::TApplicationException(
185 TApplicationException::BAD_SEQUENCE_ID,
186 "about to repeat a seqid");
Triton62f1bd52021-01-09 20:29:12 +0100187 int32_t newSeqId = nextseqid_;
188 if (nextseqid_ == (std::numeric_limits<int32_t>::max)())
189 nextseqid_ = (std::numeric_limits<int32_t>::min)();
190 else
191 ++nextseqid_;
ben-craig02bade12015-07-17 08:40:48 -0500192 seqidToMonitorMap_[newSeqId] = newMonitor_(seqidGuard);
193 return newSeqId;
194}
195
196TConcurrentRecvSentry::TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid) :
197 sync_(*sync),
198 seqid_(seqid),
199 committed_(false)
200{
201 sync_.getReadMutex().lock();
202}
203
204TConcurrentRecvSentry::~TConcurrentRecvSentry()
205{
206 {
207 Guard seqidGuard(sync_.seqidMutex_);
208 sync_.deleteMonitor_(seqidGuard, sync_.seqidToMonitorMap_[seqid_]);
209
210 sync_.seqidToMonitorMap_.erase(seqid_);
211 if(committed_)
212 sync_.wakeupAnyone_(seqidGuard);
213 else
214 sync_.markBad_(seqidGuard);
215 }
216 sync_.getReadMutex().unlock();
217}
218
219void TConcurrentRecvSentry::commit()
220{
221 committed_ = true;
222}
223
224TConcurrentSendSentry::TConcurrentSendSentry(TConcurrentClientSyncInfo *sync) :
225 sync_(*sync),
226 committed_(false)
227{
228 sync_.getWriteMutex().lock();
229}
230
231TConcurrentSendSentry::~TConcurrentSendSentry()
232{
233 if(!committed_)
234 {
235 Guard seqidGuard(sync_.seqidMutex_);
236 sync_.markBad_(seqidGuard);
237 }
238 sync_.getWriteMutex().unlock();
239}
240
241void TConcurrentSendSentry::commit()
242{
243 committed_ = true;
244}
245
246
247}}} // apache::thrift::async