blob: 1150282bb485dfce166e1c729c79da14cd7997cf [file] [log] [blame]
Gavin McDonald0b75e1a2010-10-28 02:12:01 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <algorithm>
21#include <iostream>
22
23#include "TSocketPool.h"
24
25namespace apache { namespace thrift { namespace transport {
26
27using namespace std;
28
29using boost::shared_ptr;
30
31/**
32 * TSocketPoolServer implementation
33 *
34 */
35TSocketPoolServer::TSocketPoolServer()
36 : host_(""),
37 port_(0),
38 socket_(-1),
39 lastFailTime_(0),
40 consecutiveFailures_(0) {}
41
42/**
43 * Constructor for TSocketPool server
44 */
45TSocketPoolServer::TSocketPoolServer(const string &host, int port)
46 : host_(host),
47 port_(port),
48 socket_(-1),
49 lastFailTime_(0),
50 consecutiveFailures_(0) {}
51
52/**
53 * TSocketPool implementation.
54 *
55 */
56
57TSocketPool::TSocketPool() : TSocket(),
58 numRetries_(1),
59 retryInterval_(60),
60 maxConsecutiveFailures_(1),
61 randomize_(true),
62 alwaysTryLast_(true) {
63}
64
65TSocketPool::TSocketPool(const vector<string> &hosts,
66 const vector<int> &ports) : TSocket(),
67 numRetries_(1),
68 retryInterval_(60),
69 maxConsecutiveFailures_(1),
70 randomize_(true),
71 alwaysTryLast_(true)
72{
73 if (hosts.size() != ports.size()) {
74 GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
75 throw TTransportException(TTransportException::BAD_ARGS);
76 }
77
78 for (unsigned int i = 0; i < hosts.size(); ++i) {
79 addServer(hosts[i], ports[i]);
80 }
81}
82
83TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(),
84 numRetries_(1),
85 retryInterval_(60),
86 maxConsecutiveFailures_(1),
87 randomize_(true),
88 alwaysTryLast_(true)
89{
90 for (unsigned i = 0; i < servers.size(); ++i) {
91 addServer(servers[i].first, servers[i].second);
92 }
93}
94
95TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(),
96 servers_(servers),
97 numRetries_(1),
98 retryInterval_(60),
99 maxConsecutiveFailures_(1),
100 randomize_(true),
101 alwaysTryLast_(true)
102{
103}
104
105TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
106 numRetries_(1),
107 retryInterval_(60),
108 maxConsecutiveFailures_(1),
109 randomize_(true),
110 alwaysTryLast_(true)
111{
112 addServer(host, port);
113}
114
115TSocketPool::~TSocketPool() {
116 vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
117 vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
118 for (; iter != iterEnd; ++iter) {
119 setCurrentServer(*iter);
120 TSocketPool::close();
121 }
122}
123
124void TSocketPool::addServer(const string& host, int port) {
125 servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port)));
126}
127
128void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) {
129 servers_ = servers;
130}
131
132void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) {
133 servers = servers_;
134}
135
136void TSocketPool::setNumRetries(int numRetries) {
137 numRetries_ = numRetries;
138}
139
140void TSocketPool::setRetryInterval(int retryInterval) {
141 retryInterval_ = retryInterval;
142}
143
144
145void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
146 maxConsecutiveFailures_ = maxConsecutiveFailures;
147}
148
149void TSocketPool::setRandomize(bool randomize) {
150 randomize_ = randomize;
151}
152
153void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
154 alwaysTryLast_ = alwaysTryLast;
155}
156
157void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
158 currentServer_ = server;
159 host_ = server->host_;
160 port_ = server->port_;
161 socket_ = server->socket_;
162}
163
164/* TODO: without apc we ignore a lot of functionality from the php version */
165void TSocketPool::open() {
166 if (randomize_) {
167 random_shuffle(servers_.begin(), servers_.end());
168 }
169
170 unsigned int numServers = servers_.size();
171 for (unsigned int i = 0; i < numServers; ++i) {
172
173 shared_ptr<TSocketPoolServer> &server = servers_[i];
174 bool retryIntervalPassed = (server->lastFailTime_ == 0);
175 bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
176
177 // Impersonate the server socket
178 setCurrentServer(server);
179
180 if (isOpen()) {
181 // already open means we're done
182 return;
183 }
184
185 if (server->lastFailTime_ > 0) {
186 // The server was marked as down, so check if enough time has elapsed to retry
187 int elapsedTime = time(NULL) - server->lastFailTime_;
188 if (elapsedTime > retryInterval_) {
189 retryIntervalPassed = true;
190 }
191 }
192
193 if (retryIntervalPassed || isLastServer) {
194 for (int j = 0; j < numRetries_; ++j) {
195 try {
196 TSocket::open();
197
198 // Copy over the opened socket so that we can keep it persistent
199 server->socket_ = socket_;
200
201 // reset lastFailTime_ is required
202 if (server->lastFailTime_) {
203 server->lastFailTime_ = 0;
204 }
205
206 // success
207 return;
208 } catch (TException e) {
209 string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
210 GlobalOutput(errStr.c_str());
211 // connection failed
212 }
213 }
214
215 ++server->consecutiveFailures_;
216 if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
217 // Mark server as down
218 server->consecutiveFailures_ = 0;
219 server->lastFailTime_ = time(NULL);
220 }
221 }
222 }
223
224 GlobalOutput("TSocketPool::open: all connections failed");
225 throw TTransportException(TTransportException::NOT_OPEN);
226}
227
228void TSocketPool::close() {
229 if (isOpen()) {
230 TSocket::close();
231 currentServer_->socket_ = -1;
232 }
233}
234
235}}} // apache::thrift::transport