#!/usr/bin/python
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
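
# When run from the command line there is no collectd daemon to import from,
# so fall back to collectd_fake, a stand-in exposing the same interface.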
if __name__ == '__main__':
import collectd_fake as collectd
else:
import collectd
import collectd_elasticsearch_base as base

NAME = 'elasticsearch'


class ElasticsearchNodePlugin(base.ElasticsearchBase):
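    """Collect node-level metrics from the local Elasticsearch instance."""
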
def __init__(self, *args, **kwargs):
super(ElasticsearchNodePlugin, self).__init__(*args, **kwargs)
self.plugin = NAME
        # Last raw counter values seen, used to derive per-interval latency.
        self._previous = {}

    @staticmethod
def _metric(name, values, meta=None):
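        """Build a metric dict (collectd type_instance, values, meta)."""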
return {'type_instance': name, 'values': values, 'meta': meta or {}}

    def _get_latency(self, name, count, time):
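        """Return the average time per operation since the previous read.

        Both arguments are cumulative counters; the first call only seeds
        the cache and returns None.
        """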
cname = '{}_count'.format(name)
tname = '{}_time'.format(name)
prev_count = self._previous.get(cname)
prev_time = self._previous.get(tname)
self._previous[cname] = count
self._previous[tname] = time
        if prev_count is not None and prev_time is not None:
            diff_count = count - prev_count
            diff_time = time - prev_time
            # Force float division so integer millisecond counters don't
            # truncate the latency under Python 2.
            return diff_time / float(diff_count) if diff_count > 0 else 0

    def itermetrics(self):
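        """Yield metrics read from the node's _nodes/_local/stats API."""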
        nodes = self.query_api('_nodes/_local/stats').get('nodes', {})
        # '_local' returns a single entry keyed by node ID; list() keeps the
        # lookup working on Python 3, where dict views aren't indexable.
        stats = list(nodes.values())[0]
indices = stats['indices']
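
        # Document counts and indexing activity for the node's indices.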
yield self._metric('documents', indices['docs']['count'])
yield self._metric('documents_deleted', indices['docs']['deleted'])
yield self._metric(
'indexing_current', indices['indexing']['index_current'])
yield self._metric(
'indexing_failed', indices['indexing']['index_failed'])
indexing_latency = self._get_latency(
'indexing', indices['indexing']['index_total'],
indices['indexing']['index_time_in_millis'])
        if indexing_latency is not None:
yield self._metric('indexing_latency', indexing_latency)
yield self._metric('store_size', indices['store']['size_in_bytes'])
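
        # Open file descriptors as a percentage of the configured maximum.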
fd_open = 0
if stats['process']['max_file_descriptors'] > 0:
fd_open = 100.0 * stats['process']['open_file_descriptors'] \
/ stats['process']['max_file_descriptors']
yield self._metric('fd_open_percent', fd_open)
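
        # Queue, rejection and completion counters for key thread pools.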
thread_pools = stats['thread_pool']
for pool in ('bulk', 'flush', 'search', 'index', 'get'):
yield self._metric('thread_pool_queue',
thread_pools[pool]['queue'], {'pool': pool})
yield self._metric('thread_pool_rejected',
thread_pools[pool]['rejected'], {'pool': pool})
yield self._metric('thread_pool_completed',
thread_pools[pool]['completed'], {'pool': pool})
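
        # JVM heap usage, overall and per memory pool.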
mem = stats['jvm']['mem']
yield self._metric('jvm_heap_max', mem['heap_max_in_bytes'])
yield self._metric('jvm_heap_used_percent', mem['heap_used_percent'])
yield self._metric('jvm_heap_used', mem['heap_used_in_bytes'])
for pool, stat in mem['pools'].items():
yield self._metric(
'jvm_heap_pool', stat['used_in_bytes'], {'pool': pool})
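
        # Garbage collection counts and cumulative times per collector.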
gc = stats['jvm']['gc']
for pool, stat in gc['collectors'].items():
yield self._metric('jvm_gc_count', stat['collection_count'],
{'pool': pool})
yield self._metric('jvm_gc_time',
stat['collection_time_in_millis'],
{'pool': pool})
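
        # Search activity: in-flight requests and derived per-phase latency.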
search = indices['search']
for phase in ('query', 'fetch'):
yield self._metric('{}_current'.format(phase),
search['{}_current'.format(phase)])
latency = self._get_latency(
phase,
search['{}_total'.format(phase)],
search['{}_time_in_millis'.format(phase)])
if latency is not None:
yield self._metric('{}_latency'.format(phase), latency)
yield self._metric('query_count', search['query_total'])
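
        # Query cache and fielddata memory usage plus evictions.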
query = indices['query_cache']
yield self._metric('query_cache_size', query['memory_size_in_bytes'])
yield self._metric('query_cache_evictions', query['evictions'])
fielddata = indices['fielddata']
yield self._metric('fielddata_size', fielddata['memory_size_in_bytes'])
yield self._metric('fielddata_evictions', fielddata['evictions'])
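
        # Totals and derived latencies for merge, flush and refresh.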
for operation in ('merges', 'flush', 'refresh'):
yield self._metric(operation, indices[operation]['total'])
latency = self._get_latency(
operation,
indices[operation]['total'],
indices[operation]['total_time_in_millis'])
if latency is not None:
yield self._metric('{}_latency'.format(operation), latency)


plugin = ElasticsearchNodePlugin(collectd)


def config_callback(conf):
    plugin.config_callback(conf)


def read_callback():
    plugin.read_callback()


if __name__ == '__main__':
    # Standalone run against collectd_fake: load the configuration and
    # perform a single read pass.
    collectd.load_configuration(plugin)
    plugin.read_callback()
else:
    # Running inside collectd: hook the config and read callback cycle.
    collectd.register_config(config_callback)
    collectd.register_read(read_callback)