Merge "Collect Elasticsearch performance metrics"
diff --git a/collectd/files/plugin/collectd_elasticsearch_node.py b/collectd/files/plugin/collectd_elasticsearch_node.py
index 1ce23fa..2cfc7af 100644
--- a/collectd/files/plugin/collectd_elasticsearch_node.py
+++ b/collectd/files/plugin/collectd_elasticsearch_node.py
@@ -27,21 +27,97 @@
     def __init__(self, *args, **kwargs):
         super(ElasticsearchNodePlugin, self).__init__(*args, **kwargs)
         self.plugin = NAME
+        self._previous = {}
+
+    @staticmethod
+    def _metric(name, values, meta=None):
+        return {'type_instance': name, 'values': values, 'meta': meta or {}}
+
+    def _get_latency(self, name, count, time):
+        cname = '{}_count'.format(name)
+        tname = '{}_time'.format(name)
+        prev_count = self._previous.get(cname)
+        prev_time = self._previous.get(tname)
+        self._previous[cname] = count
+        self._previous[tname] = time
+        if prev_count and prev_time:
+            diff_count = count - prev_count
+            diff_time = time - prev_time
+            return diff_time / diff_count if diff_count > 0 else 0
 
     def itermetrics(self):
         stats = self.query_api('_nodes/_local/stats').get(
             'nodes', {}).values()[0]
-        yield {
-            'type_instance': 'documents',
-            'values': stats['indices']['docs']['count']
-        }
-        yield {
-            'type_instance': 'documents_deleted',
-            'values': stats['indices']['docs']['deleted']
-        }
-        # TODO: collectd more metrics
-        # See https://www.elastic.co/guide/en/elasticsearch/guide/current/
-        # _monitoring_individual_nodes.html
+        indices = stats['indices']
+        yield self._metric('documents', indices['docs']['count'])
+        yield self._metric('documents_deleted', indices['docs']['deleted'])
+        yield self._metric(
+            'indexing_current', indices['indexing']['index_current'])
+        yield self._metric(
+            'indexing_failed', indices['indexing']['index_failed'])
+        indexing_latency = self._get_latency(
+            'indexing', indices['indexing']['index_total'],
+            indices['indexing']['index_time_in_millis'])
+        if indexing_latency:
+            yield self._metric('indexing_latency', indexing_latency)
+        yield self._metric('store_size', indices['store']['size_in_bytes'])
+        fd_open = 0
+        if stats['process']['max_file_descriptors'] > 0:
+            fd_open = 100.0 * stats['process']['open_file_descriptors'] \
+                / stats['process']['max_file_descriptors']
+        yield self._metric('fd_open_percent', fd_open)
+
+        thread_pools = stats['thread_pool']
+        for pool in ('bulk', 'flush', 'search', 'index', 'get'):
+            yield self._metric('thread_pool_queue',
+                               thread_pools[pool]['queue'], {'pool': pool})
+            yield self._metric('thread_pool_rejected',
+                               thread_pools[pool]['rejected'], {'pool': pool})
+            yield self._metric('thread_pool_completed',
+                               thread_pools[pool]['completed'], {'pool': pool})
+        mem = stats['jvm']['mem']
+        yield self._metric('jvm_heap_max', mem['heap_max_in_bytes'])
+        yield self._metric('jvm_heap_used_percent', mem['heap_used_percent'])
+        yield self._metric('jvm_heap_used', mem['heap_used_in_bytes'])
+        for pool, stat in mem['pools'].items():
+            yield self._metric(
+                'jvm_heap_pool', stat['used_in_bytes'], {'pool': pool})
+        gc = stats['jvm']['gc']
+        for pool, stat in gc['collectors'].items():
+            yield self._metric('jvm_gc_count', stat['collection_count'],
+                               {'pool': pool})
+            yield self._metric('jvm_gc_time',
+                               stat['collection_time_in_millis'],
+                               {'pool': pool})
+
+        search = indices['search']
+        for phase in ('query', 'fetch'):
+            yield self._metric('{}_current'.format(phase),
+                               search['{}_current'.format(phase)])
+            latency = self._get_latency(
+                phase,
+                search['{}_total'.format(phase)],
+                search['{}_time_in_millis'.format(phase)])
+            if latency is not None:
+                yield self._metric('{}_latency'.format(phase), latency)
+        yield self._metric('query_count', search['query_total'])
+
+        query = indices['query_cache']
+        yield self._metric('query_cache_size', query['memory_size_in_bytes'])
+        yield self._metric('query_cache_evictions', query['evictions'])
+
+        fielddata = indices['fielddata']
+        yield self._metric('fielddata_size', fielddata['memory_size_in_bytes'])
+        yield self._metric('fielddata_evictions', fielddata['evictions'])
+
+        for operation in ('merges', 'flush', 'refresh'):
+            yield self._metric(operation, indices[operation]['total'])
+            latency = self._get_latency(
+                operation,
+                indices[operation]['total'],
+                indices[operation]['total_time_in_millis'])
+            if latency is not None:
+                yield self._metric('{}_latency'.format(operation), latency)
 
 
 plugin = ElasticsearchNodePlugin(collectd)