Network SNMP metering, OpenStack meter checks
diff --git a/collectd/client.sls b/collectd/client.sls
index aed79fa..2047c93 100644
--- a/collectd/client.sls
+++ b/collectd/client.sls
@@ -85,6 +85,19 @@
{%- endif %}
{%- endfor %}
+{%- if pillar.get('external', {}).network_device is defined %}
+{{ client.config_dir }}/network_snmp.conf:
+ file.managed:
+ - source: salt://collectd/files/conf.d/network_snmp.conf
+ - template: jinja
+ - user: root
+ - mode: 660
+ - require:
+ - file: {{ client.config_dir }}
+ - watch_in:
+ - service: collectd_service
+{%- endif %}
+
/etc/collectd/filters.conf:
file.managed:
- source: salt://collectd/files/filters.conf
diff --git a/collectd/files/conf.d/network_metering.conf b/collectd/files/conf.d/network_metering.conf
deleted file mode 100644
index 7623c71..0000000
--- a/collectd/files/conf.d/network_metering.conf
+++ /dev/null
@@ -1,27 +0,0 @@
-LoadPlugin snmp
-<Plugin snmp>
- <Data "std_traffic">
- Type "if_octets"
- Table true
- Instance "1.3.6.1.2.1.31.1.1.1.1"
- Values "1.3.6.1.2.1.31.1.1.1.6" "1.3.6.1.2.1.31.1.1.1.10"
- </Data>
- <Data "std_rate">
- Type "if_packets"
- Table true
- Instance "1.3.6.1.2.1.31.1.1.1.1"
- Values "1.3.6.1.2.1.31.1.1.1.7" "1.3.6.1.2.1.31.1.1.1.11"
- </Data>
-{%- for plugin in pillar.collectd.client.plugins %}
-{%- if plugin_name == plugin.name %}
-{%- for device_name, device in plugin.network_device.iteritems() %}
- <Host "{{ device_name }}">
- Address "{{ device.address }}"
- Version {{ device.version }}
- Community "{{ device.community }}"
- Collect "std_traffic" "std_rate"
- </Host>
-{%- endfor %}
-{%- endif %}
-{%- endfor %}
-</Plugin>
\ No newline at end of file
diff --git a/collectd/files/conf.d/network_snmp.conf b/collectd/files/conf.d/network_snmp.conf
new file mode 100644
index 0000000..a5a9a19
--- /dev/null
+++ b/collectd/files/conf.d/network_snmp.conf
@@ -0,0 +1,25 @@
+{%- if pillar.get('external', {}).network_device is defined %}
+LoadPlugin snmp
+<Plugin snmp>
+ <Data "std_traffic">
+ Type "if_octets"
+ Table true
+ Instance "1.3.6.1.2.1.31.1.1.1.1"
+ Values "1.3.6.1.2.1.31.1.1.1.6" "1.3.6.1.2.1.31.1.1.1.10"
+ </Data>
+ <Data "std_rate">
+ Type "if_packets"
+ Table true
+ Instance "1.3.6.1.2.1.31.1.1.1.1"
+ Values "1.3.6.1.2.1.31.1.1.1.7" "1.3.6.1.2.1.31.1.1.1.11"
+ </Data>
+ {%- for device_name, device in pillar.external.network_device.iteritems() %}
+ <Host "{{ device_name }}">
+ Address "{{ device.address }}"
+ Version {{ device.get('version', '2') }}
+ Community "{{ device.community }}"
+ Collect "std_traffic" "std_rate"
+ </Host>
+ {%- endfor %}
+</Plugin>
+{%- endif %}
\ No newline at end of file
diff --git a/collectd/files/plugin/openstack/base.py b/collectd/files/plugin/openstack/base.py
new file mode 100644
index 0000000..a602c2c
--- /dev/null
+++ b/collectd/files/plugin/openstack/base.py
@@ -0,0 +1,195 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import signal
+import subprocess
+import sys
+import time
+import traceback
+
+import collectd
+
+
+class Base(object):
+ """ Base class for writing Python plugins.
+ """
+
+ MAX_IDENTIFIER_LENGTH = 63
+
+ def __init__(self, *args, **kwargs):
+ self.debug = False
+ self.timeout = 5
+ self.logger = collectd
+ self.plugin = None
+ self.plugin_instance = ''
+
+ def config_callback(self, conf):
+ for node in conf.children:
+ if node.key == "Debug":
+ if node.values[0] in ['True', 'true']:
+ self.debug = True
+ if node.key == "Timeout":
+ self.timeout = int(node.values[0])
+
+ def read_callback(self):
+ try:
+ for metric in self.itermetrics():
+ self.dispatch_metric(metric)
+ except Exception as e:
+ self.logger.error('%s: Failed to get metrics: %s: %s' %
+ (self.plugin, e, traceback.format_exc()))
+ return
+
+ def itermetrics(self):
+ """
+ Iterate over the collected metrics
+
+ This class must be implemented by the subclass and should yield dict
+ objects that represent the collected values. Each dict has 3 keys:
+ - 'values', a scalar number or a list of numbers if the type
+ defines several datasources.
+ - 'type_instance' (optional)
+ - 'type' (optional, default='gauge')
+
+ For example:
+
+ {'type_instance':'foo', 'values': 1}
+ {'type_instance':'bar', 'type': 'DERIVE', 'values': 1}
+ {'type': 'dropped_bytes', 'values': [1,2]}
+ """
+ raise NotImplemented("Must be implemented by the subclass!")
+
+ def dispatch_metric(self, metric):
+ values = metric['values']
+ if not isinstance(values, list) and not isinstance(values, tuple):
+ values = (values,)
+
+ type_instance = str(metric.get('type_instance', ''))
+ if len(type_instance) > self.MAX_IDENTIFIER_LENGTH:
+ self.logger.warning(
+ '%s: Identifier "%s..." too long (length: %d, max limit: %d)' %
+ (self.plugin, type_instance[:24], len(type_instance),
+ self.MAX_IDENTIFIER_LENGTH))
+
+ v = collectd.Values(
+ plugin=self.plugin,
+ type=metric.get('type', 'gauge'),
+ plugin_instance=self.plugin_instance,
+ type_instance=type_instance,
+ values=values,
+ # w/a for https://github.com/collectd/collectd/issues/716
+ meta={'0': True}
+ )
+ v.dispatch()
+
+ def execute(self, cmd, shell=True, cwd=None):
+ """
+ Executes a program with arguments.
+
+ Args:
+ cmd: a list of program arguments where the first item is the
+ program name.
+ shell: whether to use the shell as the program to execute (default=
+ True).
+ cwd: the directory to change to before running the program
+ (default=None).
+
+ Returns:
+ A tuple containing the standard output and error strings if the
+ program execution has been successful.
+
+ ("foobar\n", "")
+
+ None if the command couldn't be executed or returned a non-zero
+ status code
+ """
+ start_time = time.time()
+ try:
+ proc = subprocess.Popen(
+ cmd,
+ cwd=cwd,
+ shell=shell,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE
+ )
+ (stdout, stderr) = proc.communicate()
+ stdout = stdout.rstrip('\n')
+ except Exception as e:
+ self.logger.error("Cannot execute command '%s': %s : %s" %
+ (cmd, str(e), traceback.format_exc()))
+ return None
+
+ returncode = proc.returncode
+
+ if returncode != 0:
+ self.logger.error("Command '%s' failed (return code %d): %s" %
+ (cmd, returncode, stderr))
+ return None
+ elapsedtime = time.time() - start_time
+
+ if self.debug:
+ self.logger.info("Command '%s' returned %s in %0.3fs" %
+ (cmd, returncode, elapsedtime))
+
+ if not stdout and self.debug:
+ self.logger.info("Command '%s' returned no output!", cmd)
+
+ return (stdout, stderr)
+
+ def execute_to_json(self, *args, **kwargs):
+ """
+ Executes a program and decodes the output as a JSON string.
+
+ See execute().
+
+ Returns:
+ A Python object or None if the execution of the program failed.
+ """
+ outputs = self.execute(*args, **kwargs)
+ if outputs:
+ return json.loads(outputs[0])
+ return
+
+ @staticmethod
+ def restore_sigchld():
+ """
+ Restores the SIGCHLD handler for Python <= v2.6.
+
+ This should be provided to collectd as the init callback by plugins
+ that execute external programs.
+
+ Note that it will BREAK the exec plugin!!!
+
+ See https://github.com/deniszh/collectd-iostat-python/issues/2 for
+ details.
+ """
+ if sys.version_info[0] == 2 and sys.version_info[1] <= 6:
+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
+
+
+class CephBase(Base):
+    """Base class for Ceph plugins: adds the optional "Cluster" config key
+    and reports it as the collectd plugin_instance."""
+
+    def __init__(self, *args, **kwargs):
+        super(CephBase, self).__init__(*args, **kwargs)
+        # Default cluster name; overridden by the "Cluster" config key.
+        self.cluster = 'ceph'
+
+    def config_callback(self, conf):
+        super(CephBase, self).config_callback(conf)
+
+        for node in conf.children:
+            if node.key == "Cluster":
+                self.cluster = node.values[0]
+        self.plugin_instance = self.cluster
diff --git a/collectd/files/plugin/openstack/build_ceph_perf_types.py b/collectd/files/plugin/openstack/build_ceph_perf_types.py
new file mode 100755
index 0000000..bbb49f3
--- /dev/null
+++ b/collectd/files/plugin/openstack/build_ceph_perf_types.py
@@ -0,0 +1,83 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+import os
+import subprocess
+import sys
+
+
+class CephPerfCollectionSchema(object):
+    """Render one perf-schema collection as collectd types.db lines."""
+
+    def __init__(self, collection, schema):
+        self.collection = collection
+        self.schema = schema
+
+    def __str__(self):
+        # One "<collection>_<counter> value:GAUGE:U:U" line per counter,
+        # with '::' and '-' normalized to '_' and lower-cased.
+        def sanitize(s):
+            return s.replace('::', '_').replace('-', '_').lower()
+
+        return '\n'.join(['%s_%s value:GAUGE:U:U' % (sanitize(self.collection),
+                                                     sanitize(k))
+                          for k in sorted(self.schema.iterkeys())])
+
+
+class CephPerfSchema(object):
+    """Query a Ceph daemon admin socket for its perf counter schema."""
+
+    def __init__(self, socket_path):
+        self.socket_path = socket_path
+
+    @staticmethod
+    def run_command(cmd):
+        # Run the command and decode its stdout as JSON. Failures are
+        # printed and re-raised: this module is a CLI helper, not a daemon.
+        try:
+            proc = subprocess.Popen(
+                cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE
+            )
+            (stdout, stderr) = proc.communicate()
+            stdout = stdout.rstrip('\n')
+        except Exception as e:
+            print "Cannot execute command '%s': %s" % (cmd, str(e))
+            raise e
+
+        return json.loads(stdout)
+
+    def ceph_version(self):
+        # "version" on the admin socket returns {"version": "..."}.
+        cmd = ['/usr/bin/ceph', '--admin-daemon', self.socket_path, 'version']
+        return self.run_command(cmd).get('version')
+
+    def itertypes(self):
+        # Yield one CephPerfCollectionSchema per perf-schema collection.
+        cmd = ['/usr/bin/ceph', '--admin-daemon', self.socket_path, 'perf',
+               'schema']
+
+        for collection, schema in self.run_command(cmd).iteritems():
+            yield CephPerfCollectionSchema(collection, schema)
+
+
+def main():
+    """Print a collectd types.db file for a Ceph OSD's perf counters."""
+    script_name = os.path.basename(sys.argv[0])
+    if len(sys.argv) < 2 or len(sys.argv) > 3:
+        print "usage: %s <Ceph OSD socket> [namespace]" % script_name
+    else:
+        schema = CephPerfSchema(sys.argv[1])
+        # Optional second argument restricts output to one collection.
+        collection = sys.argv[2] if len(sys.argv) == 3 else None
+        print "# File generated automatically by the %s script" % script_name
+        print "# Ceph version: %s" % schema.ceph_version()
+        for item in schema.itertypes():
+            if collection is None or item.collection == collection:
+                print item
+
+if __name__ == '__main__':
+    main()
diff --git a/collectd/files/plugin/openstack/ceph_osd_perf.py b/collectd/files/plugin/openstack/ceph_osd_perf.py
new file mode 100644
index 0000000..f01975d
--- /dev/null
+++ b/collectd/files/plugin/openstack/ceph_osd_perf.py
@@ -0,0 +1,99 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base
+import collectd
+import glob
+import re
+
+
+INTERVAL = 60
+RE_OSD_ID = re.compile(".*?osd\.(\d+)\.asok$")
+
+
+class CephOSDPerfPlugin(base.CephBase):
+ """ Collect OSD performance counters of all OSD daemons running on the host.
+ """
+
+ # Collect only metrics from the 'osd' namespace
+ PREFIXES = ('osd')
+
+ def __init__(self, *args, **kwargs):
+ super(CephOSDPerfPlugin, self).__init__(*args, **kwargs)
+ self.plugin = 'ceph_osd_perf'
+ self.socket_glob = None
+
+ def config_callback(self, conf):
+ super(CephOSDPerfPlugin, self).config_callback(conf)
+ for node in conf.children:
+ if node.key == "AdminSocket":
+ self.socket_glob = node.values[0]
+
+ if not self.socket_glob:
+ raise Exception("AdminSocket not defined")
+
+ @staticmethod
+ def convert_to_collectd_value(value):
+ if isinstance(value, dict):
+ if value['avgcount'] > 0:
+ # See https://www.mail-archive.com/ceph-users@lists.ceph.com/msg18705.html
+ return value['sum'] / value['avgcount']
+ else:
+ return 0.0
+ else:
+ return value
+
+ @staticmethod
+ def convert_to_collectd_type(*args):
+ return '_'.join([s.replace('::', '_').replace('-', '_').lower() for s
+ in args])
+
+ def itermetrics(self):
+ for socket_name in glob.glob(self.socket_glob):
+ m = RE_OSD_ID.match(socket_name)
+ if not m:
+ continue
+
+ osd_id = m.group(1)
+ perf_dump = self.execute_to_json('ceph --admin-daemon %s perf dump'
+ % socket_name)
+ for prefix, stats in perf_dump.iteritems():
+ if prefix not in self.PREFIXES or not stats:
+ continue
+
+ for k in sorted(stats.iterkeys()):
+ yield {
+ 'type': self.convert_to_collectd_type(prefix, k),
+ 'type_instance': osd_id,
+ 'values': self.convert_to_collectd_value(stats[k])
+ }
+
+plugin = CephOSDPerfPlugin()
+
+
+# Thin module-level wrappers: collectd's Python plugin API registers plain
+# functions, so delegate to the singleton instance above.
+def init_callback():
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    plugin.read_callback()
+
+collectd.register_init(init_callback)
+collectd.register_config(config_callback)
+collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/ceph_osd_stats.py b/collectd/files/plugin/openstack/ceph_osd_stats.py
new file mode 100644
index 0000000..da83c68
--- /dev/null
+++ b/collectd/files/plugin/openstack/ceph_osd_stats.py
@@ -0,0 +1,66 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base
+import collectd
+
+INTERVAL = 60
+
+
+class CephOSDStatsPlugin(base.CephBase):
+    """ Collect per OSD stats about store size and commit latency."""
+
+    def __init__(self, *args, **kwargs):
+        super(CephOSDStatsPlugin, self).__init__(*args, **kwargs)
+        self.plugin = 'ceph_osd'
+
+    def itermetrics(self):
+        # 'ceph pg dump osds' returns a JSON list with one entry per OSD;
+        # None (command failure) or an empty list both abort the iteration.
+        osd_stats = self.execute_to_json('ceph pg dump osds --format json')
+        if not osd_stats:
+            return
+
+        for osd in osd_stats:
+            osd_id = osd['osd']
+
+            # NOTE(review): the 'kb' counters are scaled by 1000 rather
+            # than 1024 to obtain bytes -- confirm this matches what the
+            # metric consumers expect.
+            yield {
+                'type_instance': osd_id,
+                'type': 'osd_space',
+                'values': [osd['kb_used'] * 1000, osd['kb'] * 1000],
+            }
+
+            yield {
+                'type_instance': osd_id,
+                'type': 'osd_latency',
+                'values': [osd['fs_perf_stat']['apply_latency_ms'],
+                           osd['fs_perf_stat']['commit_latency_ms']],
+            }
+
+plugin = CephOSDStatsPlugin()
+
+
+# Thin module-level wrappers: collectd's Python plugin API registers plain
+# functions, so delegate to the singleton instance above.
+def init_callback():
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    plugin.read_callback()
+
+collectd.register_init(init_callback)
+collectd.register_config(config_callback)
+collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/ceph_pg_mon_status.py b/collectd/files/plugin/openstack/ceph_pg_mon_status.py
new file mode 100644
index 0000000..dba4238
--- /dev/null
+++ b/collectd/files/plugin/openstack/ceph_pg_mon_status.py
@@ -0,0 +1,98 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base
+import collectd
+
+
+INTERVAL = 30
+HEALTH_MAP = {
+ 'HEALTH_OK': 1,
+ 'HEALTH_WARN': 2,
+ 'HEALTH_ERR': 3,
+}
+
+
+class CephMonPlugin(base.CephBase):
+    """ Collect states and information about ceph cluster and placement groups.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(CephMonPlugin, self).__init__(*args, **kwargs)
+        self.plugin = 'ceph_mon'
+
+    def itermetrics(self):
+        status = self.execute_to_json('ceph -s --format json')
+        if not status:
+            return
+
+        # Map HEALTH_OK/HEALTH_WARN/HEALTH_ERR to 1/2/3 (see HEALTH_MAP).
+        yield {
+            'type': 'health',
+            'values': HEALTH_MAP[status['health']['overall_status']],
+        }
+
+        if 'mons' in status['monmap']:
+            monitor_nb = len(status['monmap']['mons'])
+        else:
+            monitor_nb = 0
+        yield {
+            'type': 'monitor_count',
+            'values': monitor_nb
+        }
+
+        yield {
+            'type': 'quorum_count',
+            'values': len(status.get('quorum', []))
+        }
+
+        # Placement-group capacity figures and per-state breakdown.
+        pgmap = status['pgmap']
+        yield {
+            'type': 'pg_bytes',
+            'values': [pgmap['bytes_used'], pgmap['bytes_avail'],
+                       pgmap['bytes_total']],
+        }
+        yield {
+            'type': 'pg_data_bytes',
+            'values': pgmap['data_bytes']
+        }
+        yield {
+            'type': 'pg_count',
+            'values': pgmap['num_pgs']
+        }
+
+        for state in pgmap['pgs_by_state']:
+            yield {
+                'type': 'pg_state_count',
+                'type_instance': state['state_name'],
+                'values': state['count']
+            }
+
+plugin = CephMonPlugin()
+
+
+# Thin module-level wrappers: collectd's Python plugin API registers plain
+# functions, so delegate to the singleton instance above.
+def init_callback():
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    plugin.read_callback()
+
+collectd.register_init(init_callback)
+collectd.register_config(config_callback)
+collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/ceph_pool_osd.py b/collectd/files/plugin/openstack/ceph_pool_osd.py
new file mode 100644
index 0000000..2668b03
--- /dev/null
+++ b/collectd/files/plugin/openstack/ceph_pool_osd.py
@@ -0,0 +1,135 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base
+import collectd
+
+INTERVAL = 60
+
+
+class CephPoolPlugin(base.CephBase):
+    """ Collect Ceph pool metrics and OSD daemons state"""
+
+    def __init__(self, *args, **kwargs):
+        super(CephPoolPlugin, self).__init__(*args, **kwargs)
+        self.plugin = 'ceph_pool'
+
+    def itermetrics(self):
+        # Per-pool usage and cluster totals from 'ceph df'.
+        df = self.execute_to_json('ceph df --format json')
+        if not df:
+            return
+
+        objects_count = 0
+        for pool in df['pools']:
+            objects_count += pool['stats'].get('objects', 0)
+            for m in ('bytes_used', 'max_avail', 'objects'):
+                yield {
+                    'type': 'pool_%s' % m,
+                    'type_instance': pool['name'],
+                    'values': pool['stats'].get(m, 0),
+                }
+
+        yield {
+            'type': 'objects_count',
+            'values': objects_count
+        }
+        yield {
+            'type': 'pool_count',
+            'values': len(df['pools'])
+        }
+
+        if 'total_bytes' in df['stats']:
+            # compatibility with 0.84+
+            total = df['stats']['total_bytes']
+            used = df['stats']['total_used_bytes']
+            avail = df['stats']['total_avail_bytes']
+        else:
+            # compatibility with <0.84
+            total = df['stats']['total_space'] * 1024
+            used = df['stats']['total_used'] * 1024
+            avail = df['stats']['total_avail'] * 1024
+
+        yield {
+            'type': 'pool_total_bytes',
+            'values': [used, avail, total]
+        }
+        yield {
+            'type': 'pool_total_percent',
+            'values': [100.0 * used / total, 100.0 * avail / total]
+        }
+
+        # Per-pool client I/O rates from 'ceph osd pool stats'.
+        stats = self.execute_to_json('ceph osd pool stats --format json')
+        if not stats:
+            return
+
+        for pool in stats:
+            client_io_rate = pool.get('client_io_rate', {})
+            yield {
+                'type': 'pool_bytes_rate',
+                'type_instance': pool['pool_name'],
+                'values': [client_io_rate.get('read_bytes_sec', 0),
+                           client_io_rate.get('write_bytes_sec', 0)]
+            }
+            yield {
+                'type': 'pool_ops_rate',
+                'type_instance': pool['pool_name'],
+                'values': client_io_rate.get('op_per_sec', 0)
+            }
+
+        # Pool settings and OSD up/down/in/out counters from 'ceph osd dump'.
+        osd = self.execute_to_json('ceph osd dump --format json')
+        if not osd:
+            return
+
+        for pool in osd['pools']:
+            for name in ('size', 'pg_num', 'pg_placement_num'):
+                yield {
+                    'type': 'pool_%s' % name,
+                    'type_instance': pool['pool_name'],
+                    'values': pool[name]
+                }
+
+        _up, _down, _in, _out = (0, 0, 0, 0)
+        # NOTE: the loop variable rebinds 'osd'; this works because the
+        # iterable osd['osds'] is evaluated before the first rebinding.
+        for osd in osd['osds']:
+            if osd['up'] == 1:
+                _up += 1
+            else:
+                _down += 1
+            if osd['in'] == 1:
+                _in += 1
+            else:
+                _out += 1
+
+        yield {
+            'type': 'osd_count',
+            'values': [_up, _down, _in, _out]
+        }
+
+plugin = CephPoolPlugin()
+
+
+# Thin module-level wrappers: collectd's Python plugin API registers plain
+# functions, so delegate to the singleton instance above.
+def init_callback():
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    plugin.read_callback()
+
+collectd.register_init(init_callback)
+collectd.register_config(config_callback)
+collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/check_openstack_api.py b/collectd/files/plugin/openstack/check_openstack_api.py
new file mode 100644
index 0000000..1262bdf
--- /dev/null
+++ b/collectd/files/plugin/openstack/check_openstack_api.py
@@ -0,0 +1,132 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for checking the status of OpenStack API services
+import collectd
+import openstack
+
+from urlparse import urlparse
+
+PLUGIN_NAME = 'check_openstack_api'
+INTERVAL = openstack.INTERVAL
+
+
+class APICheckPlugin(openstack.CollectdPlugin):
+ """ Class to check the status of OpenStack API services.
+ """
+ FAIL = 0
+ OK = 1
+ UNKNOWN = 2
+
+ # TODO: sahara, murano
+ CHECK_MAP = {
+ 'keystone': {
+ 'path': '/', 'expect': 300, 'name': 'keystone-public-api'},
+ 'heat': {'path': '/', 'expect': 300, 'name': 'heat-api'},
+ 'heat-cfn': {'path': '/', 'expect': 300, 'name': 'heat-cfn-api'},
+ 'glance': {'path': '/', 'expect': 300, 'name': 'glance-api'},
+ 'cinder': {'path': '/', 'expect': 200, 'name': 'cinder-api'},
+ 'cinderv2': {'path': '/', 'expect': 200, 'name': 'cinder-v2-api'},
+ 'neutron': {'path': '/', 'expect': 200, 'name': 'neutron-api'},
+ 'nova': {'path': '/', 'expect': 200, 'name': 'nova-api'},
+ # Ceilometer requires authentication for all paths
+ 'ceilometer': {
+ 'path': 'v2/capabilities', 'expect': 200, 'auth': True,
+ 'name': 'ceilometer-api'},
+ 'swift': {'path': 'healthcheck', 'expect': 200, 'name': 'swift-api'},
+ 'swift_s3': {
+ 'path': 'healthcheck', 'expect': 200, 'name': 'swift-s3-api'},
+ }
+
+ def _service_url(self, endpoint, path):
+ url = urlparse(endpoint)
+ u = '%s://%s' % (url.scheme, url.netloc)
+ if path != '/':
+ u = '%s/%s' % (u, path)
+ return u
+
+ def check_api(self):
+ """ Check the status of all the API services.
+
+ Yields a list of dict items with 'service', 'status' (either OK,
+ FAIL or UNKNOWN) and 'region' keys.
+ """
+ catalog = self.service_catalog
+ for service in catalog:
+ name = service['name']
+ if name not in self.CHECK_MAP:
+ self.logger.notice("No check found for service '%s', skipping it" % name)
+ status = self.UNKNOWN
+ else:
+ check = self.CHECK_MAP[name]
+ url = self._service_url(service['url'], check['path'])
+ r = self.raw_get(url, token_required=check.get('auth', False))
+
+ if r is None or r.status_code != check['expect']:
+ def _status(ret):
+ return 'N/A' if r is None else r.status_code
+
+ self.logger.notice(
+ "Service %s check failed "
+ "(returned '%s' but expected '%s')" % (
+ name, _status(r), check['expect'])
+ )
+ status = self.FAIL
+ else:
+ status = self.OK
+
+ yield {
+ 'service': check.get('name', name),
+ 'status': status,
+ 'region': service['region']
+ }
+
+ @openstack.read_callback_wrapper
+ def read_callback(self):
+ for item in self.check_api():
+ if item['status'] == self.UNKNOWN:
+ # skip if status is UNKNOWN
+ continue
+
+ value = collectd.Values(
+ plugin=PLUGIN_NAME,
+ plugin_instance=item['service'],
+ type='gauge',
+ type_instance=item['region'],
+ interval=INTERVAL,
+ values=[item['status']],
+ # w/a for https://github.com/collectd/collectd/issues/716
+ meta={'0': True}
+ )
+ value.dispatch()
+
+
+plugin = APICheckPlugin(collectd)
+
+
+# Thin module-level wrappers: collectd's Python plugin API registers plain
+# functions, so delegate to the singleton instance above.
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def notification_callback(notification):
+    plugin.notification_callback(notification)
+
+
+def read_callback():
+    plugin.read_callback()
+
+collectd.register_config(config_callback)
+collectd.register_notification(notification_callback)
+collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/haproxy.py b/collectd/files/plugin/openstack/haproxy.py
new file mode 100644
index 0000000..c3e25d5
--- /dev/null
+++ b/collectd/files/plugin/openstack/haproxy.py
@@ -0,0 +1,264 @@
+# haproxy-collectd-plugin - haproxy.py
+#
+# Original Author: Michael Leinartas
+# Substantial additions by Mirantis
+# Description: This is a collectd plugin which runs under the Python plugin to
+# collect metrics from haproxy.
+# Plugin structure and logging func taken from https://github.com/phrawzty/rabbitmq-collectd-plugin
+
+# Copyright (c) 2011 Michael Leinartas
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+import collectd
+import socket
+import csv
+
+NAMES_MAPPING = {}
+NAME = 'haproxy'
+RECV_SIZE = 1024
+METRIC_TYPES = {
+ 'bin': ('bytes_in', 'gauge'),
+ 'bout': ('bytes_out', 'gauge'),
+ 'chkfail': ('failed_checks', 'gauge'),
+ 'CurrConns': ('connections', 'gauge'),
+ 'CurrSslConns': ('ssl_connections', 'gauge'),
+ 'downtime': ('downtime', 'gauge'),
+ 'dresp': ('denied_responses', 'gauge'),
+ 'dreq': ('denied_requests', 'gauge'),
+ 'econ': ('error_connection', 'gauge'),
+ 'ereq': ('error_requests', 'gauge'),
+ 'eresp': ('error_responses', 'gauge'),
+ 'hrsp_1xx': ('response_1xx', 'gauge'),
+ 'hrsp_2xx': ('response_2xx', 'gauge'),
+ 'hrsp_3xx': ('response_3xx', 'gauge'),
+ 'hrsp_4xx': ('response_4xx', 'gauge'),
+ 'hrsp_5xx': ('response_5xx', 'gauge'),
+ 'hrsp_other': ('response_other', 'gauge'),
+ 'PipesUsed': ('pipes_used', 'gauge'),
+ 'PipesFree': ('pipes_free', 'gauge'),
+ 'qcur': ('queue_current', 'gauge'),
+ 'Tasks': ('tasks', 'gauge'),
+ 'Run_queue': ('run_queue', 'gauge'),
+ 'stot': ('session_total', 'gauge'),
+ 'scur': ('session_current', 'gauge'),
+ 'wredis': ('redistributed', 'gauge'),
+ 'wretr': ('retries', 'gauge'),
+ 'status': ('status', 'gauge'),
+ 'Uptime_sec': ('uptime', 'gauge'),
+ 'up': ('up', 'gauge'),
+ 'down': ('down', 'gauge'),
+}
+
+STATUS_MAP = {
+ 'DOWN': 0,
+ 'UP': 1,
+}
+
+FRONTEND_TYPE = '0'
+BACKEND_TYPE = '1'
+BACKEND_SERVER_TYPE = '2'
+
+METRIC_AGGREGATED = ['bin', 'bout', 'qcur','scur','eresp',
+ 'hrsp_1xx','hrsp_2xx', 'hrsp_3xx', 'hrsp_4xx', 'hrsp_5xx',
+ 'hrsp_other', 'wretr']
+
+
+METRIC_DELIM = '.' # for the frontend/backend stats
+
+DEFAULT_SOCKET = '/var/lib/haproxy/stats'
+DEFAULT_PROXY_MONITORS = [ 'server', 'frontend', 'backend', 'backend_server' ]
+VERBOSE_LOGGING = False
+PROXY_MONITORS = []
+
+class HAProxySocket(object):
+    """Minimal client for the HAProxy UNIX stats socket."""
+
+    def __init__(self, socket_file=DEFAULT_SOCKET):
+        self.socket_file = socket_file
+
+    def connect(self):
+        # A new connection is required per command: HAProxy closes the
+        # socket after each response.
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(self.socket_file)
+        return s
+
+    def communicate(self, command):
+        ''' Send a single command to the socket and return a single response (raw string) '''
+        s = self.connect()
+        if not command.endswith('\n'): command += '\n'
+        s.send(command)
+        result = ''
+        buf = ''
+        buf = s.recv(RECV_SIZE)
+        while buf:
+            result += buf
+            buf = s.recv(RECV_SIZE)
+        s.close()
+        return result
+
+    def get_server_info(self):
+        # Parse 'show info' key:value lines into a dict; lines without a
+        # colon are skipped.
+        result = {}
+        output = self.communicate('show info')
+        for line in output.splitlines():
+            try:
+                key,val = line.split(':')
+            except ValueError, e:
+                continue
+            result[key.strip()] = val.strip()
+        return result
+
+    def get_server_stats(self):
+        # 'show stat' output is CSV with a leading '# ' on the header row.
+        output = self.communicate('show stat')
+        #sanitize and make a list of lines
+        output = output.lstrip('# ').strip()
+        output = [ l.strip(',') for l in output.splitlines() ]
+        csvreader = csv.DictReader(output)
+        result = [ d.copy() for d in csvreader ]
+        return result
+
+def get_stats():
+    """Collect HAProxy process info and per-proxy statistics into a flat
+    {metric_name: value} dict.
+
+    Relies on module globals set by configure_callback(): HAPROXY_SOCKET,
+    PROXY_MONITORS and PROXY_IGNORE.
+    """
+    stats = dict()
+    haproxy = HAProxySocket(HAPROXY_SOCKET)
+
+    try:
+        server_info = haproxy.get_server_info()
+        server_stats = haproxy.get_server_stats()
+    except socket.error, e:
+        logger('warn', "status err Unable to connect to HAProxy socket at %s" % HAPROXY_SOCKET)
+        return stats
+
+    if 'server' in PROXY_MONITORS:
+        # Process-level counters ('show info'); non-numeric values dropped.
+        for key,val in server_info.items():
+            try:
+                stats[key] = int(val)
+            except (TypeError, ValueError), e:
+                pass
+
+    for statdict in server_stats:
+        if not (statdict['svname'].lower() in PROXY_MONITORS or
+                statdict['pxname'].lower() in PROXY_MONITORS or
+                ('backend_server' in PROXY_MONITORS and
+                 statdict['type'] == BACKEND_SERVER_TYPE)):
+            continue
+
+        if statdict['pxname'] in PROXY_IGNORE:
+            continue
+
+        pxname = statdict['pxname']
+        # Translate to meaningful names
+        if pxname in NAMES_MAPPING:
+            pxname = NAMES_MAPPING.get(pxname)
+        else:
+            logger('warn', 'Mapping missing for "%s"' % pxname)
+
+        if statdict['type'] == BACKEND_SERVER_TYPE:
+            # Count the number of servers per backend and per status
+            for status_val in STATUS_MAP.keys():
+                # Initialize all possible metric keys to zero
+                metricname = METRIC_DELIM.join(['backend', pxname, 'servers', status_val.lower()])
+                if metricname not in stats:
+                    stats[metricname] = 0
+                if statdict['status'] == status_val:
+                    stats[metricname] += 1
+            continue
+
+        for key, val in statdict.items():
+            metricname = METRIC_DELIM.join([statdict['svname'].lower(), pxname, key])
+            try:
+                if key == 'status' and statdict['type'] == BACKEND_TYPE:
+                    if val in STATUS_MAP:
+                        val = STATUS_MAP[val]
+                    else:
+                        continue
+                stats[metricname] = int(val)
+                # Additionally aggregate selected counters across all
+                # proxies of the same svname.
+                if key in METRIC_AGGREGATED:
+                    agg_metricname = METRIC_DELIM.join([statdict['svname'].lower(), key])
+                    if agg_metricname not in stats:
+                        stats[agg_metricname] = 0
+                    stats[agg_metricname] += int(val)
+            except (TypeError, ValueError), e:
+                pass
+    return stats
+
+def configure_callback(conf):
+    """Parse the collectd <Module> block into the module-level globals
+    consumed by get_stats() and read_callback()."""
+    global PROXY_MONITORS, PROXY_IGNORE, HAPROXY_SOCKET, VERBOSE_LOGGING
+    PROXY_MONITORS = [ ]
+    PROXY_IGNORE = [ ]
+    HAPROXY_SOCKET = DEFAULT_SOCKET
+    VERBOSE_LOGGING = False
+
+    for node in conf.children:
+        if node.key == "ProxyMonitor":
+            PROXY_MONITORS.append(node.values[0])
+        elif node.key == "ProxyIgnore":
+            PROXY_IGNORE.append(node.values[0])
+        elif node.key == "Socket":
+            HAPROXY_SOCKET = node.values[0]
+        elif node.key == "Verbose":
+            VERBOSE_LOGGING = bool(node.values[0])
+        elif node.key == "Mapping":
+            NAMES_MAPPING[node.values[0]] = node.values[1]
+        else:
+            logger('warn', 'Unknown config key: %s' % node.key)
+
+    if not PROXY_MONITORS:
+        PROXY_MONITORS += DEFAULT_PROXY_MONITORS
+    PROXY_MONITORS = [ p.lower() for p in PROXY_MONITORS ]
+
+def read_callback():
+    """Fetch the stats dict and dispatch each known metric to collectd."""
+    logger('verb', "beginning read_callback")
+    info = get_stats()
+
+    if not info:
+        logger('warn', "%s: No data received" % NAME)
+        return
+
+    for key,value in info.items():
+        key_prefix = ''
+        key_root = key
+        # Split "prefix.metric" keys so the raw metric name can be looked
+        # up in METRIC_TYPES; unknown metrics are skipped.
+        if not value in METRIC_TYPES:
+            try:
+                key_prefix, key_root = key.rsplit(METRIC_DELIM,1)
+            except ValueError, e:
+                pass
+        if not key_root in METRIC_TYPES:
+            continue
+
+        key_root, val_type = METRIC_TYPES[key_root]
+        key_name = METRIC_DELIM.join([ n for n in [key_prefix, key_root] if n ])
+        val = collectd.Values(plugin=NAME, type=val_type)
+        val.type_instance = key_name
+        val.values = [ value ]
+        # w/a for https://github.com/collectd/collectd/issues/716
+        val.meta = {'0': True}
+        val.dispatch()
+
+def logger(t, msg):
+    """Route a message to the matching collectd log level: 'err', 'warn',
+    'verb' (only when VERBOSE_LOGGING), anything else -> notice."""
+    if t == 'err':
+        collectd.error('%s: %s' % (NAME, msg))
+    elif t == 'warn':
+        collectd.warning('%s: %s' % (NAME, msg))
+    elif t == 'verb':
+        if VERBOSE_LOGGING:
+            collectd.info('%s: %s' % (NAME, msg))
+    else:
+        collectd.notice('%s: %s' % (NAME, msg))
+
+collectd.register_config(configure_callback)
+collectd.register_read(read_callback)
diff --git a/collectd/files/plugin/openstack/hypervisor_stats.py b/collectd/files/plugin/openstack/hypervisor_stats.py
new file mode 100644
index 0000000..692effe
--- /dev/null
+++ b/collectd/files/plugin/openstack/hypervisor_stats.py
@@ -0,0 +1,89 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for getting hypervisor statistics from Nova
+import collectd
+import openstack
+
+PLUGIN_NAME = 'hypervisor_stats'
+INTERVAL = openstack.INTERVAL
+
+
class HypervisorStatsPlugin(openstack.CollectdPlugin):
    """ Class to report the statistics on Nova hypervisors.
    """
    # Maps fields of the os-hypervisors/statistics response to the metric
    # names reported to collectd.
    VALUE_MAP = {
        'current_workload': 'total_running_tasks',
        'running_vms': 'total_running_instances',
        'local_gb_used': 'total_used_disk_GB',
        'free_disk_gb': 'total_free_disk_GB',
        'memory_mb_used': 'total_used_ram_MB',
        'free_ram_mb': 'total_free_ram_MB',
        'vcpus_used': 'total_used_vcpus',
    }

    def config_callback(self, config):
        """ Read the optional CpuAllocationRatio configuration key. """
        super(HypervisorStatsPlugin, self).config_callback(config)
        for node in config.children:
            if node.key == 'CpuAllocationRatio':
                self.extra_config['cpu_ratio'] = float(node.values[0])
        if 'cpu_ratio' not in self.extra_config:
            # Without the ratio, total_free_vcpus cannot be computed.
            self.logger.warning('CpuAllocationRatio parameter not set')

    def dispatch_value(self, name, value):
        """ Dispatch one gauge value to collectd. """
        v = collectd.Values(
            plugin=PLUGIN_NAME,
            type='gauge',
            type_instance=name,
            interval=INTERVAL,
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
        )
        v.dispatch()

    @openstack.read_callback_wrapper
    def read_callback(self):
        """ Poll Nova's hypervisor statistics and dispatch the gauges. """
        r = self.get('nova', 'os-hypervisors/statistics')
        if not r:
            self.logger.warning("Could not get hypervisor statistics")
            return

        stats = r.json().get('hypervisor_statistics', {})
        for k, v in self.VALUE_MAP.iteritems():
            # Missing fields are reported as 0.
            self.dispatch_value(v, stats.get(k, 0))
        if 'cpu_ratio' in self.extra_config:
            # Free vCPUs = overcommit-adjusted capacity minus used vCPUs.
            vcpus = int(self.extra_config['cpu_ratio'] * stats.get('vcpus', 0))
            self.dispatch_value('total_free_vcpus',
                                vcpus - stats.get('vcpus_used', 0))
