Merge "Fix for emitting 0s instead of nothing"
diff --git a/.kitchen.yml b/.kitchen.yml
new file mode 100644
index 0000000..e96d842
--- /dev/null
+++ b/.kitchen.yml
@@ -0,0 +1,47 @@
+---
+driver:
+  name: docker
+  hostname: collectd.ci.local
+  use_sudo: false
+
+provisioner:
+  name: salt_solo
+  salt_install: bootstrap
+  salt_bootstrap_url: https://bootstrap.saltstack.com
+  salt_version: latest
+  require_chef: false
+  log_level: error
+  formula: collectd
+  grains:
+    noservices: True
+  state_top:
+    base:
+      "*":
+        - collectd
+  pillars:
+    top.sls:
+      base:
+        "*":
+          - collectd
+  dependencies:
+    - name: linux
+      repo: git
+      source: https://github.com/salt-formulas/salt-formula-linux
+
+verifier:
+  name: inspec
+  sudo: true
+
+platforms:
+  - name: <%=ENV['PLATFORM'] || 'ubuntu-xenial'%>
+    driver_config:
+      image: <%=ENV['PLATFORM'] || 'trevorj/salty-whales:xenial'%>
+      platform: ubuntu
+
+suites:
+
+  - name: client
+    provisioner:
+      pillars-from-files:
+        collectd.sls: tests/pillar/client.sls
+# vim: ft=yaml sw=2 ts=2 sts=2 tw=125
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..4f34af2
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,40 @@
+sudo: required
+services:
+  - docker
+
+install:
+  - pip install PyYAML
+  - pip install virtualenv
+  - |
+    test -e Gemfile || cat <<EOF > Gemfile
+    source 'https://rubygems.org'
+    gem 'rake'
+    gem 'test-kitchen'
+    gem 'kitchen-docker'
+    gem 'kitchen-inspec'
+    gem 'inspec'
+    gem 'kitchen-salt', :git => 'https://github.com/salt-formulas/kitchen-salt.git'
+  - bundle install
+
+env:
+    - PLATFORM=trevorj/salty-whales:trusty
+    - PLATFORM=trevorj/salty-whales:xenial
+
+
+before_script:
+  - set -o pipefail
+  - make test | tail
+
+script:
+  - test ! -e .kitchen.yml || bundle exec kitchen test -t tests/integration
+
+notifications:
+  webhooks:
+    urls:
+      - https://webhooks.gitter.im/e/6123573504759330786b
+    on_success: change  # options: [always|never|change] default: always
+    on_failure: never  # options: [always|never|change] default: always
+    on_start: never     # options: [always|never|change] default: always
+    on_cancel: never    # options: [always|never|change] default: always
+    on_error: never    # options: [always|never|change] default: always
+  email: false
diff --git a/collectd/client.sls b/collectd/client.sls
index eca7071..e0fbf5d 100644
--- a/collectd/client.sls
+++ b/collectd/client.sls
@@ -1,4 +1,4 @@
-{%- from "collectd/map.jinja" import client with context %}
+{%- from "collectd/map.jinja" import client, service_grains with context %}
 {%- if client.enabled %}
 
 include:
@@ -12,43 +12,6 @@
   - require:
     - pkg: collectd_client_packages
 
-{%- set service_grains = {'collectd': {'remote_plugin': {}, 'local_plugin': {}}} %}
-
-{%- for service_name, service in pillar.items() %}
-{%- if service.get('_support', {}).get('collectd', {}).get('enabled', False) %}
-
-{%- set grains_fragment_file = service_name+'/meta/collectd.yml' %}
-{%- macro load_grains_file() %}{% include grains_fragment_file ignore missing %}{% endmacro %}
-{%- set grains_yaml = load_grains_file()|load_yaml %}
-
-{%- if grains_yaml is mapping %}
-{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge={'collectd': grains_yaml}) %}
-{%- endif %}
-
-{%- endif %}
-{%- endfor %}
-
-collectd_client_grain:
-  file.managed:
-  - name: /etc/salt/grains.d/collectd
-  - source: salt://collectd/files/collectd.grain
-  - template: jinja
-  - user: root
-  - mode: 600
-  - defaults:
-    service_grains: {{ service_grains|yaml }}
-  - require:
-    - pkg: collectd_client_packages
-    - file: collectd_client_grains_dir
-
-collectd_client_grain_validity_check:
-  cmd.wait:
-  - name: python -c "import yaml; stream = file('/etc/salt/grains.d/collectd', 'r'); yaml.load(stream); stream.close()"
-  - require:
-    - pkg: collectd_client_packages
-  - watch:
-    - file: collectd_client_grain
-
 {%- set plugins = service_grains.collectd.local_plugin %}
 {%- include "collectd/_service.sls" %}
 
