Poll OpenStack resources in background

The collectd plugin spawns threads responsible for polling APIs.

Change-Id: I5d91535bc07a6af2d8f659b9381c225405dba33e
diff --git a/collectd/files/plugin/collectd_base.py b/collectd/files/plugin/collectd_base.py
index c59149f..7923274 100644
--- a/collectd/files/plugin/collectd_base.py
+++ b/collectd/files/plugin/collectd_base.py
@@ -17,6 +17,7 @@
 import json
 import signal
 import subprocess
+import threading
 import time
 import traceback
 
@@ -271,3 +272,47 @@
             if node.key == "Cluster":
                 self.cluster = node.values[0]
         self.plugin_instance = self.cluster
+
+
+class AsyncPoller(threading.Thread):
+    """Execute an independant thread to execute a function periodically
+
+       Args:
+           collectd: used for logging
+           polling_function: a function to execute periodically
+           interval: the interval in second
+           name: (optional) the name of the thread
+    """
+
+    def __init__(self, collectd, polling_function, interval, name=None):
+        super(AsyncPoller, self).__init__(name=name)
+        self.collectd = collectd
+        self.polling_function = polling_function
+        self.interval = interval
+        self._results = None
+
+    def run(self):
+        self.collectd.info('Starting thread {}'.format(self.name))
+        while True:
+            try:
+                started_at = time.time()
+
+                self._results = self.polling_function()
+
+                tosleep = self.interval - (time.time() - started_at)
+                if tosleep > 0:
+                    time.sleep(tosleep)
+                else:
+                    self.collectd.warning(
+                        'Polling took more than {}s for {}'.format(
+                            self.interval, self.name
+                        )
+                    )
+
+            except Exception as e:
+                self._results = None
+                self.collectd.error('{} fails: {}'.format(self.name, e))
+                time.sleep(10)
+
+    def get_results(self):
+        return self._results
diff --git a/collectd/files/plugin/collectd_openstack.py b/collectd/files/plugin/collectd_openstack.py
index a4ec4de..a8fa999 100644
--- a/collectd/files/plugin/collectd_openstack.py
+++ b/collectd/files/plugin/collectd_openstack.py
@@ -163,7 +163,9 @@
         self.max_retries = 2
         self.os_client = None
         self.extra_config = {}
+        self._threads = {}
         self.pagination_limit = None
+        self.polling_interval = 60
 
     def _build_url(self, service, resource):
         s = (self.get_service(service) or {})
@@ -278,6 +280,9 @@
                 keystone_url = node.values[0]
             elif node.key == 'PaginationLimit':
                 self.pagination_limit = int(node.values[0])
+            elif node.key == 'PollingInterval':
+                self.polling_interval = int(node.values[0])
+
         self.os_client = OSClient(username, password, tenant_name,
                                   keystone_url, self.timeout, self.logger,
                                   self.max_retries)
@@ -302,46 +307,74 @@
         if detail:
             resource = '{}/detail'.format(resource)
 
-        url = self._build_url(project, resource)
-        if not url:
-            return
-
         opts = {}
         if self.pagination_limit:
             opts['limit'] = self.pagination_limit
 
         opts.update(params)
-        objs = []
 
-        while True:
-            r = self.os_client.make_request('get', url, params=opts)
-            if not r or object_name not in r.json():
-                self.logger.warning('Could not find %s %s' % (project,
-                                                              object_name))
-                return objs
+        def openstack_api_poller():
+            _objects = []
+            _opts = {}
+            _opts.update(opts)
+            while True:
+                r = self.get(project, resource, params=_opts)
+                if not r or object_name not in r.json():
+                    if r is None:
+                        err = ''
+                    else:
+                        err = r.text
+                    self.collectd.warning('Could not find {}: {} {}'.format(
+                        project, object_name, err
+                    ))
+                    # Avoid to provide incomplete data by reseting current
+                    # set.
+                    _objects = []
+                    break
 
-            resp = r.json()
-            bulk_objs = resp.get(object_name)
+                resp = r.json()
+                bulk_objs = resp.get(object_name)
+                if not bulk_objs:
+                    # emtpy list
+                    break
 
-            if not bulk_objs:
-                break
+                _objects.extend(bulk_objs)
 
-            objs.extend(bulk_objs)
+                links = resp.get('{}_links'.format(object_name))
+                if links is None or self.pagination_limit is None:
+                    # Either the pagination is not supported or there is
+                    # no more data
+                    # In both cases, we got at this stage all the data we
+                    # can have.
+                    break
 
-            links = resp.get('{}_links'.format(object_name))
-            if links is None or self.pagination_limit is None:
-                # Either the pagination is not supported or there is no more
-                # data
-                break
+                # if there is no 'next' link in the response, all data has
+                # been read.
+                if len([i for i in links if i.get('rel') == 'next']) == 0:
+                    break
 
-            # if there is no 'next' link in the response, all data has been
-            # read.
-            if len([i for i in links if i.get('rel') == 'next']) == 0:
-                break
+                _opts['marker'] = bulk_objs[-1]['id']
 
-            opts['marker'] = bulk_objs[-1]['id']
+            return _objects
 
-        return objs
+        poller_id = '{}:{}'.format(project, resource)
+        if poller_id not in self._threads:
+            t = base.AsyncPoller(self.collectd,
+                                 openstack_api_poller,
+                                 self.polling_interval,
+                                 poller_id)
+            t.start()
+            self._threads[poller_id] = t
+
+        t = self._threads[poller_id]
+        if not t.is_alive():
+            self.logger.warning("Unexpected end of the thread {}".format(
+                t.name))
+            del self._threads[poller_id]
+            return []
+
+        results = t.get_results()
+        return [] if results is None else results
 
     def count_objects_group_by(self,
                                list_object,