+
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = HypervisorStatsPlugin(collectd)


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def notification_callback(notification):
    """ Collectd notification hook: may pause/resume data collection. """
    plugin.notification_callback(notification)


def read_callback():
    """ Collectd read hook: poll the API and dispatch the metrics. """
    plugin.read_callback()

collectd.register_config(config_callback)
collectd.register_notification(notification_callback)
collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/openstack.py b/collectd/files/plugin/openstack/openstack.py
new file mode 100644
index 0000000..efb6b51
--- /dev/null
+++ b/collectd/files/plugin/openstack/openstack.py
@@ -0,0 +1,313 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import dateutil.parser
+import dateutil.tz
+from functools import wraps
+import requests
+import simplejson as json
+
+# By default, query OpenStack API endpoints every 50 seconds. We choose a value
+# less than the default group by interval (which is 60 seconds) to avoid gaps
+# in the Grafana graphs.
+INTERVAL = 50
+
+
class OSClient(object):
    """ Base class for querying the OpenStack API endpoints.

    It uses the Keystone service catalog to discover the API endpoints.
    """
    # Treat tokens as expired this much before their real expiry time to
    # avoid using a token that dies mid-request.
    EXPIRATION_TOKEN_DELTA = datetime.timedelta(0, 30)

    def __init__(self, username, password, tenant, keystone_url, timeout,
                 logger, max_retries):
        """ Store the credentials and fetch an initial token. """
        self.logger = logger
        self.username = username
        self.password = password
        self.tenant_name = tenant
        self.keystone_url = keystone_url
        self.service_catalog = []
        self.tenant_id = None
        self.timeout = timeout
        self.token = None
        self.valid_until = None

        # Note: prior to urllib3 v1.9, retries are made on failed connections
        # but not on timeout and backoff time is not supported.
        # (at this time we ship requests 2.2.1 and urllib3 1.6.1 or 1.7.1)
        self.session = requests.Session()
        self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=max_retries))
        self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=max_retries))

        self.get_token()

    def is_valid_token(self):
        # A token is valid when present and not past its (padded) expiry.
        now = datetime.datetime.now(tz=dateutil.tz.tzutc())
        return self.token and self.valid_until and self.valid_until > now

    def clear_token(self):
        """ Forget the cached token so the next request re-authenticates. """
        self.token = None
        self.valid_until = None

    def get_token(self):
        """ Authenticate against the Keystone v2 tokens API.

        On success stores the token, tenant id, expiry time and service
        catalog, and returns the token; returns None on any error.
        """
        self.clear_token()
        data = json.dumps({
            "auth":
            {
                'tenantName': self.tenant_name,
                'passwordCredentials':
                {
                    'username': self.username,
                    'password': self.password
                }
            }
        }
        )
        self.logger.info("Trying to get token from '%s'" % self.keystone_url)
        r = self.make_request('post',
                              '%s/tokens' % self.keystone_url, data=data,
                              token_required=False)
        if not r:
            self.logger.error("Cannot get a valid token from %s" %
                              self.keystone_url)
            return

        if r.status_code < 200 or r.status_code > 299:
            self.logger.error("%s responded with code %d" %
                              (self.keystone_url, r.status_code))
            return

        data = r.json()
        self.logger.debug("Got response from Keystone: '%s'" % data)
        self.token = data['access']['token']['id']
        self.tenant_id = data['access']['token']['tenant']['id']
        self.valid_until = dateutil.parser.parse(
            data['access']['token']['expires']) - self.EXPIRATION_TOKEN_DELTA
        self.service_catalog = []
        for item in data['access']['serviceCatalog']:
            # NOTE(review): only the first endpoint of each service is kept —
            # multi-region deployments would need a region-aware lookup.
            endpoint = item['endpoints'][0]
            self.service_catalog.append({
                'name': item['name'],
                'region': endpoint['region'],
                'service_type': item['type'],
                'url': endpoint['internalURL'],
                'admin_url': endpoint['adminURL'],
            })

        self.logger.debug("Got token '%s'" % self.token)
        return self.token

    def make_request(self, verb, url, data=None, token_required=True):
        """ Issue an HTTP request, returning the response object or None.

        Re-authenticates first when the cached token is missing or expired
        and *token_required* is true; a 401 response clears the cached
        token so the next call re-authenticates.
        """
        kwargs = {
            'url': url,
            'timeout': self.timeout,
            'headers': {'Content-type': 'application/json'}
        }
        if token_required and not self.is_valid_token() and \
                not self.get_token():
            self.logger.error("Aborting request, no valid token")
            return
        elif token_required:
            kwargs['headers']['X-Auth-Token'] = self.token

        if data is not None:
            kwargs['data'] = data

        func = getattr(self.session, verb.lower())

        try:
            r = func(**kwargs)
        except Exception as e:
            self.logger.error("Got exception for '%s': '%s'" %
                              (kwargs['url'], e))
            return

        self.logger.info("%s responded with status code %d" %
                         (kwargs['url'], r.status_code))
        if r.status_code == 401:
            # Clear token in case it is revoked or invalid
            self.clear_token()

        return r
