blob: 8997a230bbfe835a0a64ddfe23e969e442c4a482 [file] [log] [blame]
ben-craig02bade12015-07-17 08:40:48 -05001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19#ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_
20#define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1
21
22#include <thrift/protocol/TProtocol.h>
23#include <thrift/concurrency/Mutex.h>
24#include <thrift/concurrency/Monitor.h>
25#include <boost/shared_ptr.hpp>
26#include <vector>
27#include <string>
28#include <map>
29
30namespace apache { namespace thrift { namespace async {
31
32class TConcurrentClientSyncInfo;
33
34class TConcurrentSendSentry
35{
36public:
37 explicit TConcurrentSendSentry(TConcurrentClientSyncInfo *sync);
38 ~TConcurrentSendSentry();
39
40 void commit();
41private:
42 TConcurrentClientSyncInfo &sync_;
43 bool committed_;
44};
45
46class TConcurrentRecvSentry
47{
48public:
49 TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid);
50 ~TConcurrentRecvSentry();
51
52 void commit();
53private:
54 TConcurrentClientSyncInfo &sync_;
55 int32_t seqid_;
56 bool committed_;
57};
58
59class TConcurrentClientSyncInfo
60{
61private: //typedefs
62 typedef boost::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr;
63 typedef std::map<int32_t, MonitorPtr> MonitorMap;
64public:
65 TConcurrentClientSyncInfo();
66
67 int32_t generateSeqId();
68
69 bool getPending(
70 std::string &fname,
71 ::apache::thrift::protocol::TMessageType &mtype,
72 int32_t &rseqid); /* requires readMutex_ */
73
74 void updatePending(
75 const std::string &fname,
76 ::apache::thrift::protocol::TMessageType mtype,
77 int32_t rseqid); /* requires readMutex_ */
78
79 void waitForWork(int32_t seqid); /* requires readMutex_ */
80
81 ::apache::thrift::concurrency::Mutex &getReadMutex() {return readMutex_;}
82 ::apache::thrift::concurrency::Mutex &getWriteMutex() {return writeMutex_;}
83
84private: //constants
85 enum {MONITOR_CACHE_SIZE = 10};
86private: //functions
87 MonitorPtr newMonitor_(
88 const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */
89 void deleteMonitor_(
90 const ::apache::thrift::concurrency::Guard &seqidGuard,
91 MonitorPtr &m); /*noexcept*/ /* requires seqidMutex_ */
92 void wakeupAnyone_(
93 const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */
94 void markBad_(
95 const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */
96 void throwBadSeqId_();
97 void throwDeadConnection_();
98private: //data members
99
100 volatile bool stop_;
101
102 ::apache::thrift::concurrency::Mutex seqidMutex_;
103 // begin seqidMutex_ protected members
104 int32_t nextseqid_;
105 MonitorMap seqidToMonitorMap_;
106 std::vector<MonitorPtr> freeMonitors_;
107 // end seqidMutex_ protected members
108
109 ::apache::thrift::concurrency::Mutex writeMutex_;
110
111 ::apache::thrift::concurrency::Mutex readMutex_;
112 // begin readMutex_ protected members
113 bool recvPending_;
114 bool wakeupSomeone_;
115 int32_t seqidPending_;
116 std::string fnamePending_;
117 ::apache::thrift::protocol::TMessageType mtypePending_;
118 // end readMutex_ protected members
119
120
121 friend class TConcurrentSendSentry;
122 friend class TConcurrentRecvSentry;
123};
124
125}}} // apache::thrift::async
126
127#endif // _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_