diff --git a/collectd/files/collectd.grain b/collectd/files/collectd.grain
deleted file mode 100644
index 3e3b373..0000000
--- a/collectd/files/collectd.grain
+++ /dev/null
@@ -1 +0,0 @@
-{{ service_grains|yaml(False) }}
diff --git a/collectd/files/plugin/collectd_base.py b/collectd/files/plugin/collectd_base.py
index 36c0060..4a61168 100644
--- a/collectd/files/plugin/collectd_base.py
+++ b/collectd/files/plugin/collectd_base.py
@@ -22,6 +22,8 @@
 import traceback
 
 
+TIMEOUT_BIN = '/usr/bin/timeout'
+
 INTERVAL = 10
 
 
@@ -167,7 +169,8 @@
         )
         v.dispatch()
 
-    def execute(self, cmd, shell=True, cwd=None, log_error=True):
+    def execute(self, cmd, shell=True, cwd=None, log_error=True,
+                signal='TERM'):
         """Executes a program with arguments.
 
         Args:
@@ -179,6 +182,8 @@
             (default=None).
             log_error: whether to log an error when the command returned a
             non-zero status code (default=True).
+            signal: the signal used to kill the command if the timeout occurs
+            (default TERM).
 
         Returns:
             A tuple containing the return code, the standard output and the
@@ -189,9 +194,12 @@
             (-1, None, None) if the program couldn't be executed at all.
         """
         start_time = time.time()
+        full_cmd = [TIMEOUT_BIN, '-k', '1', '-s', signal, str(self.timeout)]
+        full_cmd.extend(cmd)
+
         try:
             proc = subprocess.Popen(
-                cmd,
+                full_cmd,
                 cwd=cwd,
                 shell=shell,
                 stdout=subprocess.PIPE,
@@ -201,18 +209,30 @@
             stdout = stdout.rstrip('\n')
         except Exception as e:
             self.logger.error("Cannot execute command '%s': %s : %s" %
-                              (cmd, str(e), traceback.format_exc()))
+                              (full_cmd, str(e), traceback.format_exc()))
             return (-1, None, None)
 
         returncode = proc.returncode
 
-        if returncode != 0 and log_error:
-            self.logger.error("Command '%s' failed (return code %d): %s" %
-                              (cmd, returncode, stderr))
+        if returncode != 0:
+            # timeout command returns usually 124 (TERM) or 137 (KILL) when the
+            # timeout occurs.
+            # But for some reason, python subprocess rewrites the return
+            # code with the (negative) signal sent when the the signal is not
+            # catched by the process.
+            if returncode == 124 or returncode < 0:
+                stderr = 'timeout {}s'.format(self.timeout)
+                msg = "Command '{}' timeout {}s".format(cmd, self.timeout)
+            else:
+                msg = "Command '{}' failed (return code {}): {}".format(
+                    cmd, returncode, stderr)
+
+            if log_error:
+                self.logger.error(msg)
         if self.debug:
             elapsedtime = time.time() - start_time
             self.logger.info("Command '%s' returned %s in %0.3fs" %
-                             (cmd, returncode, elapsedtime))
+                             (full_cmd, returncode, elapsedtime))
 
         return (returncode, stdout, stderr)
 
@@ -273,6 +293,7 @@
     def shutdown_callback(self):
         pass
 
+
 class CephBase(Base):
 
     def __init__(self, *args, **kwargs):
diff --git a/collectd/files/plugin/collectd_docker_info.py b/collectd/files/plugin/collectd_docker_info.py
index e0589f1..08b8ca4 100644
--- a/collectd/files/plugin/collectd_docker_info.py
+++ b/collectd/files/plugin/collectd_docker_info.py
@@ -31,6 +31,7 @@
     def __init__(self, *args, **kwargs):
         super(DockerInfoPlugin, self).__init__(*args, **kwargs)
         self.plugin = NAME
+        self.timeout = 3
 
     def itermetrics(self):
         cmd = [DOCKER_BINARY, 'info', '-f', "{{ json .}}"]