+
+
# Decorator guarding a plugin's read callback: the wrapped method runs
# only while the plugin is in collecting mode (self.do_collect_data).
def read_callback_wrapper(f):
    @wraps(f)
    def guarded(self, *args, **kwargs):
        if not self.do_collect_data:
            return
        f(self, *args, **kwargs)

    return guarded
+
+
class CollectdPlugin(object):
    """ Base class for the OpenStack collectd plugins.

    Holds the plugin configuration, owns the OSClient used to talk to
    the OpenStack APIs and provides helpers to list and aggregate API
    objects.
    """

    def __init__(self, logger):
        self.os_client = None
        self.logger = logger
        self.timeout = 5
        self.max_retries = 3
        self.extra_config = {}
        # attributes controlling whether the plugin is in collect mode or not
        self.do_collect_data = True
        self.depends_on_resource = None

    def _build_url(self, service, resource):
        """ Return the full API URL for *resource* of *service*.

        Returns None (and logs an error) when the service is not in the
        Keystone catalog.
        """
        s = (self.get_service(service) or {})
        # the adminURL must be used to access resources with Keystone API v2
        if service == 'keystone' and \
                (resource in ['tenants', 'users'] or 'OS-KS' in resource):
            url = s.get('admin_url')
        else:
            url = s.get('url')

        if url:
            if url[-1] != '/':
                url += '/'
            url = "%s%s" % (url, resource)
        else:
            self.logger.error("Service '%s' not found in catalog" % service)
        return url

    def raw_get(self, url, token_required=False):
        """ GET an absolute URL, optionally without authentication. """
        return self.os_client.make_request('get', url,
                                           token_required=token_required)

    def get(self, service, resource):
        """ GET *resource* from *service*, resolving the URL through the
        Keystone service catalog. Returns the response object or None.
        """
        url = self._build_url(service, resource)
        if not url:
            return
        self.logger.info("GET '%s'" % url)
        return self.os_client.make_request('get', url)

    @property
    def service_catalog(self):
        if not self.os_client.service_catalog:
            # In case the service catalog is empty (eg Keystone was down when
            # collectd started), we should try to get a new token
            self.os_client.get_token()
        return self.os_client.service_catalog

    def get_service(self, service_name):
        """ Return the catalog entry for *service_name*, or None. """
        return next((x for x in self.service_catalog
                     if x['name'] == service_name), None)

    def config_callback(self, config):
        """ Parse the common <Module> options and authenticate.

        Username, Password, Tenant and KeystoneUrl are required: if one is
        missing, building the OSClient below fails with an unbound local
        name error.
        """
        for node in config.children:
            if node.key == 'Timeout':
                self.timeout = int(node.values[0])
            elif node.key == 'MaxRetries':
                self.max_retries = int(node.values[0])
            elif node.key == 'Username':
                username = node.values[0]
            elif node.key == 'Password':
                password = node.values[0]
            elif node.key == 'Tenant':
                tenant_name = node.values[0]
            elif node.key == 'KeystoneUrl':
                keystone_url = node.values[0]
            elif node.key == 'DependsOnResource':
                self.depends_on_resource = node.values[0]
        self.os_client = OSClient(username, password, tenant_name,
                                  keystone_url, self.timeout, self.logger,
                                  self.max_retries)

    def notification_callback(self, notification):
        """ Toggle data collection from a JSON notification message.

        Collection is enabled while 'value' > 0 for the resource named by
        the DependsOnResource config key; malformed messages are ignored
        (with a warning when a key is missing).
        """
        if not self.depends_on_resource:
            return

        try:
            data = json.loads(notification.message)
        except ValueError:
            return

        if 'value' not in data:
            self.logger.warning(
                "%s: missing 'value' in notification" %
                self.__class__.__name__)
        elif 'resource' not in data:
            self.logger.warning(
                "%s: missing 'resource' in notification" %
                self.__class__.__name__)
        elif data['resource'] == self.depends_on_resource:
            do_collect_data = data['value'] > 0
            if self.do_collect_data != do_collect_data:
                # log only the transitions
                self.logger.notice("%s: do_collect_data=%s" %
                                   (self.__class__.__name__, do_collect_data))
            self.do_collect_data = do_collect_data

    def read_callback(self):
        """ Read metrics and dispatch values

        This method should be overriden by the derived classes.
        """
        # Raising a plain string is a TypeError in Python >= 2.6; raise the
        # intended exception type so callers see the real message.
        raise NotImplementedError(
            "read_callback() method needs to be overriden!")

    def get_objects(self, project, object_name, api_version='',
                    params='all_tenants=1'):
        """ Return a list of OpenStack objects

        See get_objects_details()
        """
        return self._get_objects(project, object_name, api_version, params,
                                 False)

    def get_objects_details(self, project, object_name, api_version='',
                            params='all_tenants=1'):
        """ Return a list of details about OpenStack objects

        The API version is not always included in the URL endpoint
        registered in Keystone (eg Glance). In this case, use the
        api_version parameter to specify which version should be used.
        """
        return self._get_objects(project, object_name, api_version, params,
                                 True)

    def _get_objects(self, project, object_name, api_version, params, detail):
        """ Fetch and return the *object_name* collection from *project*.

        Returns [] when the request fails or the expected key is absent.
        """
        if api_version:
            resource = '%s/%s' % (api_version, object_name)
        else:
            resource = '%s' % (object_name)
        if detail:
            resource = '%s/detail' % (resource)
        if params:
            resource = '%s?%s' % (resource, params)
        # TODO(scroiset): use pagination to handle large collection
        r = self.get(project, resource)
        if not r or object_name not in r.json():
            self.logger.warning('Could not find %s %s' % (project,
                                                          object_name))
            return []
        return r.json().get(object_name)

    def count_objects_group_by(self,
                               list_object,
                               group_by_func,
                               count_func=None):

        """ Return a dict of counts keyed by group_by_func(obj).

        count_func gives each object's weight (defaults to 1 per object).
        """

        status = {}
        for obj in list_object:
            s = group_by_func(obj)
            if s in status:
                status[s] += count_func(obj) if count_func else 1
            else:
                status[s] = count_func(obj) if count_func else 1
        return status
