Thrift now a TLP - INFRA-3116
git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
new file mode 100644
index 0000000..1150282
--- /dev/null
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <iostream>
+
+#include "TSocketPool.h"
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+
+using boost::shared_ptr;
+
+/**
+ * TSocketPoolServer implementation
+ *
+ */
+TSocketPoolServer::TSocketPoolServer()
+ : host_(""),
+ port_(0),
+ socket_(-1),
+ lastFailTime_(0),
+ consecutiveFailures_(0) {}
+
+/**
+ * Constructor for TSocketPool server
+ */
+TSocketPoolServer::TSocketPoolServer(const string &host, int port)
+ : host_(host),
+ port_(port),
+ socket_(-1),
+ lastFailTime_(0),
+ consecutiveFailures_(0) {}
+
+/**
+ * TSocketPool implementation.
+ *
+ */
+
+TSocketPool::TSocketPool() : TSocket(),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true) {
+}
+
+TSocketPool::TSocketPool(const vector<string> &hosts,
+ const vector<int> &ports) : TSocket(),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true)
+{
+ if (hosts.size() != ports.size()) {
+ GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
+ throw TTransportException(TTransportException::BAD_ARGS);
+ }
+
+ for (unsigned int i = 0; i < hosts.size(); ++i) {
+ addServer(hosts[i], ports[i]);
+ }
+}
+
+TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true)
+{
+ for (unsigned i = 0; i < servers.size(); ++i) {
+ addServer(servers[i].first, servers[i].second);
+ }
+}
+
+TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(),
+ servers_(servers),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true)
+{
+}
+
+TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
+ numRetries_(1),
+ retryInterval_(60),
+ maxConsecutiveFailures_(1),
+ randomize_(true),
+ alwaysTryLast_(true)
+{
+ addServer(host, port);
+}
+
+TSocketPool::~TSocketPool() {
+ vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
+ vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
+ for (; iter != iterEnd; ++iter) {
+ setCurrentServer(*iter);
+ TSocketPool::close();
+ }
+}
+
+void TSocketPool::addServer(const string& host, int port) {
+ servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port)));
+}
+
+void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) {
+ servers_ = servers;
+}
+
+void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) {
+ servers = servers_;
+}
+
+void TSocketPool::setNumRetries(int numRetries) {
+ numRetries_ = numRetries;
+}
+
+void TSocketPool::setRetryInterval(int retryInterval) {
+ retryInterval_ = retryInterval;
+}
+
+
+void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
+ maxConsecutiveFailures_ = maxConsecutiveFailures;
+}
+
+void TSocketPool::setRandomize(bool randomize) {
+ randomize_ = randomize;
+}
+
+void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
+ alwaysTryLast_ = alwaysTryLast;
+}
+
+void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
+ currentServer_ = server;
+ host_ = server->host_;
+ port_ = server->port_;
+ socket_ = server->socket_;
+}
+
+/* TODO: without apc we ignore a lot of functionality from the php version */
+void TSocketPool::open() {
+ if (randomize_) {
+ random_shuffle(servers_.begin(), servers_.end());
+ }
+
+ unsigned int numServers = servers_.size();
+ for (unsigned int i = 0; i < numServers; ++i) {
+
+ shared_ptr<TSocketPoolServer> &server = servers_[i];
+ bool retryIntervalPassed = (server->lastFailTime_ == 0);
+ bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
+
+ // Impersonate the server socket
+ setCurrentServer(server);
+
+ if (isOpen()) {
+ // already open means we're done
+ return;
+ }
+
+ if (server->lastFailTime_ > 0) {
+ // The server was marked as down, so check if enough time has elapsed to retry
+ int elapsedTime = time(NULL) - server->lastFailTime_;
+ if (elapsedTime > retryInterval_) {
+ retryIntervalPassed = true;
+ }
+ }
+
+ if (retryIntervalPassed || isLastServer) {
+ for (int j = 0; j < numRetries_; ++j) {
+ try {
+ TSocket::open();
+
+ // Copy over the opened socket so that we can keep it persistent
+ server->socket_ = socket_;
+
+ // reset lastFailTime_ is required
+ if (server->lastFailTime_) {
+ server->lastFailTime_ = 0;
+ }
+
+ // success
+ return;
+ } catch (TException e) {
+ string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
+ GlobalOutput(errStr.c_str());
+ // connection failed
+ }
+ }
+
+ ++server->consecutiveFailures_;
+ if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
+ // Mark server as down
+ server->consecutiveFailures_ = 0;
+ server->lastFailTime_ = time(NULL);
+ }
+ }
+ }
+
+ GlobalOutput("TSocketPool::open: all connections failed");
+ throw TTransportException(TTransportException::NOT_OPEN);
+}
+
+void TSocketPool::close() {
+ if (isOpen()) {
+ TSocket::close();
+ currentServer_->socket_ = -1;
+ }
+}
+
+}}} // apache::thrift::transport