diff --git a/collectd/files/plugin/collectd_elasticsearch_node.py b/collectd/files/plugin/collectd_elasticsearch_node.py
index 1ce23fa..2cfc7af 100644
--- a/collectd/files/plugin/collectd_elasticsearch_node.py
+++ b/collectd/files/plugin/collectd_elasticsearch_node.py
@@ -27,21 +27,97 @@
     def __init__(self, *args, **kwargs):
         super(ElasticsearchNodePlugin, self).__init__(*args, **kwargs)
         self.plugin = NAME
+        self._previous = {}
+
+    @staticmethod
+    def _metric(name, values, meta=None):
+        return {'type_instance': name, 'values': values, 'meta': meta or {}}
+
+    def _get_latency(self, name, count, time):
+        cname = '{}_count'.format(name)
+        tname = '{}_time'.format(name)
+        prev_count = self._previous.get(cname)
+        prev_time = self._previous.get(tname)
+        self._previous[cname] = count
+        self._previous[tname] = time
+        if prev_count and prev_time:
+            diff_count = count - prev_count
+            diff_time = time - prev_time
+            return diff_time / diff_count if diff_count > 0 else 0
 
     def itermetrics(self):
         stats = self.query_api('_nodes/_local/stats').get(
             'nodes', {}).values()[0]
-        yield {
-            'type_instance': 'documents',
-            'values': stats['indices']['docs']['count']
-        }
-        yield {
-            'type_instance': 'documents_deleted',
-            'values': stats['indices']['docs']['deleted']
-        }
-        # TODO: collectd more metrics
-        # See https://www.elastic.co/guide/en/elasticsearch/guide/current/
-        # _monitoring_individual_nodes.html
+        indices = stats['indices']
+        yield self._metric('documents', indices['docs']['count'])
+        yield self._metric('documents_deleted', indices['docs']['deleted'])
+        yield self._metric(
+            'indexing_current', indices['indexing']['index_current'])
+        yield self._metric(
+            'indexing_failed', indices['indexing']['index_failed'])
+        indexing_latency = self._get_latency(
+            'indexing', indices['indexing']['index_total'],
+            indices['indexing']['index_time_in_millis'])
+        if indexing_latency:
+            yield self._metric('indexing_latency', indexing_latency)
+        yield self._metric('store_size', indices['store']['size_in_bytes'])
+        fd_open = 0
+        if stats['process']['max_file_descriptors'] > 0:
+            fd_open = 100.0 * stats['process']['open_file_descriptors'] \
+                / stats['process']['max_file_descriptors']
+        yield self._metric('fd_open_percent', fd_open)
+
+        thread_pools = stats['thread_pool']
+        for pool in ('bulk', 'flush', 'search', 'index', 'get'):
+            yield self._metric('thread_pool_queue',
+                               thread_pools[pool]['queue'], {'pool': pool})
+            yield self._metric('thread_pool_rejected',
+                               thread_pools[pool]['rejected'], {'pool': pool})
+            yield self._metric('thread_pool_completed',
+                               thread_pools[pool]['completed'], {'pool': pool})
+        mem = stats['jvm']['mem']
+        yield self._metric('jvm_heap_max', mem['heap_max_in_bytes'])
+        yield self._metric('jvm_heap_used_percent', mem['heap_used_percent'])
+        yield self._metric('jvm_heap_used', mem['heap_used_in_bytes'])
+        for pool, stat in mem['pools'].items():
+            yield self._metric(
+                'jvm_heap_pool', stat['used_in_bytes'], {'pool': pool})
+        gc = stats['jvm']['gc']
+        for pool, stat in gc['collectors'].items():
+            yield self._metric('jvm_gc_count', stat['collection_count'],
+                               {'pool': pool})
+            yield self._metric('jvm_gc_time',
+                               stat['collection_time_in_millis'],
+                               {'pool': pool})
+
+        search = indices['search']
+        for phase in ('query', 'fetch'):
+            yield self._metric('{}_current'.format(phase),
+                               search['{}_current'.format(phase)])
+            latency = self._get_latency(
+                phase,
+                search['{}_total'.format(phase)],
+                search['{}_time_in_millis'.format(phase)])
+            if latency is not None:
+                yield self._metric('{}_latency'.format(phase), latency)
+        yield self._metric('query_count', search['query_total'])
+
+        query = indices['query_cache']
+        yield self._metric('query_cache_size', query['memory_size_in_bytes'])
+        yield self._metric('query_cache_evictions', query['evictions'])
+
+        fielddata = indices['fielddata']
+        yield self._metric('fielddata_size', fielddata['memory_size_in_bytes'])
+        yield self._metric('fielddata_evictions', fielddata['evictions'])
+
+        for operation in ('merges', 'flush', 'refresh'):
+            yield self._metric(operation, indices[operation]['total'])
+            latency = self._get_latency(
+                operation,
+                indices[operation]['total'],
+                indices[operation]['total_time_in_millis'])
+            if latency is not None:
+                yield self._metric('{}_latency'.format(operation), latency)
 
 
 plugin = ElasticsearchNodePlugin(collectd)