diff --git a/collectd/files/plugin/openstack/openstack_cinder.py b/collectd/files/plugin/openstack/openstack_cinder.py
new file mode 100644
index 0000000..52d28e8
--- /dev/null
+++ b/collectd/files/plugin/openstack/openstack_cinder.py
@@ -0,0 +1,96 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for getting statistics from Cinder
+import collectd
+import openstack
+
+PLUGIN_NAME = 'cinder'
+INTERVAL = openstack.INTERVAL
+
+
class CinderStatsPlugin(openstack.CollectdPlugin):
    """ Class to report the statistics on Cinder service.

        number of volumes broken down by state
        total size of volumes usable and in error state
    """

    def config_callback(self, config):
        # No Cinder-specific option: only the common OpenStack settings.
        super(CinderStatsPlugin, self).config_callback(config)

    @openstack.read_callback_wrapper
    def read_callback(self):
        """ Poll Cinder volumes/snapshots and dispatch the gauges. """
        volumes_details = self.get_objects_details('cinder', 'volumes')

        def groupby(d):
            # Group key: lower-cased status ('available', 'error', ...).
            return d.get('status', 'unknown').lower()

        def count_size_bytes(d):
            # The API reports sizes in GB; convert to bytes.
            return d.get('size', 0) * 10**9

        # Number of volumes per status.
        status = self.count_objects_group_by(volumes_details,
                                             group_by_func=groupby)
        for s, nb in status.iteritems():
            self.dispatch_value('volumes', s, nb)

        # Total volume size (bytes) per status.
        sizes = self.count_objects_group_by(volumes_details,
                                            group_by_func=groupby,
                                            count_func=count_size_bytes)
        for n, size in sizes.iteritems():
            self.dispatch_value('volumes_size', n, size)

        # Same two breakdowns for the snapshots.
        snaps_details = self.get_objects_details('cinder', 'snapshots')
        status_snaps = self.count_objects_group_by(snaps_details,
                                                   group_by_func=groupby)
        for s, nb in status_snaps.iteritems():
            self.dispatch_value('snapshots', s, nb)

        sizes = self.count_objects_group_by(snaps_details,
                                            group_by_func=groupby,
                                            count_func=count_size_bytes)
        for n, size in sizes.iteritems():
            self.dispatch_value('snapshots_size', n, size)

    def dispatch_value(self, plugin_instance, name, value):
        """ Dispatch one gauge value to collectd. """
        v = collectd.Values(
            plugin=PLUGIN_NAME,  # metric source
            plugin_instance=plugin_instance,
            type='gauge',
            type_instance=name,
            interval=INTERVAL,
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
        )
        v.dispatch()
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = CinderStatsPlugin(collectd)


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def notification_callback(notification):
    """ Collectd notification hook: may pause/resume data collection. """
    plugin.notification_callback(notification)


