Merge "Add multi-region support to the OpenStack plugins"
diff --git a/collectd/files/collectd_http_check.conf b/collectd/files/collectd_http_check.conf
index 6246e3b..8c3673a 100644
--- a/collectd/files/collectd_http_check.conf
+++ b/collectd/files/collectd_http_check.conf
@@ -7,6 +7,9 @@
{%- for name, params in plugin.url.iteritems() %}
ExpectedCode "{{ name }}" "{{ params.expected_code }}"
Url "{{ name }}" "{{ params.url }}"
+ {%- if params.get('expected_content') %}
+ ExpectedContent "{{ name }}" "{{ params.expected_content|replace('"','\\"') }}"
+ {%- endif %}
{%- endfor %}
</Module>
{%- endif %}
diff --git a/collectd/files/plugin/collectd_base.py b/collectd/files/plugin/collectd_base.py
index 7959e75..6643693 100644
--- a/collectd/files/plugin/collectd_base.py
+++ b/collectd/files/plugin/collectd_base.py
@@ -139,7 +139,7 @@
'meta': {'tagA': 'valA'}}
{'type': 'dropped_bytes', 'values': [1,2]}
"""
- raise NotImplemented("Must be implemented by the subclass!")
+ raise NotImplementedError("Must be implemented by the subclass!")
def dispatch_metric(self, metric):
values = metric['values']
diff --git a/collectd/files/plugin/collectd_k8s_kubectl_get.py b/collectd/files/plugin/collectd_k8s_kubectl_get.py
new file mode 100644
index 0000000..1333120
--- /dev/null
+++ b/collectd/files/plugin/collectd_k8s_kubectl_get.py
@@ -0,0 +1,149 @@
+#!/usr/bin/python
+# Copyright 2017 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if __name__ == '__main__':
+ import collectd_fake as collectd
+else:
+ import collectd
+import json
+import time
+
+import collectd_base as base
+
+NAME = 'k8s'
+KUBECTL_BINARY = '/usr/bin/kubectl'
+INTERVAL = 30
+
+
+class K8sPlugin(base.Base):
+    """Collectd plugin reporting Kubernetes node readiness through kubectl.
+
+    Resources are fetched with ``kubectl get -o json`` by background
+    ``base.AsyncPoller`` threads (one per resource); itermetrics() turns
+    the current results of these pollers into metrics.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(K8sPlugin, self).__init__(*args, **kwargs)
+        self.plugin = NAME
+        # Running pollers, keyed by the kubectl resource they fetch.
+        self._threads = {}
+        # Handed to every AsyncPoller; overridden by 'PollingInterval'.
+        self.polling_interval = INTERVAL
+        self.resources = []
+        # Node metrics are only collected once 'GetNodes' is enabled.
+        self._get_nodes = False
+
+    def shutdown_callback(self):
+        """Stop every poller thread that is still alive and wait for it."""
+        for tid, t in self._threads.items():
+            if t.is_alive():
+                self.logger.info('Waiting for {} thread to finish'.format(tid))
+                t.stop()
+                t.join()
+
+    def config_callback(self, config):
+        """Read the plugin options from the collectd configuration.
+
+        Handled here, after the base class has processed the node:
+          PollingInterval: integer handed to the poller threads.
+          GetNodes: enables the node metrics when set to true.
+        """
+        super(K8sPlugin, self).config_callback(config)
+        for node in config.children:
+            if node.key == 'PollingInterval':
+                self.polling_interval = int(node.values[0])
+            elif node.key == 'GetNodes':
+                # collectd passes an unquoted true/false as a boolean and a
+                # quoted one as a string: normalize before comparing so
+                # that both spellings are accepted.
+                if str(node.values[0]).lower() == 'true':
+                    self._get_nodes = True
+
+    def kubectl_get(self, resource):
+        """Return the current results of the poller of a resource.
+
+        The first call for a given resource starts a background poller
+        running ``kubectl get -o json <resource>``. When the poller thread
+        is found dead, it is forgotten (so that the next call starts a new
+        one) and an empty list is returned.
+        """
+
+        def kubectl_poller():
+            cmd = [KUBECTL_BINARY, 'get', '-o', 'json', resource]
+            data = self.execute_to_json(cmd, shell=False, log_error=True)
+            # NOTE(review): assumes execute_to_json() always returns a
+            # dict -- confirm that it cannot return None on failure.
+            return data.get('items', [])
+
+        if resource not in self._threads:
+            t = base.AsyncPoller(self.collectd,
+                                 kubectl_poller,
+                                 self.polling_interval,
+                                 resource)
+            t.start()
+            self._threads[resource] = t
+
+        t = self._threads[resource]
+        if not t.is_alive():
+            self.logger.warning("Unexpected end of the thread {}".format(
+                t.name))
+            del self._threads[resource]
+            return []
+
+        return t.results
+
+    @staticmethod
+    def _check_conditions(conditions, _type):
+        """Tell whether all the conditions of the given type are 'True'.
+
+        Returns False when no condition of that type is present: all() is
+        vacuously true on an empty list, which would otherwise count a
+        node that does not report the condition as satisfying it.
+        """
+        statuses = [cnd.get('status') for cnd in conditions
+                    if cnd.get('type') == _type]
+        return bool(statuses) and all(s == 'True' for s in statuses)
+
+    def _iter_node_metrics(self, nodes):
+        """Yield the node metrics computed from a list of node objects.
+
+        Nothing is yielded for an empty (or None) list. Otherwise the
+        metrics are the percentage of not-ready nodes, the number of ready
+        nodes, the number of not-ready nodes and the total number of nodes.
+        """
+        if not nodes:
+            return
+
+        total, total_ready = (0, 0)
+        for node in nodes:
+            self.logger.debug(node.get('metadata', {}).get('name'))
+            conditions = node.get(
+                'status', {}).get('conditions', [])
+            if self._check_conditions(conditions, _type='Ready'):
+                total_ready += 1
+            total += 1
+
+        # Guards the division below.
+        if total > 0:
+            yield {'values': (100.0 * (total - total_ready)) / total,
+                   'plugin_instance': 'nodes_percent',
+                   'meta': {'status': 'not_ready'},
+                   }
+
+        yield {'values': total_ready,
+               'plugin_instance': 'nodes',
+               'meta': {'status': 'ready'},
+               }
+        yield {'values': total - total_ready,
+               'plugin_instance': 'nodes',
+               'meta': {'status': 'not_ready'},
+               }
+        yield {'values': total,
+               'plugin_instance': 'nodes_total'
+               }
+
+    def itermetrics(self):
+        """Yield the metrics of the resources enabled in the configuration.
+
+        This is always a generator: it is simply empty when 'GetNodes' is
+        not enabled, instead of returning None to a caller which expects
+        something iterable.
+        """
+        if not self._get_nodes:
+            return
+        for metric in self._iter_node_metrics(self.kubectl_get('nodes')):
+            yield metric
+
+
+# Single plugin instance shared by all the collectd callbacks below.
+plugin = K8sPlugin(collectd, disable_check_metric=True)
+
+
+def init_callback():
+    """collectd init hook: delegates to plugin.restore_sigchld()."""
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    """collectd config hook: passes the configuration to the plugin."""
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    """collectd read hook: delegates to plugin.read_callback()."""
+    plugin.read_callback()
+
+
+def shutdown_callback():
+    """collectd shutdown hook: stops and joins the poller threads."""
+    plugin.shutdown_callback()
+
+
+if __name__ == '__main__':
+    # Standalone run against the fake collectd module: read twice, one
+    # INTERVAL apart, then stop the poller threads.
+    collectd.load_configuration(plugin)
+    plugin._get_nodes = True
+    plugin.read_callback()
+    collectd.info('Sleeping for {}s'.format(INTERVAL))
+    time.sleep(INTERVAL)
+    plugin.read_callback()
+    plugin.shutdown_callback()
+else:
+    collectd.register_init(init_callback)
+    collectd.register_config(config_callback)
+    collectd.register_read(read_callback, INTERVAL)
+    # Without this hook the poller threads started by the plugin are never
+    # stopped nor joined when collectd shuts down.
+    collectd.register_shutdown(shutdown_callback)
diff --git a/collectd/files/plugin/collectd_openstack.py b/collectd/files/plugin/collectd_openstack.py
index e893a56..a8a72b7 100644
--- a/collectd/files/plugin/collectd_openstack.py
+++ b/collectd/files/plugin/collectd_openstack.py
@@ -376,20 +376,20 @@
_objects.extend(bulk_objs)
- links = resp.get('{}_links'.format(object_name))
- if links is None or self.pagination_limit is None:
- # Either the pagination is not supported or there is
- # no more data
- # In both cases, we got at this stage all the data we
- # can have.
+ if self.pagination_limit is None:
break
- # if there is no 'next' link in the response, all data has
- # been read.
- if len([i for i in links if i.get('rel') == 'next']) == 0:
- break
+ links = resp.get('{}_links'.format(object_name), [])
+ # Glance has no <object>_links section but a 'next' item instead
+ has_next = len(
+ [i for i in links if i.get('rel') == 'next']) > 0 or \
+ resp.get('next')
- _opts['marker'] = bulk_objs[-1]['id']
+ if has_next:
+ _opts['marker'] = bulk_objs[-1]['id']
+ else:
+ # all data has been read
+ break
if not has_failure:
self._last_run = last_run
diff --git a/collectd/files/plugin/http_check.py b/collectd/files/plugin/http_check.py
index 9009801..1002928 100644
--- a/collectd/files/plugin/http_check.py
+++ b/collectd/files/plugin/http_check.py
@@ -40,6 +40,7 @@
)
self.urls = {}
self.expected_codes = {}
+ self.expected_contents = {}
def config_callback(self, config):
super(HTTPCheckPlugin, self).config_callback(config)
@@ -48,6 +49,8 @@
self.urls[node.values[0]] = node.values[1]
elif node.key == 'ExpectedCode':
self.expected_codes[node.values[0]] = int(node.values[1])
+ elif node.key == 'ExpectedContent':
+ self.expected_contents[node.values[0]] = node.values[1]
def itermetrics(self):
for name, url in self.urls.items():
@@ -57,7 +60,7 @@
self.logger.warning("Got exception for '{}': {}".format(
url, e)
)
- yield {'type_instance': name, 'values': self.FAIL}
+ status = self.FAIL
else:
expected_code = self.expected_codes.get(name, 200)
@@ -67,11 +70,21 @@
"while {} is expected").format(name, url,
r.status_code,
expected_code))
- yield {'type_instance': name, 'values': self.FAIL}
+ status = self.FAIL
else:
self.logger.debug(
"Got response from {}: '{}'".format(url, r.content))
- yield {'type_instance': name, 'values': self.OK}
+ status = self.OK
+ expected_content = self.expected_contents.get(name)
+ if expected_content:
+ if r.content != expected_content:
+ status = self.FAIL
+ self.logger.warning(
+ 'Content "{}" does not match "{}"'.format(
+ r.content[0:30], expected_content
+ ))
+
+ yield {'type_instance': name, 'values': status }
plugin = HTTPCheckPlugin(collectd, disable_check_metric=True)