diff --git a/collectd/files/plugin/collectd_k8s_get.py b/collectd/files/plugin/collectd_k8s_get.py
new file mode 100644
index 0000000..2d8742a
--- /dev/null
+++ b/collectd/files/plugin/collectd_k8s_get.py
@@ -0,0 +1,192 @@
+#!/usr/bin/python
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if __name__ == '__main__':
+    import collectd_fake as collectd
+else:
+    import collectd
+
+import collectd_base as base
+import requests
+
+NAME = 'k8s'
+INTERVAL = 30
+
+
+class K8sGetPlugin(base.Base):
+    def __init__(self, *args, **kwargs):
+        super(K8sGetPlugin, self).__init__(*args, **kwargs)
+        self._threads = {}
+        self.session = None
+        self.plugin = NAME
+        self.endpoint = None
+        self.verify = False
+        self.client_key = None
+        self.client_certs = None
+
+        self.interval = INTERVAL
+        self.polling_interval = INTERVAL
+
+        self.timeout = 3
+        self.max_retries = 2
+
+    def shutdown_callback(self):
+        for tid, t in self._threads.items():
+            if t.is_alive():
+                self.logger.info('Waiting for {} thread to finish'.format(tid))
+                t.stop()
+                t.join()
+
+    def config_callback(self, config):
+        super(K8sGetPlugin, self).config_callback(config)
+        for node in config.children:
+            if node.key == "Endpoint":
+                self.endpoint = node.values[0]
+            elif node.key == 'Verify':
+                if node.values[0].lower() == 'false':
+                    self.verify = False
+            elif node.key == 'ClientCert':
+                self.client_cert = node.values[0]
+            elif node.key == 'ClientKey':
+                self.client_key = node.values[0]
+
+        session = requests.Session()
+        if self.endpoint.startswith('https'):
+            session.mount(
+                'https://',
+                requests.adapters.HTTPAdapter(max_retries=self.max_retries)
+            )
+        else:
+            session.mount(
+                'http://',
+                requests.adapters.HTTPAdapter(max_retries=self.max_retries)
+            )
+
+        session.verify = self.verify
+        if self.client_cert and self.client_key:
+            session.cert = (self.client_cert, self.client_key)
+        elif self.client_cert:
+            session.cert = self.client_cert
+
+        self.session = session
+
+    def get(self, url):
+
+        def get():
+            try:
+                r = self.session.get(url, timeout=self.timeout)
+                data = r.json()
+            except Exception as e:
+                self.logger.warning("Got exception for '{}': {}".format(
+                    url, e)
+                )
+                raise base.CheckException('Fail to get {}'.self(url))
+
+            else:
+
+                if r.status_code != 200:
+                    msg = ("{} responded with code {} "
+                           "while 200 is expected").format(url, r.status_code)
+                    self.logger.warning(msg)
+                    raise base.CheckException(msg)
+            return data.get('items', [])
+
+        if url not in self._threads:
+            t = base.AsyncPoller(self.collectd,
+                                 get,
+                                 self.polling_interval,
+                                 url)
+            t.start()
+            self._threads[url] = t
+
+        t = self._threads[url]
+        if not t.is_alive():
+            self.logger.warning("Unexpected end of the thread {}".format(
+                t.name))
+            del self._threads[url]
+            return []
+
+        return t.results
+
+    @staticmethod
+    def _check_conditions(conditions, _type):
+        return all(
+            [cnd.get('status') == 'True' for cnd in conditions
+             if cnd.get('type') == _type]
+        )
+
+    def itermetrics(self):
+        nodes = self.get('{}/api/v1/nodes'.format(self.endpoint))
+        total, total_ready = (0, 0)
+        for node in nodes:
+            self.logger.debug(node.get('metadata', {}).get('name'))
+            conditions = node.get(
+                'status', {}).get('conditions', [])
+            if self._check_conditions(conditions, _type='Ready'):
+                total_ready += 1
+            total += 1
+        if total > 0:
+            yield {'values': (100.0 * (total - total_ready)) / total,
+                   'plugin_instance': 'nodes_percent',
+                   'meta': {'status': 'not_ready',
+                            'discard_hostname': True},
+                   }
+
+        yield {'values': total_ready,
+               'plugin_instance': 'nodes',
+               'meta': {'status': 'ready', 'discard_hostname': True},
+               }
+        yield {'values': total - total_ready,
+               'plugin_instance': 'nodes',
+               'meta': {'status': 'not_ready', 'discard_hostname': True},
+               }
+        yield {'values': total,
+               'plugin_instance': 'nodes_total',
+               'meta': {'discard_hostname': True}
+               }
+
+
+plugin = K8sGetPlugin(collectd, disable_check_metric=True)
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def notification_callback(notification):
+    plugin.notification_callback(notification)
+
+
+def read_callback():
+    plugin.conditional_read_callback()
+
+
+if __name__ == '__main__':
+    plugin.endpoint = 'https://172.16.10.253:443'
+    plugin.verify = False
+    plugin.client_key = '/etc/kubernetes/ssl/kubelet-client.key'
+    plugin.client_cert = '/etc/kubernetes/ssl/kubelet-client.crt'
+
+    collectd.load_configuration(plugin)
+    plugin.read_callback()
+    import time
+    time.sleep(base.INTERVAL)
+    plugin.read_callback()
+    plugin.shutdown_callback()
+else:
+    collectd.register_config(config_callback)
+    collectd.register_notification(notification_callback)
+    collectd.register_read(read_callback, base.INTERVAL)
+
diff --git a/collectd/files/plugin/collectd_k8s_kubectl_get.py b/collectd/files/plugin/collectd_k8s_kubectl_get.py
deleted file mode 100644
index 1333120..0000000
--- a/collectd/files/plugin/collectd_k8s_kubectl_get.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#!/usr/bin/python
-# Copyright 2017 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if __name__ == '__main__':
-    import collectd_fake as collectd
-else:
-    import collectd
-import json
-import time
-
-import collectd_base as base
-
-NAME = 'k8s'
-KUBECTL_BINARY = '/usr/bin/kubectl'
-INTERVAL = 30
-
-
-class K8sPlugin(base.Base):
-
-    def __init__(self, *args, **kwargs):
-        super(K8sPlugin, self).__init__(*args, **kwargs)
-        self.plugin = NAME
-        self._threads = {}
-        self.polling_interval = INTERVAL
-        self.resources = []
-        self._get_nodes = False
-
-    def shutdown_callback(self):
-        for tid, t in self._threads.items():
-            if t.is_alive():
-                self.logger.info('Waiting for {} thread to finish'.format(tid))
-                t.stop()
-                t.join()
-
-    def config_callback(self, config):
-        super(K8sPlugin, self).config_callback(config)
-        for node in config.children:
-            if node.key == 'PollingInterval':
-                self.polling_interval = int(node.values[0])
-            elif node.key == 'GetNodes':
-                if node.values[0].lower() == 'true':
-                    self._get_nodes = True
-
-    def kubectl_get(self, resource):
-
-        def kubectl_poller():
-            cmd = [KUBECTL_BINARY, 'get', '-o', 'json', resource]
-            data = self.execute_to_json(cmd, shell=False, log_error=True)
-            return data.get('items', [])
-
-        if resource not in self._threads:
-            t = base.AsyncPoller(self.collectd,
-                                 kubectl_poller,
-                                 self.polling_interval,
-                                 resource)
-            t.start()
-            self._threads[resource] = t
-
-        t = self._threads[resource]
-        if not t.is_alive():
-            self.logger.warning("Unexpected end of the thread {}".format(
-                t.name))
-            del self._threads[resource]
-            return []
-
-        return t.results
-
-    @staticmethod
-    def _check_conditions(conditions, _type):
-        return all(
-            [cnd.get('status') == 'True' for cnd in conditions
-             if cnd.get('type') == _type]
-        )
-
-    def _iter_node_metrics(self, nodes):
-        if nodes:
-            total, total_ready = (0, 0)
-            for node in nodes:
-                self.logger.debug(node.get('metadata', {}).get('name'))
-                conditions = node.get(
-                    'status', {}).get('conditions', [])
-                if self._check_conditions(conditions, _type='Ready'):
-                    total_ready += 1
-                total += 1
-            if total > 0:
-                yield {'values': (100.0 * (total - total_ready)) / total,
-                       'plugin_instance': 'nodes_percent',
-                       'meta': {'status': 'not_ready'},
-                       }
-
-            yield {'values': total_ready,
-                   'plugin_instance': 'nodes',
-                   'meta': {'status': 'ready'},
-                   }
-            yield {'values': total - total_ready,
-                   'plugin_instance': 'nodes',
-                   'meta': {'status': 'not_ready'},
-                   }
-            yield {'values': total,
-                   'plugin_instance': 'nodes_total'
-                   }
-
-    def itermetrics(self):
-        if self._get_nodes:
-            items = self.kubectl_get('nodes')
-            return self._iter_node_metrics(items)
-
-
-plugin = K8sPlugin(collectd, disable_check_metric=True)
-
-
-def init_callback():
-    plugin.restore_sigchld()
-
-
-def config_callback(conf):
-    plugin.config_callback(conf)
-
-
-def read_callback():
-    plugin.read_callback()
-
-if __name__ == '__main__':
-    collectd.load_configuration(plugin)
-    plugin._get_nodes = True
-    plugin.read_callback()
-    collectd.info('Sleeping for {}s'.format(INTERVAL))
-    time.sleep(INTERVAL)
-    plugin.read_callback()
-    plugin.shutdown_callback()
-else:
-    collectd.register_init(init_callback)
-    collectd.register_config(config_callback)
-    collectd.register_read(read_callback, INTERVAL)
-
-
-
diff --git a/collectd/files/plugin/hypervisor_stats.py b/collectd/files/plugin/hypervisor_stats.py
index 5fc3bdb..7d1696f 100644
--- a/collectd/files/plugin/hypervisor_stats.py
+++ b/collectd/files/plugin/hypervisor_stats.py
@@ -140,6 +140,7 @@
                     'meta': {
                         'aggregate': agg,
                         'aggregate_id': agg_id,
+                        'meta': {'discard_hostname': True}
                     }
                 }
         # Dispatch the global metrics