def read_callback():
    """ Collectd read hook: poll the API and dispatch the metrics. """
    plugin.read_callback()

collectd.register_config(config_callback)
collectd.register_notification(notification_callback)
collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/openstack_glance.py b/collectd/files/plugin/openstack/openstack_glance.py
new file mode 100644
index 0000000..4634ff6
--- /dev/null
+++ b/collectd/files/plugin/openstack/openstack_glance.py
@@ -0,0 +1,100 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for getting resource statistics from Glance
+import collectd
+import openstack
+
+PLUGIN_NAME = 'glance'
+INTERVAL = openstack.INTERVAL
+
+
class GlanceStatsPlugin(openstack.CollectdPlugin):
    """ Class to report the statistics on Glance service.

        number of image broken down by state
        total size of images usable and in error state
    """

    def config_callback(self, config):
        # No Glance-specific option: only the common OpenStack settings.
        super(GlanceStatsPlugin, self).config_callback(config)

    @openstack.read_callback_wrapper
    def read_callback(self):
        """ Poll Glance images and dispatch count and size gauges. """

        def is_snap(d):
            # Instance snapshots are images tagged image_type=snapshot.
            return d.get('properties', {}).get('image_type') == 'snapshot'

        def groupby(d):
            p = 'public' if d.get('is_public', True) else 'private'
            status = d.get('status', 'unknown').lower()
            if is_snap(d):
                return 'snapshots.%s.%s' % (p, status)
            return 'images.%s.%s' % (p, status)

        # v1 API with is_public=None lists both public and private images.
        images_details = self.get_objects_details('glance', 'images',
                                                  api_version='v1',
                                                  params='is_public=None')
        status = self.count_objects_group_by(images_details,
                                             group_by_func=groupby)
        for s, nb in status.iteritems():
            self.dispatch_value(s, nb)

        # sizes
        def count_size_bytes(d):
            # Glance already reports the image size in bytes.
            return d.get('size', 0)

        def groupby_size(d):
            p = 'public' if d.get('is_public', True) else 'private'
            status = d.get('status', 'unknown').lower()
            if is_snap(d):
                return 'snapshots_size.%s.%s' % (p, status)
            return 'images_size.%s.%s' % (p, status)

        sizes = self.count_objects_group_by(images_details,
                                            group_by_func=groupby_size,
                                            count_func=count_size_bytes)
        for s, nb in sizes.iteritems():
            self.dispatch_value(s, nb)

    def dispatch_value(self, name, value):
        """ Dispatch one gauge value to collectd. """
        v = collectd.Values(
            plugin=PLUGIN_NAME,  # metric source
            type='gauge',
            type_instance=name,
            interval=INTERVAL,
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
        )
        v.dispatch()
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = GlanceStatsPlugin(collectd)


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def notification_callback(notification):
    """ Collectd notification hook: may pause/resume data collection. """
    plugin.notification_callback(notification)


