Poll OpenStack resources in background
The collectd plugin spawns threads responsible for polling APIs.
Change-Id: I5d91535bc07a6af2d8f659b9381c225405dba33e
diff --git a/collectd/files/plugin/collectd_base.py b/collectd/files/plugin/collectd_base.py
index c59149f..7923274 100644
--- a/collectd/files/plugin/collectd_base.py
+++ b/collectd/files/plugin/collectd_base.py
@@ -17,6 +17,7 @@
import json
import signal
import subprocess
+import threading
import time
import traceback
@@ -271,3 +272,47 @@
if node.key == "Cluster":
self.cluster = node.values[0]
self.plugin_instance = self.cluster
+
+
+class AsyncPoller(threading.Thread):
+ """Execute an independant thread to execute a function periodically
+
+ Args:
+ collectd: used for logging
+ polling_function: a function to execute periodically
+ interval: the interval in second
+ name: (optional) the name of the thread
+ """
+
+ def __init__(self, collectd, polling_function, interval, name=None):
+ super(AsyncPoller, self).__init__(name=name)
+ self.collectd = collectd
+ self.polling_function = polling_function
+ self.interval = interval
+ self._results = None
+
+ def run(self):
+ self.collectd.info('Starting thread {}'.format(self.name))
+ while True:
+ try:
+ started_at = time.time()
+
+ self._results = self.polling_function()
+
+ tosleep = self.interval - (time.time() - started_at)
+ if tosleep > 0:
+ time.sleep(tosleep)
+ else:
+ self.collectd.warning(
+ 'Polling took more than {}s for {}'.format(
+ self.interval, self.name
+ )
+ )
+
+ except Exception as e:
+ self._results = None
+ self.collectd.error('{} fails: {}'.format(self.name, e))
+ time.sleep(10)
+
+ def get_results(self):
+ return self._results
diff --git a/collectd/files/plugin/collectd_openstack.py b/collectd/files/plugin/collectd_openstack.py
index a4ec4de..a8fa999 100644
--- a/collectd/files/plugin/collectd_openstack.py
+++ b/collectd/files/plugin/collectd_openstack.py
@@ -163,7 +163,9 @@
self.max_retries = 2
self.os_client = None
self.extra_config = {}
+ self._threads = {}
self.pagination_limit = None
+ self.polling_interval = 60
def _build_url(self, service, resource):
s = (self.get_service(service) or {})
@@ -278,6 +280,9 @@
keystone_url = node.values[0]
elif node.key == 'PaginationLimit':
self.pagination_limit = int(node.values[0])
+ elif node.key == 'PollingInterval':
+ self.polling_interval = int(node.values[0])
+
self.os_client = OSClient(username, password, tenant_name,
keystone_url, self.timeout, self.logger,
self.max_retries)
@@ -302,46 +307,74 @@
if detail:
resource = '{}/detail'.format(resource)
- url = self._build_url(project, resource)
- if not url:
- return
-
opts = {}
if self.pagination_limit:
opts['limit'] = self.pagination_limit
opts.update(params)
- objs = []
- while True:
- r = self.os_client.make_request('get', url, params=opts)
- if not r or object_name not in r.json():
- self.logger.warning('Could not find %s %s' % (project,
- object_name))
- return objs
+ def openstack_api_poller():
+ _objects = []
+ _opts = {}
+ _opts.update(opts)
+ while True:
+ r = self.get(project, resource, params=_opts)
+ if not r or object_name not in r.json():
+ if r is None:
+ err = ''
+ else:
+ err = r.text
+ self.collectd.warning('Could not find {}: {} {}'.format(
+ project, object_name, err
+ ))
+ # Avoid to provide incomplete data by reseting current
+ # set.
+ _objects = []
+ break
- resp = r.json()
- bulk_objs = resp.get(object_name)
+ resp = r.json()
+ bulk_objs = resp.get(object_name)
+ if not bulk_objs:
+ # emtpy list
+ break
- if not bulk_objs:
- break
+ _objects.extend(bulk_objs)
- objs.extend(bulk_objs)
+ links = resp.get('{}_links'.format(object_name))
+ if links is None or self.pagination_limit is None:
+ # Either the pagination is not supported or there is
+ # no more data
+ # In both cases, we got at this stage all the data we
+ # can have.
+ break
- links = resp.get('{}_links'.format(object_name))
- if links is None or self.pagination_limit is None:
- # Either the pagination is not supported or there is no more
- # data
- break
+ # if there is no 'next' link in the response, all data has
+ # been read.
+ if len([i for i in links if i.get('rel') == 'next']) == 0:
+ break
- # if there is no 'next' link in the response, all data has been
- # read.
- if len([i for i in links if i.get('rel') == 'next']) == 0:
- break
+ _opts['marker'] = bulk_objs[-1]['id']
- opts['marker'] = bulk_objs[-1]['id']
+ return _objects
- return objs
+ poller_id = '{}:{}'.format(project, resource)
+ if poller_id not in self._threads:
+ t = base.AsyncPoller(self.collectd,
+ openstack_api_poller,
+ self.polling_interval,
+ poller_id)
+ t.start()
+ self._threads[poller_id] = t
+
+ t = self._threads[poller_id]
+ if not t.is_alive():
+ self.logger.warning("Unexpected end of the thread {}".format(
+ t.name))
+ del self._threads[poller_id]
+ return []
+
+ results = t.get_results()
+ return [] if results is None else results
def count_objects_group_by(self,
list_object,