@@ -147,6 +148,7 @@
             yield {
                 'type_instance': 'total_{}'.format(k),
                 'values': v,
+                'meta': {'discard_hostname': True}
             }
 
 plugin = HypervisorStatsPlugin(collectd, PLUGIN_NAME,
diff --git a/collectd/files/plugin/openstack_neutron.py b/collectd/files/plugin/openstack_neutron.py
index 1d147c7..a297b94 100644
--- a/collectd/files/plugin/openstack_neutron.py
+++ b/collectd/files/plugin/openstack_neutron.py
@@ -75,7 +75,7 @@
         yield {
             'plugin_instance': 'networks',
             'type_instance': 'total',
-            'values': len(status),
+            'values': len(networks),
             'meta': {'discard_hostname': True},
         }
 
diff --git a/collectd/map.jinja b/collectd/map.jinja
index 1077513..650c640 100644
--- a/collectd/map.jinja
+++ b/collectd/map.jinja
@@ -51,3 +51,15 @@
         'automatic_starting': True,
     }
 }, merge=salt['pillar.get']('collectd:remote_client')) %}
+
+{%- set service_grains = {'collectd': {'remote_plugin': {}, 'local_plugin': {}}} %}
+{%- for service_name, service in pillar.items() %}
+  {%- if service.get('_support', {}).get('collectd', {}).get('enabled', False) %}
+    {%- set grains_fragment_file = service_name+'/meta/collectd.yml' %}
+    {%- macro load_grains_file() %}{% include grains_fragment_file ignore missing %}{% endmacro %}
+    {%- set grains_yaml = load_grains_file()|load_yaml %}
+    {%- if grains_yaml is mapping %}
+      {%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge={'collectd': grains_yaml}) %}
+    {%- endif %}
+  {%- endif %}
+{%- endfor %}
diff --git a/collectd/meta/salt.yml b/collectd/meta/salt.yml
new file mode 100644
index 0000000..30f0f1f
--- /dev/null
+++ b/collectd/meta/salt.yml
@@ -0,0 +1,6 @@
+grain:
+  {%- if pillar.collectd.client is defined %}
+  {%- from "collectd/map.jinja" import service_grains with context -%}
+  collectd:
+    {{ service_grains|yaml(False)|indent(4) }}
+  {%- endif %}
diff --git a/tests/pillar/client.sls b/tests/pillar/client.sls
index b970e72..93bd7ad 100644
--- a/tests/pillar/client.sls
+++ b/tests/pillar/client.sls
@@ -1,4 +1,3 @@
-
 collectd:
   client:
     enabled: true
@@ -8,4 +7,7 @@
         engine: carbon
         host: 127.0.0.1
         port: 2023
-
+linux:
+  system:
+    name: hostname
+    domain: domain