def read_callback():
    """ Collectd read hook: poll the API and dispatch the metrics. """
    plugin.read_callback()

collectd.register_config(config_callback)
collectd.register_notification(notification_callback)
collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/openstack_keystone.py b/collectd/files/plugin/openstack/openstack_keystone.py
new file mode 100644
index 0000000..cfa6bd7
--- /dev/null
+++ b/collectd/files/plugin/openstack/openstack_keystone.py
@@ -0,0 +1,98 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for getting statistics from Keystone
+import collectd
+import openstack
+
+PLUGIN_NAME = 'keystone'
+INTERVAL = openstack.INTERVAL
+
+
class KeystoneStatsPlugin(openstack.CollectdPlugin):
    """ Class to report the statistics on Keystone service.

        number of tenants, users broken down by state
        number of roles
    """

    def config_callback(self, config):
        # No Keystone-specific option: only the common OpenStack settings.
        super(KeystoneStatsPlugin, self).config_callback(config)

    @openstack.read_callback_wrapper
    def read_callback(self):
        """ Poll Keystone tenants, users and roles and dispatch gauges. """

        def groupby(d):
            return 'enabled' if d.get('enabled') else 'disabled'

        # tenants
        # NOTE(review): each early return below also skips the remaining
        # metrics (users, roles) for this cycle — confirm that is intended.
        r = self.get('keystone', 'tenants')
        if not r:
            self.logger.warning('Could not find Keystone tenants')
            return
        tenants_details = r.json().get('tenants', [])
        status = self.count_objects_group_by(tenants_details,
                                             group_by_func=groupby)
        for s, nb in status.iteritems():
            self.dispatch_value('tenants.' + s, nb)

        # users
        r = self.get('keystone', 'users')
        if not r:
            self.logger.warning('Could not find Keystone users')
            return
        users_details = r.json().get('users', [])
        status = self.count_objects_group_by(users_details,
                                             group_by_func=groupby)
        for s, nb in status.iteritems():
            self.dispatch_value('users.' + s, nb)

        # roles
        r = self.get('keystone', 'OS-KSADM/roles')
        if not r:
            self.logger.warning('Could not find Keystone roles')
            return
        roles = r.json().get('roles', [])
        self.dispatch_value('roles', len(roles))

    def dispatch_value(self, name, value):
        """ Dispatch one gauge value to collectd. """
        v = collectd.Values(
            plugin=PLUGIN_NAME,  # metric source
            type='gauge',
            type_instance=name,
            interval=INTERVAL,
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
        )
        v.dispatch()
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = KeystoneStatsPlugin(collectd)


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def notification_callback(notification):
    """ Collectd notification hook: may pause/resume data collection. """
    plugin.notification_callback(notification)


def read_callback():
    """ Collectd read hook: poll the API and dispatch the metrics. """
    plugin.read_callback()

collectd.register_config(config_callback)
collectd.register_notification(notification_callback)
collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/openstack_neutron.py b/collectd/files/plugin/openstack/openstack_neutron.py
new file mode 100644
index 0000000..6327c0a
--- /dev/null
+++ b/collectd/files/plugin/openstack/openstack_neutron.py
@@ -0,0 +1,127 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for getting resource statistics from Neutron
+import collectd
+import openstack
+
+PLUGIN_NAME = 'neutron'
+INTERVAL = openstack.INTERVAL
+
+
class NeutronStatsPlugin(openstack.CollectdPlugin):
    """ Class to report the statistics on Neutron service.

        number of networks broken down by status
        number of subnets
        number of ports broken down by owner and status
        number of routers broken down by status
        number of floating IP addresses broken down by free/associated
    """

    def config_callback(self, config):
        # No Neutron-specific option: only the common OpenStack settings.
        super(NeutronStatsPlugin, self).config_callback(config)

    @openstack.read_callback_wrapper
    def read_callback(self):
        """ Poll the Neutron v2.0 API and dispatch the gauges. """
        def groupby_network(x):
            return "networks.%s" % x.get('status', 'unknown').lower()

        def groupby_router(x):
            return "routers.%s" % x.get('status', 'unknown').lower()

        def groupby_port(x):
            # Collapse the device owner down to a generic category.
            owner = x.get('device_owner', 'unknown')
            if owner.startswith('network:'):
                owner = owner.replace('network:', '')
            elif owner.startswith('compute:'):
                # The part after 'compute:' is the name of the Nova AZ
                owner = 'compute'
            status = x.get('status', 'unknown').lower()
            return "ports.%s.%s" % (owner, status)

        def groupby_floating(x):
            # A floating IP with a port_id is bound to an instance.
            if x.get('port_id', None):
                status = 'associated'
            else:
                status = 'free'
            return "floatingips.%s" % status

        # Networks
        networks = self.get_objects('neutron', 'networks', api_version='v2.0')
        status = self.count_objects_group_by(networks,
                                             group_by_func=groupby_network)
        for s, nb in status.iteritems():
            self.dispatch_value(s, nb)
        self.dispatch_value('networks', len(networks))

        # Subnets
        subnets = self.get_objects('neutron', 'subnets', api_version='v2.0')
        self.dispatch_value('subnets', len(subnets))

        # Ports
        ports = self.get_objects('neutron', 'ports', api_version='v2.0')
        status = self.count_objects_group_by(ports,
                                             group_by_func=groupby_port)
        for s, nb in status.iteritems():
            self.dispatch_value(s, nb)
        self.dispatch_value('ports', len(ports))

        # Routers
        routers = self.get_objects('neutron', 'routers', api_version='v2.0')
        status = self.count_objects_group_by(routers,
                                             group_by_func=groupby_router)
        for s, nb in status.iteritems():
            self.dispatch_value(s, nb)
        self.dispatch_value('routers', len(routers))

        # Floating IP addresses
        floatingips = self.get_objects('neutron', 'floatingips',
                                       api_version='v2.0')
        status = self.count_objects_group_by(floatingips,
                                             group_by_func=groupby_floating)
        for s, nb in status.iteritems():
            self.dispatch_value(s, nb)
        self.dispatch_value('floatingips', len(floatingips))

    def dispatch_value(self, name, value):
        """ Dispatch one gauge value to collectd. """
        v = collectd.Values(
            plugin=PLUGIN_NAME,  # metric source
            type='gauge',
            type_instance=name,
            interval=INTERVAL,
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
        )
        v.dispatch()
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = NeutronStatsPlugin(collectd)


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def notification_callback(notification):
    """ Collectd notification hook: may pause/resume data collection. """
    plugin.notification_callback(notification)


