Merge "Collect Elasticsearch performance metrics"
diff --git a/collectd/files/plugin/collectd_elasticsearch_node.py b/collectd/files/plugin/collectd_elasticsearch_node.py
index 1ce23fa..2cfc7af 100644
--- a/collectd/files/plugin/collectd_elasticsearch_node.py
+++ b/collectd/files/plugin/collectd_elasticsearch_node.py
@@ -27,21 +27,120 @@
def __init__(self, *args, **kwargs):
super(ElasticsearchNodePlugin, self).__init__(*args, **kwargs)
self.plugin = NAME
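+        # Counters from the previous poll, keyed by metric name, used
+        # by _get_latency() to compute per-interval averages.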
+ self._previous = {}
+
+ @staticmethod
+ def _metric(name, values, meta=None):
+ return {'type_instance': name, 'values': values, 'meta': meta or {}}
+
+ def _get_latency(self, name, count, time):
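+        # Return the average time per operation since the previous
+        # poll, computed from cumulative count and time counters.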
+ cname = '{}_count'.format(name)
+ tname = '{}_time'.format(name)
+ prev_count = self._previous.get(cname)
+ prev_time = self._previous.get(tname)
+ self._previous[cname] = count
+ self._previous[tname] = time
+        # A previous value of 0 is valid: only the first poll, when
+        # no previous data exists, returns None.
+        if prev_count is not None and prev_time is not None:
+            diff_count = count - prev_count
+            diff_time = time - prev_time
+            # Use true division so sub-millisecond latencies are not
+            # truncated to 0 under Python 2.
+            return float(diff_time) / diff_count if diff_count > 0 else 0
+        return None
+
def itermetrics(self):
stats = self.query_api('_nodes/_local/stats').get(
'nodes', {}).values()[0]
- yield {
- 'type_instance': 'documents',
- 'values': stats['indices']['docs']['count']
- }
- yield {
- 'type_instance': 'documents_deleted',
- 'values': stats['indices']['docs']['deleted']
- }
- # TODO: collectd more metrics
- # See https://www.elastic.co/guide/en/elasticsearch/guide/current/
- # _monitoring_individual_nodes.html
+ indices = stats['indices']
+ yield self._metric('documents', indices['docs']['count'])
+ yield self._metric('documents_deleted', indices['docs']['deleted'])
+ yield self._metric(
+ 'indexing_current', indices['indexing']['index_current'])
+ yield self._metric(
+ 'indexing_failed', indices['indexing']['index_failed'])
+ indexing_latency = self._get_latency(
+ 'indexing', indices['indexing']['index_total'],
+ indices['indexing']['index_time_in_millis'])
+        if indexing_latency is not None:
+ yield self._metric('indexing_latency', indexing_latency)
+ yield self._metric('store_size', indices['store']['size_in_bytes'])
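+
+        # File descriptor usage as a percentage of the configured
+        # limit; reported as 0 when the limit is unknown.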
+ fd_open = 0
+ if stats['process']['max_file_descriptors'] > 0:
+            fd_open = (100.0 * stats['process']['open_file_descriptors'] /
+                       stats['process']['max_file_descriptors'])
+ yield self._metric('fd_open_percent', fd_open)
+
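+        # Queue lengths and rejected/completed task counts for the
+        # main thread pools.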
+ thread_pools = stats['thread_pool']
+ for pool in ('bulk', 'flush', 'search', 'index', 'get'):
+ yield self._metric('thread_pool_queue',
+ thread_pools[pool]['queue'], {'pool': pool})
+ yield self._metric('thread_pool_rejected',
+ thread_pools[pool]['rejected'], {'pool': pool})
+ yield self._metric('thread_pool_completed',
+ thread_pools[pool]['completed'], {'pool': pool})
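+
+        # JVM heap usage (overall and per memory pool) and garbage
+        # collection counters (per collector).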
+ mem = stats['jvm']['mem']
+ yield self._metric('jvm_heap_max', mem['heap_max_in_bytes'])
+ yield self._metric('jvm_heap_used_percent', mem['heap_used_percent'])
+ yield self._metric('jvm_heap_used', mem['heap_used_in_bytes'])
+ for pool, stat in mem['pools'].items():
+ yield self._metric(
+ 'jvm_heap_pool', stat['used_in_bytes'], {'pool': pool})
+ gc = stats['jvm']['gc']
+ for pool, stat in gc['collectors'].items():
+ yield self._metric('jvm_gc_count', stat['collection_count'],
+ {'pool': pool})
+ yield self._metric('jvm_gc_time',
+ stat['collection_time_in_millis'],
+ {'pool': pool})
+
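+        # Search statistics: the query phase identifies matching
+        # documents on each shard, the fetch phase retrieves them.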
+ search = indices['search']
+ for phase in ('query', 'fetch'):
+ yield self._metric('{}_current'.format(phase),
+ search['{}_current'.format(phase)])
+ latency = self._get_latency(
+ phase,
+ search['{}_total'.format(phase)],
+ search['{}_time_in_millis'.format(phase)])
+ if latency is not None:
+ yield self._metric('{}_latency'.format(phase), latency)
+ yield self._metric('query_count', search['query_total'])
+
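+        # Cache sizes and eviction counts help spot memory pressure.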
+ query = indices['query_cache']
+ yield self._metric('query_cache_size', query['memory_size_in_bytes'])
+ yield self._metric('query_cache_evictions', query['evictions'])
+
+ fielddata = indices['fielddata']
+ yield self._metric('fielddata_size', fielddata['memory_size_in_bytes'])
+ yield self._metric('fielddata_evictions', fielddata['evictions'])
+
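+        # Cumulative totals and derived latencies for merge, flush
+        # and refresh operations.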
+ for operation in ('merges', 'flush', 'refresh'):
+ yield self._metric(operation, indices[operation]['total'])
+ latency = self._get_latency(
+ operation,
+ indices[operation]['total'],
+ indices[operation]['total_time_in_millis'])
+ if latency is not None:
+ yield self._metric('{}_latency'.format(operation), latency)
plugin = ElasticsearchNodePlugin(collectd)