Merge "netlink plugin for redhat family oses"
diff --git a/collectd/files/collectd_http_check.conf b/collectd/files/collectd_http_check.conf
index 8c3673a..c9e418e 100644
--- a/collectd/files/collectd_http_check.conf
+++ b/collectd/files/collectd_http_check.conf
@@ -1,15 +1,25 @@
 {%- if plugin.get('url', {})|length > 0 %}
-Import "http_check"
+Import "collectd_http_check"
 
-<Module "http_check">
+<Module "collectd_http_check">
   MaxRetries "3"
   Timeout "1"
+  PollingInterval "{{ plugin.polling_interval|default("10") }}"
   {%- for name, params in plugin.url.iteritems() %}
   ExpectedCode "{{ name }}" "{{ params.expected_code }}"
   Url "{{ name }}" "{{ params.url }}"
   {%- if params.get('expected_content') %}
   ExpectedContent "{{ name }}" "{{ params.expected_content|replace('"','\\"') }}"
   {%- endif %}
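+  {#- Optional TLS options per URL: Verify, ClientCert, ClientKey #}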
+  {%- if params.verify is defined %}
+  Verify "{{ name }}" "{{ params.verify }}"
+  {%- endif %}
+  {%- if params.get('client_cert') %}
+  ClientCert "{{ name }}" "{{ params.client_cert }}"
+  {%- endif %}
+  {%- if params.get('client_key') %}
+  ClientKey "{{ name }}" "{{ params.client_key }}"
+  {%- endif %}
   {%- endfor %}
 </Module>
 {%- endif %}
diff --git a/collectd/files/plugin/collectd_base.py b/collectd/files/plugin/collectd_base.py
index 7959e75..6643693 100644
--- a/collectd/files/plugin/collectd_base.py
+++ b/collectd/files/plugin/collectd_base.py
@@ -139,7 +139,7 @@
                 'meta':   {'tagA': 'valA'}}
             {'type': 'dropped_bytes', 'values': [1,2]}
         """
-        raise NotImplemented("Must be implemented by the subclass!")
+        raise NotImplementedError("Must be implemented by the subclass!")
 
     def dispatch_metric(self, metric):
         values = metric['values']
diff --git a/collectd/files/plugin/collectd_http_check.py b/collectd/files/plugin/collectd_http_check.py
new file mode 100644
index 0000000..1ad78c5
--- /dev/null
+++ b/collectd/files/plugin/collectd_http_check.py
@@ -0,0 +1,182 @@
+#!/usr/bin/python
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if __name__ == '__main__':
+    import collectd_fake as collectd
+else:
+    import collectd
+
+import collectd_base as base
+import requests
+
+NAME = 'http_check'
+
+
+class HTTPCheckPlugin(base.Base):
+
+    def __init__(self, *args, **kwargs):
+        super(HTTPCheckPlugin, self).__init__(*args, **kwargs)
+        self.plugin = NAME
+        self.urls = {}
+        self.expected_codes = {}
+        self.expected_contents = {}
+        self.verify = {}
+        self.client_keys = {}
+        self.client_certs = {}
+
+        self.timeout = 3
+        self.max_retries = 2
+
+        self.interval = base.INTERVAL
+        self.polling_interval = base.INTERVAL
+
+        self.sessions = {}
+        self._threads = {}
+
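+    # Stop and join any background poller threads when collectd shuts down.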
+    def shutdown_callback(self):
+        for tid, t in self._threads.items():
+            if t.is_alive():
+                self.logger.info('Waiting for {} thread to finish'.format(tid))
+                t.stop()
+                t.join()
+
+    def config_callback(self, config):
+        super(HTTPCheckPlugin, self).config_callback(config)
+        for node in config.children:
+            if node.key == "Url":
+                self.urls[node.values[0]] = node.values[1]
+            elif node.key == 'ExpectedCode':
+                self.expected_codes[node.values[0]] = int(node.values[1])
+            elif node.key == 'ExpectedContent':
+                self.expected_contents[node.values[0]] = node.values[1]
+            elif node.key == 'Verify':
+                if node.values[1].lower() == 'false':
+                    self.verify[node.values[0]] = False
+                else:
+                    self.verify[node.values[0]] = node.values[1]
+            elif node.key == 'ClientCert':
+                self.client_certs[node.values[0]] = node.values[1]
+            elif node.key == 'ClientKey':
+                self.client_keys[node.values[0]] = node.values[1]
+            elif node.key == 'PollingInterval':
+                # Matches the PollingInterval option rendered by the template.
+                self.polling_interval = int(node.values[0])
+
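+        # Build one requests session per URL with retry support; HTTPS
+        # sessions also get the optional verify and client certificate
+        # settings parsed above.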
+        for name, url in self.urls.items():
+            session = requests.Session()
+            session.mount(
+                'http://',
+                requests.adapters.HTTPAdapter(max_retries=self.max_retries)
+            )
+            if url.startswith('https'):
+                session.mount(
+                    'https://',
+                    requests.adapters.HTTPAdapter(max_retries=self.max_retries)
+                )
+
+                session.verify = self.verify.get(name, True)
+                if self.client_certs.get(name) and self.client_keys.get(name):
+                    session.cert = (self.client_certs.get(name),
+                                    self.client_keys.get(name))
+                elif self.client_certs.get(name):
+                    session.cert = self.client_certs.get(name)
+
+            self.sessions[name] = session
+
+    def check_url(self, name, url):
+
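+        # get() runs in a background AsyncPoller thread and returns the
+        # check status as a single-element list.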
+        def get():
+            try:
+                r = self.sessions[name].get(url, timeout=self.timeout)
+            except Exception as e:
+                self.logger.warning("Got exception for '{}': {}".format(
+                    url, e)
+                )
+                status = self.FAIL
+            else:
+
+                expected_code = self.expected_codes.get(name, 200)
+                if r.status_code != expected_code:
+                    self.logger.warning(
+                        ("{} ({}) responded with code {} "
+                         "while {} is expected").format(name, url,
+                                                        r.status_code,
+                                                        expected_code))
+                    status = self.FAIL
+                else:
+                    self.logger.debug(
+                        "Got response from {}: '{}'".format(url, r.content))
+                    status = self.OK
+                    expected_content = self.expected_contents.get(name)
+                    if expected_content:
+                        if r.content != expected_content:
+                            status = self.FAIL
+                            self.logger.warning(
+                                'Content "{}" does not match "{}"'.format(
+                                    r.content[0:30], expected_content
+                                ))
+            return [status]
+
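+        # Start one poller thread per URL on first use; later read callbacks
+        # only collect the latest cached results.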
+        if url not in self._threads:
+            t = base.AsyncPoller(self.collectd,
+                                 get,
+                                 self.polling_interval,
+                                 url)
+            t.start()
+            self._threads[url] = t
+
+        t = self._threads[url]
+        if not t.is_alive():
+            self.logger.warning("Unexpected end of the thread {}".format(
+                t.name))
+            del self._threads[url]
+            return []
+
+        return t.results
+
+    def itermetrics(self):
+        for name, url in self.urls.items():
+            r = self.check_url(name, url)
+            if r:
+                yield {'type_instance': name, 'values': r}
+
+
+plugin = HTTPCheckPlugin(collectd, disable_check_metric=True)
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def notification_callback(notification):
+    plugin.notification_callback(notification)
+
+
+def read_callback():
+    plugin.conditional_read_callback()
+
+
+if __name__ == '__main__':
+    plugin.urls['google_ok'] = 'https://www.google.com'
+    plugin.urls['google_fail'] = 'https://www.google.com/not_found'
+    plugin.urls['no_network'] = 'https://127.0.0.2:999'
+    plugin.expected_codes['google_ok'] = 200
+    plugin.expected_codes['google_fail'] = 200
+    collectd.load_configuration(plugin)
+    plugin.read_callback()
+    import time
+    time.sleep(base.INTERVAL)
+    plugin.read_callback()
+    plugin.shutdown_callback()
+else:
+    collectd.register_config(config_callback)
+    collectd.register_notification(notification_callback)
+    collectd.register_read(read_callback, base.INTERVAL)
diff --git a/collectd/files/plugin/collectd_k8s_kubectl_get.py b/collectd/files/plugin/collectd_k8s_kubectl_get.py
new file mode 100644
index 0000000..1333120
--- /dev/null
+++ b/collectd/files/plugin/collectd_k8s_kubectl_get.py
@@ -0,0 +1,149 @@
+#!/usr/bin/python
+# Copyright 2017 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if __name__ == '__main__':
+    import collectd_fake as collectd
+else:
+    import collectd
+import json
+import time
+
+import collectd_base as base
+
+NAME = 'k8s'
+KUBECTL_BINARY = '/usr/bin/kubectl'
+INTERVAL = 30
+
+
+class K8sPlugin(base.Base):
+
+    def __init__(self, *args, **kwargs):
+        super(K8sPlugin, self).__init__(*args, **kwargs)
+        self.plugin = NAME
+        self._threads = {}
+        self.polling_interval = INTERVAL
+        self.resources = []
+        self._get_nodes = False
+
+    def shutdown_callback(self):
+        for tid, t in self._threads.items():
+            if t.is_alive():
+                self.logger.info('Waiting for {} thread to finish'.format(tid))
+                t.stop()
+                t.join()
+
+    def config_callback(self, config):
+        super(K8sPlugin, self).config_callback(config)
+        for node in config.children:
+            if node.key == 'PollingInterval':
+                self.polling_interval = int(node.values[0])
+            elif node.key == 'GetNodes':
+                if node.values[0].lower() == 'true':
+                    self._get_nodes = True
+
+    def kubectl_get(self, resource):
+
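+        # kubectl_poller() runs in a background thread and returns the items
+        # from 'kubectl get -o json <resource>'.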
+        def kubectl_poller():
+            cmd = [KUBECTL_BINARY, 'get', '-o', 'json', resource]
+            data = self.execute_to_json(cmd, shell=False, log_error=True)
+            return data.get('items', [])
+
+        if resource not in self._threads:
+            t = base.AsyncPoller(self.collectd,
+                                 kubectl_poller,
+                                 self.polling_interval,
+                                 resource)
+            t.start()
+            self._threads[resource] = t
+
+        t = self._threads[resource]
+        if not t.is_alive():
+            self.logger.warning("Unexpected end of the thread {}".format(
+                t.name))
+            del self._threads[resource]
+            return []
+
+        return t.results
+
+    @staticmethod
+    def _check_conditions(conditions, _type):
+        return all(
+            [cnd.get('status') == 'True' for cnd in conditions
+             if cnd.get('type') == _type]
+        )
+
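+    # Emit the number of ready and not-ready nodes, the total, and the
+    # percentage of nodes that are not ready.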
+    def _iter_node_metrics(self, nodes):
+        if nodes:
+            total, total_ready = (0, 0)
+            for node in nodes:
+                self.logger.debug(node.get('metadata', {}).get('name'))
+                conditions = node.get(
+                    'status', {}).get('conditions', [])
+                if self._check_conditions(conditions, _type='Ready'):
+                    total_ready += 1
+                total += 1
+            if total > 0:
+                yield {'values': (100.0 * (total - total_ready)) / total,
+                       'plugin_instance': 'nodes_percent',
+                       'meta': {'status': 'not_ready'},
+                       }
+
+            yield {'values': total_ready,
+                   'plugin_instance': 'nodes',
+                   'meta': {'status': 'ready'},
+                   }
+            yield {'values': total - total_ready,
+                   'plugin_instance': 'nodes',
+                   'meta': {'status': 'not_ready'},
+                   }
+            yield {'values': total,
+                   'plugin_instance': 'nodes_total'
+                   }
+
+    def itermetrics(self):
+        if self._get_nodes:
+            items = self.kubectl_get('nodes')
+            return self._iter_node_metrics(items)
+
+
+plugin = K8sPlugin(collectd, disable_check_metric=True)
+
+
+def init_callback():
+    plugin.restore_sigchld()
+
+
+def config_callback(conf):
+    plugin.config_callback(conf)
+
+
+def read_callback():
+    plugin.read_callback()
+
+
+if __name__ == '__main__':
+    collectd.load_configuration(plugin)
+    plugin._get_nodes = True
+    plugin.read_callback()
+    collectd.info('Sleeping for {}s'.format(INTERVAL))
+    time.sleep(INTERVAL)
+    plugin.read_callback()
+    plugin.shutdown_callback()
+else:
+    collectd.register_init(init_callback)
+    collectd.register_config(config_callback)
+    collectd.register_read(read_callback, INTERVAL)
diff --git a/collectd/files/plugin/collectd_openstack.py b/collectd/files/plugin/collectd_openstack.py
index f0ecbfc..a8a72b7 100644
--- a/collectd/files/plugin/collectd_openstack.py
+++ b/collectd/files/plugin/collectd_openstack.py
@@ -44,13 +44,14 @@
     """
     EXPIRATION_TOKEN_DELTA = datetime.timedelta(0, 30)
 
-    def __init__(self, username, password, tenant, keystone_url, timeout,
-                 logger, max_retries):
+    def __init__(self, username, password, tenant, keystone_url, region,
+                 timeout, logger, max_retries):
         self.logger = logger
         self.username = username
         self.password = password
         self.tenant_name = tenant
         self.keystone_url = keystone_url
+        self.region = region
         self.service_catalog = []
         self.tenant_id = None
         self.timeout = timeout
@@ -108,6 +109,9 @@
         self.service_catalog = []
         for item in data['access']['serviceCatalog']:
             endpoint = item['endpoints'][0]
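+            # Skip endpoints that do not belong to the configured region.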
+            if self.region and self.region != endpoint['region']:
+                continue
+
             self.service_catalog.append({
                 'name': item['name'],
                 'region': endpoint['region'],
@@ -169,6 +173,7 @@
         self.password = None
         self.tenant_name = None
         self.keystone_url = None
+        self.region = None
         self.os_client = None
         self.extra_config = {}
         self._threads = {}
@@ -287,6 +292,8 @@
                 self.tenant_name = node.values[0]
             elif node.key == 'KeystoneUrl':
                 self.keystone_url = node.values[0]
+            elif node.key == 'Region':
+                self.region = node.values[0]
             elif node.key == 'PaginationLimit':
                 self.pagination_limit = int(node.values[0])
             elif node.key == 'PollingInterval':
@@ -303,7 +310,8 @@
 
         self.os_client = OSClient(self.username, self.password,
                                   self.tenant_name, self.keystone_url,
-                                  self.timeout, self.logger, self.max_retries)
+                                  self.region, self.timeout, self.logger,
+                                  self.max_retries)
 
     def get_objects(self, project, object_name, api_version='',
                     params=None, detail=False, since=False):
@@ -368,20 +376,20 @@
 
                 _objects.extend(bulk_objs)
 
-                links = resp.get('{}_links'.format(object_name))
-                if links is None or self.pagination_limit is None:
-                    # Either the pagination is not supported or there is
-                    # no more data
-                    # In both cases, we got at this stage all the data we
-                    # can have.
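+                # Without a pagination limit, one request returns all data.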
+                if self.pagination_limit is None:
                     break
 
-                # if there is no 'next' link in the response, all data has
-                # been read.
-                if len([i for i in links if i.get('rel') == 'next']) == 0:
-                    break
+                links = resp.get('{}_links'.format(object_name), [])
+                # Glance has no '<object>_links' section, only a 'next' item
+                has_next = len(
+                    [i for i in links if i.get('rel') == 'next']) > 0 or \
+                    resp.get('next')
 
-                _opts['marker'] = bulk_objs[-1]['id']
+                if has_next:
+                    _opts['marker'] = bulk_objs[-1]['id']
+                else:
+                    # all data has been read
+                    break
 
             if not has_failure:
                 self._last_run = last_run
diff --git a/collectd/files/plugin/http_check.py b/collectd/files/plugin/http_check.py
deleted file mode 100644
index 1002928..0000000
--- a/collectd/files/plugin/http_check.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/usr/bin/python
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if __name__ == '__main__':
-    import collectd_fake as collectd
-else:
-    import collectd
-
-import collectd_base as base
-import requests
-
-NAME = 'http_check'
-
-
-class HTTPCheckPlugin(base.Base):
-
-    def __init__(self, *args, **kwargs):
-        super(HTTPCheckPlugin, self).__init__(*args, **kwargs)
-        self.plugin = NAME
-        self.session = requests.Session()
-        self.session.mount(
-            'http://',
-            requests.adapters.HTTPAdapter(max_retries=self.max_retries)
-        )
-        self.session.mount(
-            'https://',
-            requests.adapters.HTTPAdapter(max_retries=self.max_retries)
-        )
-        self.urls = {}
-        self.expected_codes = {}
-        self.expected_contents = {}
-
-    def config_callback(self, config):
-        super(HTTPCheckPlugin, self).config_callback(config)
-        for node in config.children:
-            if node.key == "Url":
-                self.urls[node.values[0]] = node.values[1]
-            elif node.key == 'ExpectedCode':
-                self.expected_codes[node.values[0]] = int(node.values[1])
-            elif node.key == 'ExpectedContent':
-                self.expected_contents[node.values[0]] = node.values[1]
-
-    def itermetrics(self):
-        for name, url in self.urls.items():
-            try:
-                r = self.session.get(url, timeout=self.timeout)
-            except Exception as e:
-                self.logger.warning("Got exception for '{}': {}".format(
-                    url, e)
-                )
-                status = self.FAIL
-            else:
-
-                expected_code = self.expected_codes.get(name, 200)
-                if r.status_code != expected_code:
-                    self.logger.warning(
-                        ("{} ({}) responded with code {} "
-                         "while {} is expected").format(name, url,
-                                                        r.status_code,
-                                                        expected_code))
-                    status = self.FAIL
-                else:
-                    self.logger.debug(
-                        "Got response from {}: '{}'".format(url, r.content))
-                    status = self.OK
-                    expected_content = self.expected_contents.get(name)
-                    if expected_content:
-                        if r.content != expected_content:
-                            status = self.FAIL
-                            self.logger.warning(
-                                'Content "{}" does not match "{}"'.format(
-                                    r.content[0:30], expected_content
-                                ))
-
-            yield {'type_instance': name, 'values': status }
-
-
-plugin = HTTPCheckPlugin(collectd, disable_check_metric=True)
-
-
-def config_callback(conf):
-    plugin.config_callback(conf)
-
-
-def notification_callback(notification):
-    plugin.notification_callback(notification)
-
-
-def read_callback():
-    plugin.conditional_read_callback()
-
-
-if __name__ == '__main__':
-    plugin.urls['google_ok'] = 'https://www.google.com'
-    plugin.urls['google_fail'] = 'https://www.google.com/not_found'
-    plugin.expected_codes['google_ok'] = 200
-    plugin.expected_codes['github_fail'] = 200
-    plugin.read_callback()
-else:
-    collectd.register_config(config_callback)
-    collectd.register_notification(notification_callback)
-    collectd.register_read(read_callback, base.INTERVAL)
diff --git a/collectd/meta/collectd.yml b/collectd/meta/collectd.yml
index c8391c0..71731ae 100644
--- a/collectd/meta/collectd.yml
+++ b/collectd/meta/collectd.yml
@@ -57,3 +57,9 @@
     plugin: python
     template: collectd/files/collectd_http_check.conf
     url: {}
+
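+# Rendered with the same template as the collectd_http_check entry above.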
+remote_plugin:
+  collectd_http_check:
+    plugin: python
+    template: collectd/files/collectd_http_check.conf
+    url: {}