def read_callback():
    """ Collectd read hook: poll the API and dispatch the metrics. """
    plugin.read_callback()

collectd.register_config(config_callback)
collectd.register_notification(notification_callback)
collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/openstack_nova.py b/collectd/files/plugin/openstack/openstack_nova.py
new file mode 100644
index 0000000..8999f0f
--- /dev/null
+++ b/collectd/files/plugin/openstack/openstack_nova.py
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Collectd plugin for getting statistics from Nova
+import collectd
+import openstack
+
+PLUGIN_NAME = 'nova'
+INTERVAL = openstack.INTERVAL
+
+
class NovaStatsPlugin(openstack.CollectdPlugin):
    """ Class to report the statistics on Nova service.

        number of instances broken down by state
    """

    def config_callback(self, config):
        # No Nova-specific option: only the common OpenStack settings.
        super(NovaStatsPlugin, self).config_callback(config)

    @openstack.read_callback_wrapper
    def read_callback(self):
        """ Poll all Nova instances and dispatch the per-status counts. """
        servers_details = self.get_objects_details('nova', 'servers')

        def groupby(d):
            # Group key: lower-cased status ('active', 'error', ...).
            return d.get('status', 'unknown').lower()
        status = self.count_objects_group_by(servers_details,
                                             group_by_func=groupby)
        for s, nb in status.iteritems():
            self.dispatch_value('instances', s, nb)

    def dispatch_value(self, plugin_instance, name, value):
        """ Dispatch one gauge value to collectd. """
        v = collectd.Values(
            plugin=PLUGIN_NAME,  # metric source
            plugin_instance=plugin_instance,
            type='gauge',
            type_instance=name,
            interval=INTERVAL,
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
        )
        v.dispatch()
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = NovaStatsPlugin(collectd)


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def notification_callback(notification):
    """ Collectd notification hook: may pause/resume data collection. """
    plugin.notification_callback(notification)


def read_callback():
    """ Collectd read hook: poll the API and dispatch the metrics. """
    plugin.read_callback()

collectd.register_config(config_callback)
collectd.register_notification(notification_callback)
collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/openstack/pacemaker_resource.py b/collectd/files/plugin/openstack/pacemaker_resource.py
new file mode 100644
index 0000000..4389a25
--- /dev/null
+++ b/collectd/files/plugin/openstack/pacemaker_resource.py
@@ -0,0 +1,79 @@
+#!/usr/bin/python
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collectd
+import socket
+
+import base
+
+NAME = 'pacemaker_resource'
+CRM_RESOURCE_BIN = '/usr/sbin/crm_resource'
+
+
class PacemakerResourcePlugin(base.Base):
    """ Report whether each configured Pacemaker resource runs on this host.

    Emits 1 when crm_resource locates the resource on this host's FQDN,
    0 when it is located elsewhere.
    """

    def __init__(self, *args, **kwargs):
        super(PacemakerResourcePlugin, self).__init__(*args, **kwargs)
        self.plugin = NAME
        self.crm_resource_bin = CRM_RESOURCE_BIN
        # Name compared against crm_resource output; can be overridden by
        # the Hostname config key.
        self.hostname = socket.getfqdn()
        self.resources = []

    def config_callback(self, conf):
        """ Read the Resource, Hostname and CrmResourceBin config keys. """
        super(PacemakerResourcePlugin, self).config_callback(conf)

        for node in conf.children:
            if node.key == 'Resource':
                self.resources.extend(node.values)
            elif node.key == 'Hostname':
                self.hostname = node.values[0]
            elif node.key == 'CrmResourceBin':
                self.crm_resource_bin = node.values[0]

    def itermetrics(self):
        """ Yield one metric per configured resource (1 = runs here). """
        for resource in self.resources:
            # 'crm_resource --locate --quiet' prints the hosting node name.
            out, err = self.execute([self.crm_resource_bin, '--locate',
                                     '--quiet', '--resource', resource],
                                    shell=False)
            if not out:
                self.logger.error("%s: Failed to get the status for '%s'" %
                                  (self.plugin, resource))

            else:
                value = 0
                if self.hostname == out.lstrip("\n"):
                    value = 1
                yield {
                    'type_instance': resource,
                    'values': value
                }
+
# Single module-level plugin instance driven by the collectd hooks below.
plugin = PacemakerResourcePlugin()


def init_callback():
    """ Collectd init hook: restore the SIGCHLD handler (see base.Base). """
    plugin.restore_sigchld()


def config_callback(conf):
    """ Collectd config hook: forward the <Module> block to the plugin. """
    plugin.config_callback(conf)


def read_callback():
    """ Collectd read hook: run the checks and dispatch the metrics. """
    plugin.read_callback()

