blob: 0bc5eb565f5b5c11c77dc33f161ecad8b7866593 [file] [log] [blame]
ben-craig02bade12015-07-17 08:40:48 -05001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19#ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_
20#define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1
21
22#include <thrift/protocol/TProtocol.h>
23#include <thrift/concurrency/Mutex.h>
24#include <thrift/concurrency/Monitor.h>
cyy316723a2019-01-05 16:35:14 +080025#include <memory>
ben-craig02bade12015-07-17 08:40:48 -050026#include <vector>
27#include <string>
28#include <map>
29
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010030namespace apache {
31namespace thrift {
32namespace async {
ben-craig02bade12015-07-17 08:40:48 -050033
34class TConcurrentClientSyncInfo;
35
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010036class TConcurrentSendSentry {
ben-craig02bade12015-07-17 08:40:48 -050037public:
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010038 explicit TConcurrentSendSentry(TConcurrentClientSyncInfo* sync);
ben-craig02bade12015-07-17 08:40:48 -050039 ~TConcurrentSendSentry();
40
41 void commit();
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010042
ben-craig02bade12015-07-17 08:40:48 -050043private:
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010044 TConcurrentClientSyncInfo& sync_;
ben-craig02bade12015-07-17 08:40:48 -050045 bool committed_;
46};
47
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010048class TConcurrentRecvSentry {
ben-craig02bade12015-07-17 08:40:48 -050049public:
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010050 TConcurrentRecvSentry(TConcurrentClientSyncInfo* sync, int32_t seqid);
ben-craig02bade12015-07-17 08:40:48 -050051 ~TConcurrentRecvSentry();
52
53 void commit();
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010054
ben-craig02bade12015-07-17 08:40:48 -050055private:
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010056 TConcurrentClientSyncInfo& sync_;
ben-craig02bade12015-07-17 08:40:48 -050057 int32_t seqid_;
58 bool committed_;
59};
60
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010061class TConcurrentClientSyncInfo {
62private: // typedefs
cyy316723a2019-01-05 16:35:14 +080063 typedef std::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr;
ben-craig02bade12015-07-17 08:40:48 -050064 typedef std::map<int32_t, MonitorPtr> MonitorMap;
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010065
ben-craig02bade12015-07-17 08:40:48 -050066public:
67 TConcurrentClientSyncInfo();
68
69 int32_t generateSeqId();
70
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010071 bool getPending(std::string& fname,
72 ::apache::thrift::protocol::TMessageType& mtype,
73 int32_t& rseqid); /* requires readMutex_ */
ben-craig02bade12015-07-17 08:40:48 -050074
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010075 void updatePending(const std::string& fname,
76 ::apache::thrift::protocol::TMessageType mtype,
77 int32_t rseqid); /* requires readMutex_ */
ben-craig02bade12015-07-17 08:40:48 -050078
79 void waitForWork(int32_t seqid); /* requires readMutex_ */
80
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010081 ::apache::thrift::concurrency::Mutex& getReadMutex() { return readMutex_; }
82 ::apache::thrift::concurrency::Mutex& getWriteMutex() { return writeMutex_; }
ben-craig02bade12015-07-17 08:40:48 -050083
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010084private: // constants
85 enum { MONITOR_CACHE_SIZE = 10 };
86
87private: // functions
ben-craig02bade12015-07-17 08:40:48 -050088 MonitorPtr newMonitor_(
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010089 const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */
90 void deleteMonitor_(const ::apache::thrift::concurrency::Guard& seqidGuard, MonitorPtr& m);
91 /*noexcept*/ /* requires seqidMutex_ */
ben-craig02bade12015-07-17 08:40:48 -050092 void wakeupAnyone_(
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010093 const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */
94 void markBad_(const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */
ben-craig02bade12015-07-17 08:40:48 -050095 void throwBadSeqId_();
96 void throwDeadConnection_();
ben-craig02bade12015-07-17 08:40:48 -050097
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +010098private: // data members
ben-craig02bade12015-07-17 08:40:48 -050099 volatile bool stop_;
100
101 ::apache::thrift::concurrency::Mutex seqidMutex_;
102 // begin seqidMutex_ protected members
103 int32_t nextseqid_;
104 MonitorMap seqidToMonitorMap_;
105 std::vector<MonitorPtr> freeMonitors_;
106 // end seqidMutex_ protected members
107
108 ::apache::thrift::concurrency::Mutex writeMutex_;
109
110 ::apache::thrift::concurrency::Mutex readMutex_;
111 // begin readMutex_ protected members
112 bool recvPending_;
113 bool wakeupSomeone_;
114 int32_t seqidPending_;
115 std::string fnamePending_;
116 ::apache::thrift::protocol::TMessageType mtypePending_;
117 // end readMutex_ protected members
118
ben-craig02bade12015-07-17 08:40:48 -0500119 friend class TConcurrentSendSentry;
120 friend class TConcurrentRecvSentry;
121};
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +0100122}
123}
124} // apache::thrift::async
ben-craig02bade12015-07-17 08:40:48 -0500125
126#endif // _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_