Improve Elasticsearch collectd plugin
This change modifies the Elastcisearch plugin to retrieve the cluster
metrics only from the node that is the elected master. This avoids
sending and storing duplicated metrics into InfluxDB.
diff --git a/collectd/files/plugin/elasticsearch_cluster.py b/collectd/files/plugin/elasticsearch_cluster.py
index c3dcf37..f60c1bc 100644
--- a/collectd/files/plugin/elasticsearch_cluster.py
+++ b/collectd/files/plugin/elasticsearch_cluster.py
@@ -35,6 +35,7 @@
self.plugin = NAME
self.address = '127.0.0.1'
self.port = 9200
+ self._node_id = None
self.session = requests.Session()
self.url = None
self.session.mount(
@@ -51,25 +52,41 @@
if node.key == 'Port':
self.port = node.values[0]
- self.url = "http://{address}:{port}/_cluster/health".format(
+ self.url = "http://{address}:{port}/".format(
**{
'address': self.address,
'port': int(self.port),
})
- def itermetrics(self):
+ def query_api(self, resource):
+ url = "{}{}".format(self.url, resource)
try:
- r = self.session.get(self.url)
+ r = self.session.get(url)
except Exception as e:
- msg = "Got exception for '{}': {}".format(self.url, e)
+ msg = "Got exception for '{}': {}".format(url, e)
raise base.CheckException(msg)
if r.status_code != 200:
- msg = "{} responded with code {}".format(
- self.url, r.status_code)
+ msg = "{} responded with code {}".format(url, r.status_code)
raise base.CheckException(msg)
- data = r.json()
+ return r.json()
+
+ @property
+ def node_id(self):
+ if self._node_id is None:
+ local_node = self.query_api('_nodes/_local')
+ self._node_id = local_node.get('nodes', {}).keys()[0]
+
+ return self._node_id
+
+ def itermetrics(self):
+ # Collect cluster metrics only from the elected master
+ master_node = self.query_api('_cluster/state/master_node')
+ if master_node.get('master_node', '') != self.node_id:
+ return
+
+ data = self.query_api('_cluster/health')
self.logger.debug("Got response from Elasticsearch: '%s'" % data)
yield {