collectd.register_init(init_callback)
collectd.register_config(config_callback)
collectd.register_read(read_callback)
diff --git a/collectd/files/plugin/openstack/rabbitmq_info.py b/collectd/files/plugin/openstack/rabbitmq_info.py
new file mode 100644
index 0000000..48753ca
--- /dev/null
+++ b/collectd/files/plugin/openstack/rabbitmq_info.py
@@ -0,0 +1,229 @@
+# Name: rabbitmq-collectd-plugin - rabbitmq_info.py
+# Author: https://github.com/phrawzty/rabbitmq-collectd-plugin/commits/master
+# Description: This plugin uses Collectd's Python plugin to obtain RabbitMQ
+# metrics.
+#
+# Copyright 2012 Daniel Maher
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collectd
+import re
+
+import base
+
+
+# Plugin name reported to collectd.
+NAME = 'rabbitmq_info'
+# Override in config by specifying 'RmqcBin'.
+RABBITMQCTL_BIN = '/usr/sbin/rabbitmqctl'
+# Override in config by specifying 'PmapBin'
+PMAP_BIN = '/usr/bin/pmap'
+# Override in config by specifying 'Vhost'.
+VHOST = "/"
+
+# Used to find disk nodes and running nodes.
+# Matches the '{disc,[...]}' and '{running_nodes,[...]}' lists in the
+# Erlang-term output of 'rabbitmqctl cluster_status'; re.S lets '.*' span
+# newlines.
+CLUSTER_STATUS = re.compile('.*disc,\[([^\]]+)\].*running_nodes,\[([^\]]+)\]',
+                            re.S)
+
+
+class RabbitMqPlugin(base.Base):
+
+ # we need to substract the length of the longest prefix (eg '.consumers')
+ MAX_QUEUE_IDENTIFIER_LENGTH = base.Base.MAX_IDENTIFIER_LENGTH - 10
+
+ def __init__(self, *args, **kwargs):
+ super(RabbitMqPlugin, self).__init__(*args, **kwargs)
+ self.plugin = NAME
+ self.rabbitmqctl_bin = RABBITMQCTL_BIN
+ self.pidfile = None
+ self.pmap_bin = PMAP_BIN
+ self.vhost = VHOST
+
+ def config_callback(self, conf):
+ super(RabbitMqPlugin, self).config_callback(conf)
+
+ for node in conf.children:
+ if node.key == 'RmqcBin':
+ self.rabbitmqctl_bin = node.values[0]
+ elif node.key == 'PmapBin':
+ self.pmap_bin = node.values[0]
+ elif node.key == 'PidFile':
+ self.pidfile = node.values[0]
+ elif node.key == 'Vhost':
+ self.vhost = node.values[0]
+ else:
+ self.logger.warning('Unknown config key: %s' % node.key)
+
+ def itermetrics(self):
+ stats = {}
+ stats['messages'] = 0
+ stats['memory'] = 0
+ stats['consumers'] = 0
+ stats['queues'] = 0
+ stats['unmirrored_queues'] = 0
+ stats['pmap_mapped'] = 0
+ stats['pmap_used'] = 0
+ stats['pmap_shared'] = 0
+
+ out, err = self.execute([self.rabbitmqctl_bin, '-q', 'status'],
+ shell=False)
+ if not out:
+ self.logger.error('%s: Failed to get the status' %
+ self.rabbitmqctl_bin)
+ return
+
+ for v in ('vm_memory_limit', 'disk_free_limit', 'disk_free'):
+ try:
+ stats[v] = int(re.findall('{%s,([0-9]+)}' % v, out)[0])
+ except:
+ self.logger.error('%s: Failed to get %s' %
+ (self.rabbitmqctl_bin, v))
+
+ mem_str = re.findall('{memory,\s+\[([^\]]+)\]\}', out)
+ # We are only interested by the total of memory used
+ # TODO: Get all informations about memory usage from mem_str
+ try:
+ stats['used_memory'] = int(re.findall('total,([0-9]+)',
+ mem_str[0])[0])
+ except:
+ self.logger.error('%s: Failed to get the memory used by rabbitmq' %
+ self.rabbitmqctl_bin)
+
+ if 'vm_memory_limit' in stats and 'used_memory' in stats:
+ stats['remaining_memory'] = stats['vm_memory_limit'] - stats['used_memory']
+ if 'disk_free' in stats and 'disk_free_limit' in stats:
+ stats['remaining_disk'] = stats['disk_free'] - stats['disk_free_limit']
+
+ out, err = self.execute([self.rabbitmqctl_bin, '-q', 'cluster_status'],
+ shell=False)
+ if not out:
+ self.logger.error('%s: Failed to get the cluster status' %
+ self.rabbitmqctl_bin)
+ return
+
+ # TODO: Need to be modified in case we are using RAM nodes.
+ status = CLUSTER_STATUS.findall(out)
+ if len(status) == 0:
+ self.logger.error('%s: Failed to parse (%s)' %
+ (self.rabbitmqctl_bin, out))
+ else:
+ stats['total_nodes'] = len(status[0][0].split(","))
+ stats['running_nodes'] = len(status[0][1].split(","))
+
+ out, err = self.execute([self.rabbitmqctl_bin, '-q',
+ 'list_connections'], shell=False)
+ if not out:
+ self.logger.error('%s: Failed to get the number of connections' %
+ self.rabbitmqctl_bin)
+ return
+ stats['connections'] = len(out.split('\n'))
+
+ out, err = self.execute([self.rabbitmqctl_bin, '-q', 'list_exchanges'],
+ shell=False)
+ if not out:
+ self.logger.error('%s: Failed to get the number of exchanges' %
+ self.rabbitmqctl_bin)
+ return
+ stats['exchanges'] = len(out.split('\n'))
+
+ out, err = self.execute([self.rabbitmqctl_bin, '-q', '-p', self.vhost,
+ 'list_queues', 'name', 'messages', 'memory',
+ 'consumers', 'slave_pids',
+ 'synchronised_slave_pids'], shell=False)
+ if not out:
+ self.logger.error('%s: Failed to get the list of queues' %
+ self.rabbitmqctl_bin)
+ return
+
+ for line in out.split('\n'):
+ ctl_stats = line.split('\t')
+ try:
+ ctl_stats[1] = int(ctl_stats[1])
+ ctl_stats[2] = int(ctl_stats[2])
+ ctl_stats[3] = int(ctl_stats[3])
+ except:
+ continue
+ queue_name = ctl_stats[0][:self.MAX_QUEUE_IDENTIFIER_LENGTH]
+ stats['queues'] += 1
+ stats['messages'] += ctl_stats[1]
+ stats['memory'] += ctl_stats[2]
+ stats['consumers'] += ctl_stats[3]
+ stats['%s.messages' % queue_name] = ctl_stats[1]
+ stats['%s.memory' % queue_name] = ctl_stats[2]
+ stats['%s.consumers' % queue_name] = ctl_stats[3]
+ # we need to check if the list of synchronised slaves is
+ # equal to the list of slaves.
+ try:
+ slaves = re.findall('<([a-zA-Z@\-.0-9]+)>', ctl_stats[4])
+ for s in slaves:
+ if s not in ctl_stats[5]:
+ stats['unmirrored_queues'] += 1
+ break
+ except IndexError:
+ pass
+
+ if not stats['memory'] > 0:
+ self.logger.warning(
+ '%s reports 0 memory usage. This is probably incorrect.' %
+ self.rabbitmqctl_bin)
+
+ # pmap metrics are only collected if the location of the pid file is
+ # explicitly configured
+ if self.pidfile:
+ try:
+ with open(self.pidfile, 'r') as f:
+ pid = f.read().strip()
+ except:
+ self.logger.error('Unable to read %s' % self.pidfile)
+ return
+
+ # use pmap to get proper memory stats
+ out, err = self.execute([self.pmap_bin, '-d', pid], shell=False)
+ if not out:
+ self.logger.error('Failed to run %s' % self.pmap_bin)
+ return
+
+ out = out.split('\n')[-1]
+ if re.match('mapped', out):
+ m = re.match(r"\D+(\d+)\D+(\d+)\D+(\d+)", out)
+ stats['pmap_mapped'] = int(m.group(1))
+ stats['pmap_used'] = int(m.group(2))
+ stats['pmap_shared'] = int(m.group(3))
+ else:
+ self.logger.warning('%s returned something strange.' %
+ self.pmap_bin)
+
+ # TODO(pasquier-s): define and use own types instead of the generic
+ # GAUGE type
+ for k, v in stats.iteritems():
+ yield {'type_instance': k, 'values': v}
+
+
+# Singleton plugin instance shared by the collectd callbacks below.
+plugin = RabbitMqPlugin()
+
+
+def init_callback():
+    # Restore default SIGCHLD handling so that subprocess calls made by the
+    # plugin work when running under the collectd daemon.
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    plugin.read_callback()
+
+collectd.register_init(init_callback)
+collectd.register_config(config_callback)
+collectd.register_read(read_callback)
diff --git a/collectd/files/plugin/openstack/types/ceph.db b/collectd/files/plugin/openstack/types/ceph.db
new file mode 100644
index 0000000..0630a56
--- /dev/null
+++ b/collectd/files/plugin/openstack/types/ceph.db
@@ -0,0 +1,46 @@
+# For all types, plugin_instance contains the Ceph cluster name
+
+# cluster health metrics
+health value:GAUGE:1:3
+monitor_count value:GAUGE:0:U
+quorum_count value:GAUGE:0:U
+# number of placement groups per state, type_instance is the PG's state
+pg_state_count value:GAUGE:0:U
+# total number of objects
+objects_count value:GAUGE:0:U
+# total number of placement groups
+pg_count value:GAUGE:0:U
+# used, free and total amount of bytes
+pg_bytes used:GAUGE:0:U free:GAUGE:0:U total:GAUGE:0:U
+# amount of data bytes
+pg_data_bytes value:GAUGE:0:U
+
+# number of bytes used in the pool, type_instance is the pool name
+pool_bytes_used value:GAUGE:0:U
+# max number of bytes available in the pool, type_instance is the pool name
+pool_max_avail value:GAUGE:0:U
+# number of objects in the pool, type_instance is the pool name
+pool_objects value:GAUGE:0:U
+# number of pools
+pool_count value:GAUGE:0:U
+# used, free and total amount of bytes for all pools
+pool_total_bytes used:GAUGE:0:U free:GAUGE:0:U total:GAUGE:0:U
+# percentage of used, free and total space for all pools
+pool_total_percent used:GAUGE:0:101 free:GAUGE:0:101
+# number of bytes received and transmitted per second, type_instance is the pool name
+pool_bytes_rate rx:GAUGE:0:U tx:GAUGE:0:U
+# number of operations per second, type_instance is the pool name
+pool_ops_rate value:GAUGE:0:U
+# number of objects, type_instance is the pool name
+pool_pg_num value:GAUGE:0:U
+# number of placement groups, type_instance is the pool name
+pool_pg_placement_num value:GAUGE:0:U
+# size of the pool, type_instance is the pool name
+pool_size value:GAUGE:0:U
+
+# number of OSDs per state
+osd_count up:GAUGE:0:U down:GAUGE:0:U in:GAUGE:0:U out:GAUGE:0:U
+# amount of used and total space in bytes, type_instance is the OSD identifier
+osd_space used:GAUGE:0:U total:GAUGE:0:U
+# apply and commit latencies in milliseconds, type_instance is the OSD identifier
+osd_latency apply:GAUGE:0:U commit:GAUGE:0:U
diff --git a/collectd/files/plugin/openstack/types/ceph_perf.db b/collectd/files/plugin/openstack/types/ceph_perf.db
new file mode 100644
index 0000000..015b4e4
--- /dev/null
+++ b/collectd/files/plugin/openstack/types/ceph_perf.db
@@ -0,0 +1,71 @@
+# File generated automatically by the build_ceph_perf_types.py script
+# Ceph version: 0.80.9
+osd_agent_evict value:GAUGE:U:U
+osd_agent_flush value:GAUGE:U:U
+osd_agent_skip value:GAUGE:U:U
+osd_agent_wake value:GAUGE:U:U
+osd_buffer_bytes value:GAUGE:U:U
+osd_copyfrom value:GAUGE:U:U
+osd_heartbeat_from_peers value:GAUGE:U:U
+osd_heartbeat_to_peers value:GAUGE:U:U
+osd_loadavg value:GAUGE:U:U
+osd_map_message_epoch_dups value:GAUGE:U:U
+osd_map_message_epochs value:GAUGE:U:U
+osd_map_messages value:GAUGE:U:U
+osd_messages_delayed_for_map value:GAUGE:U:U
+osd_numpg value:GAUGE:U:U
+osd_numpg_primary value:GAUGE:U:U
+osd_numpg_replica value:GAUGE:U:U
+osd_numpg_stray value:GAUGE:U:U
+osd_op value:GAUGE:U:U
+osd_op_in_bytes value:GAUGE:U:U
+osd_op_latency value:GAUGE:U:U
+osd_op_out_bytes value:GAUGE:U:U
+osd_op_process_latency value:GAUGE:U:U
+osd_op_r value:GAUGE:U:U
+osd_op_r_latency value:GAUGE:U:U
+osd_op_r_out_bytes value:GAUGE:U:U
+osd_op_r_process_latency value:GAUGE:U:U
+osd_op_rw value:GAUGE:U:U
+osd_op_rw_in_bytes value:GAUGE:U:U
+osd_op_rw_latency value:GAUGE:U:U
+osd_op_rw_out_bytes value:GAUGE:U:U
+osd_op_rw_process_latency value:GAUGE:U:U
+osd_op_rw_rlat value:GAUGE:U:U
+osd_op_w value:GAUGE:U:U
+osd_op_w_in_bytes value:GAUGE:U:U
+osd_op_w_latency value:GAUGE:U:U
+osd_op_w_process_latency value:GAUGE:U:U
+osd_op_w_rlat value:GAUGE:U:U
+osd_op_wip value:GAUGE:U:U
+osd_opq value:GAUGE:U:U
+osd_pull value:GAUGE:U:U
+osd_push value:GAUGE:U:U
+osd_push_in value:GAUGE:U:U
+osd_push_in_bytes value:GAUGE:U:U
+osd_push_out_bytes value:GAUGE:U:U
+osd_recovery_ops value:GAUGE:U:U
+osd_stat_bytes value:GAUGE:U:U
+osd_stat_bytes_avail value:GAUGE:U:U
+osd_stat_bytes_used value:GAUGE:U:U
+osd_subop value:GAUGE:U:U
+osd_subop_in_bytes value:GAUGE:U:U
+osd_subop_latency value:GAUGE:U:U
+osd_subop_pull value:GAUGE:U:U
+osd_subop_pull_latency value:GAUGE:U:U
+osd_subop_push value:GAUGE:U:U
+osd_subop_push_in_bytes value:GAUGE:U:U
+osd_subop_push_latency value:GAUGE:U:U
+osd_subop_w value:GAUGE:U:U
+osd_subop_w_in_bytes value:GAUGE:U:U
+osd_subop_w_latency value:GAUGE:U:U
+osd_tier_clean value:GAUGE:U:U
+osd_tier_delay value:GAUGE:U:U
+osd_tier_dirty value:GAUGE:U:U
+osd_tier_evict value:GAUGE:U:U
+osd_tier_flush value:GAUGE:U:U
+osd_tier_flush_fail value:GAUGE:U:U
+osd_tier_promote value:GAUGE:U:U
+osd_tier_try_flush value:GAUGE:U:U
+osd_tier_try_flush_fail value:GAUGE:U:U
+osd_tier_whiteout value:GAUGE:U:U