Merge "Use changes-since parameter when polling Nova server details"
diff --git a/collectd/files/plugin/collectd_base.py b/collectd/files/plugin/collectd_base.py
index ebf3120..393a585 100644
--- a/collectd/files/plugin/collectd_base.py
+++ b/collectd/files/plugin/collectd_base.py
@@ -292,14 +292,19 @@
            polling_function: a function to execute periodically
            interval: the interval in seconds
            name: (optional) the name of the thread
+           reset_on_read: (default False) if True, the results returned by
+                          polling_function() accumulate until they are read,
+                          and reading clears them.
     """
 
-    def __init__(self, collectd, polling_function, interval, name=None):
+    def __init__(self, collectd, polling_function, interval, name=None,
+                 reset_on_read=False):
         super(AsyncPoller, self).__init__(name=name)
         self.collectd = collectd
         self.polling_function = polling_function
         self.interval = interval
-        self._results = None
+        self._results = []
+        self._reset_on_read = reset_on_read
 
     def run(self):
         self.collectd.info('Starting thread {}'.format(self.name))
@@ -307,8 +312,7 @@
             try:
                 started_at = time.time()
 
-                self._results = self.polling_function()
-
+                self.results = self.polling_function()
                 tosleep = self.interval - (time.time() - started_at)
                 if tosleep > 0:
                     time.sleep(tosleep)
@@ -320,9 +324,20 @@
                     )
 
             except Exception as e:
-                self._results = None
+                self.results = []
                 self.collectd.error('{} fails: {}'.format(self.name, e))
                 time.sleep(10)
 
-    def get_results(self):
-        return self._results
+    @property
+    def results(self):
+        r = self._results
+        if self._reset_on_read:
+            self._results = []
+        return r
+
+    @results.setter
+    def results(self, value):
+        if self._reset_on_read:
+            self._results.extend(value)
+        else:
+            self._results = value
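
Note on the AsyncPoller change above: with reset_on_read=True the results setter extends an internal list and the getter drains it, so items produced between reads accumulate instead of being overwritten. A minimal standalone sketch of that accumulate-until-read behaviour (the ResultsBuffer class is illustrative only, not part of the patch):

# Illustrative sketch only (not part of the patch): the accumulate-until-read
# semantics that AsyncPoller gains with reset_on_read=True.


class ResultsBuffer(object):
    """Stand-in for the results property pair added to AsyncPoller."""

    def __init__(self, reset_on_read=False):
        self._results = []
        self._reset_on_read = reset_on_read

    @property
    def results(self):
        r = self._results
        if self._reset_on_read:
            # Reading drains the buffer so the next read only sees new data.
            self._results = []
        return r

    @results.setter
    def results(self, value):
        if self._reset_on_read:
            # Each poll appends its items instead of replacing the last batch.
            self._results.extend(value)
        else:
            self._results = value


if __name__ == '__main__':
    buf = ResultsBuffer(reset_on_read=True)
    buf.results = [{'id': 'a'}]
    buf.results = [{'id': 'b'}]              # accumulated, not overwritten
    assert [o['id'] for o in buf.results] == ['a', 'b']
    assert buf.results == []                 # drained by the previous read
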
diff --git a/collectd/files/plugin/collectd_openstack.py b/collectd/files/plugin/collectd_openstack.py
index 1aeeb36..2da8670 100644
--- a/collectd/files/plugin/collectd_openstack.py
+++ b/collectd/files/plugin/collectd_openstack.py
@@ -166,6 +166,8 @@
         self._threads = {}
         self.pagination_limit = None
         self.polling_interval = 60
+        self._last_run = None
+        self.changes_since = False
 
     def _build_url(self, service, resource):
         s = (self.get_service(service) or {})
@@ -251,7 +253,7 @@
         url = self._build_url(service, resource)
         if not url:
             return
-        self.logger.info("GET '%s'" % url)
+        self.logger.info('GET({}) {}'.format(url, params))
         return self.os_client.make_request('get', url, params=params)
 
     @property
@@ -287,7 +289,7 @@
                                   self.max_retries)
 
     def get_objects(self, project, object_name, api_version='',
-                    params=None, detail=False):
+                    params=None, detail=False, since=False):
         """ Return a list of OpenStack objects
 
             The API version is not always included in the URL endpoint
@@ -295,6 +297,7 @@
             api_version parameter to specify which version should be used.
 
         """
+        self.changes_since = since
         if params is None:
             params = {}
 
@@ -316,9 +319,18 @@
             _objects = []
             _opts = {}
             _opts.update(opts)
+
+            if self.changes_since and self._last_run:
+                _opts['changes-since'] = self._last_run.isoformat()
+
+            # Record when this run started; used as the next 'changes-since'
+            last_run = datetime.datetime.now(tz=dateutil.tz.tzutc())
+            has_failure = False
+
             while True:
                 r = self.get(project, resource, params=_opts)
                 if not r or object_name not in r.json():
+                    has_failure = True
                     if r is None:
                         err = ''
                     else:
@@ -354,6 +366,9 @@
 
                 _opts['marker'] = bulk_objs[-1]['id']
 
+            if not has_failure:
+                self._last_run = last_run
+
             return _objects
 
         poller_id = '{}:{}'.format(project, resource)
@@ -361,7 +376,7 @@
             t = base.AsyncPoller(self.collectd,
                                  openstack_api_poller,
                                  self.polling_interval,
-                                 poller_id)
+                                 poller_id, self.changes_since)
             t.start()
             self._threads[poller_id] = t
 
@@ -372,8 +387,7 @@
             del self._threads[poller_id]
             return []
 
-        results = t.get_results()
-        return [] if results is None else results
+        return t.results
 
     def count_objects_group_by(self,
                                list_object,
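
Note on the get_objects() change above: the poller records the UTC time at which a run starts and, if every request in that run succeeds, sends it back as the 'changes-since' filter on the next poll, so only servers modified since the previous run are returned. The sketch below (build_opts() is a hypothetical helper, not part of the patch) shows the shape of the resulting query parameters; it assumes python-dateutil is available and that the datetime/dateutil imports are added at the top of collectd_openstack.py in a hunk not shown here.

# Illustrative sketch only: how the 'changes-since' filter is derived from the
# previous run. build_opts() is a hypothetical helper; it assumes the module
# imports datetime and dateutil.tz (that import hunk is not shown above).
import datetime

import dateutil.tz


def build_opts(last_run, limit=500):
    opts = {'all_tenants': 1, 'limit': limit}
    if last_run is not None:
        # Nova accepts an ISO 8601 timestamp for the changes-since filter.
        opts['changes-since'] = last_run.isoformat()
    return opts


first_poll = build_opts(None)       # no filter: full server listing
started_at = datetime.datetime.now(tz=dateutil.tz.tzutc())
next_poll = build_opts(started_at)  # only servers changed since started_at
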
diff --git a/collectd/files/plugin/openstack_nova.py b/collectd/files/plugin/openstack_nova.py
index 4f8f1a8..084e7b1 100644
--- a/collectd/files/plugin/openstack_nova.py
+++ b/collectd/files/plugin/openstack_nova.py
@@ -17,6 +17,8 @@
 import collectd
 
 import collectd_openstack as openstack
+from itertools import groupby
+
 
 PLUGIN_NAME = 'nova'
 INTERVAL = openstack.INTERVAL
@@ -32,22 +34,33 @@
         self.plugin = PLUGIN_NAME
         self.interval = INTERVAL
         self.pagination_limit = 500
+        self._cache = {}
 
     def itermetrics(self):
-        servers_details = self.get_objects('nova', 'servers',
-                                           params={'all_tenants': 1},
-                                           detail=True)
+        server_details = self.get_objects('nova', 'servers',
+                                          params={'all_tenants': 1},
+                                          detail=True, since=True)
 
-        def groupby(d):
-            return d.get('status', 'unknown').lower()
+        for server in server_details:
+            _id = server.get('id')
+            status = server.get('status', 'unknown').lower()
+            if status == 'deleted':
+                try:
+                    self.logger.debug(
+                        'remove deleted instance {} from cache'.format(_id))
+                    del self._cache[_id]
+                except KeyError:
+                    self.logger.warning(
+                        'deleted instance {} not found in cache'.format(_id))
+            else:
+                self._cache[_id] = status
 
-        status = self.count_objects_group_by(servers_details,
-                                             group_by_func=groupby)
-        for s, nb in status.iteritems():
+        servers = sorted(self._cache.values())
+        for status, g in groupby(servers):
             yield {
                 'plugin_instance': 'instances',
-                'values': nb,
-                'type_instance': s,
+                'values': len(list(g)),
+                'type_instance': status,
             }
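
Note on the openstack_nova.py change above: because a changes-since query also reports servers deleted since that timestamp, the plugin keeps a {server_id: status} cache, drops entries whose status is 'deleted', and counts the remaining entries per status with itertools.groupby (which requires sorted input). A runnable sketch of that counting logic (count_by_status() is an illustrative helper, not part of the plugin):

# Illustrative sketch only: the per-status counting done by itermetrics().
# count_by_status() is a hypothetical helper, not part of the plugin.
from itertools import groupby


def count_by_status(cache, updates):
    for server in updates:
        _id = server.get('id')
        status = server.get('status', 'unknown').lower()
        if status == 'deleted':
            # changes-since keeps reporting deleted servers; drop them.
            cache.pop(_id, None)
        else:
            cache[_id] = status
    # groupby() only groups adjacent items, hence the sort.
    return dict((s, len(list(g))) for s, g in groupby(sorted(cache.values())))


cache = {}
print(count_by_status(cache, [{'id': '1', 'status': 'ACTIVE'},
                              {'id': '2', 'status': 'ERROR'}]))
# -> {'active': 1, 'error': 1}
print(count_by_status(cache, [{'id': '2', 'status': 'DELETED'}]))
# -> {'active': 1}
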