Merge changes from topic 'generate-consistent-metrics'
* changes:
Simplify collectd.lua decoder plugin
Enhance the collectd decoder for generic metrics
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 1d44349..7978bd2 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -45,21 +45,6 @@
['grafana-server'] = true,
}
--- The following table keeps a list of metrics from plugin where the
--- Fields[hostname] shouldn't be set by default.
-local hostname_free = {
- ceph_mon = true,
- ceph_pool = true,
- check_openstack_api = true,
- cinder = true,
- glance = true,
- hypervisor_stats = true,
- keystone = true,
- neutron = true,
- nova = true,
- pacemaker = true,
-}
-
-- this is needed for the libvirt metrics because in that case, collectd sends
-- the instance's ID instead of the hostname in the 'host' attribute
local hostname = read_config('hostname') or error('hostname must be specified')
@@ -83,6 +68,7 @@
end
local metric_source = sample['plugin']
+ local meta = sample['meta'] or {}
for i, value in ipairs(sample['values']) do
local skip_it = false
@@ -107,25 +93,23 @@
}
}
- -- Check if Fields[hostname] should be added or not to the metric message
- if not hostname_free[metric_source] then
- msg['Fields']['hostname'] = sample['host']
- end
-
-- Normalize metric name, unfortunately collectd plugins aren't
-- always consistent on metric namespaces so we need a few if/else
-- statements to cover all cases.
- if sample['meta'] and sample['meta']['service_check'] then
- msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
- msg['Fields']['details'] = sample['meta']['failure']
- if sample['meta']['local_check'] then
+ if meta['service_check'] then
+ msg['Fields']['name'] = meta['service_check'] .. sep .. 'check'
+ msg['Fields']['details'] = meta['failure']
+ if meta['local_check'] then
-- if the check is local to the node, add the hostname
msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
elseif metric_source == 'memory' or metric_source == 'contextswitch' or
metric_source == 'entropy' or metric_source == 'load' or
metric_source == 'swap' or metric_source == 'uptime' then
msg['Fields']['name'] = metric_name
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'df' then
local entity
if sample['type'] == 'df_inodes' then
@@ -148,6 +132,8 @@
msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
msg['Fields']['fs'] = mount
table.insert(msg['Fields']['tag_fields'], 'fs')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'disk' then
if sample['type'] == 'disk_io_time' then
msg['Fields']['name'] = 'disk' .. sep .. sample['dsnames'][i]
@@ -156,10 +142,14 @@
end
msg['Fields']['device'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'device')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'cpu' then
msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
msg['Fields']['cpu_number'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'netlink' then
local netlink_metric = sample['type']
if netlink_metric == 'if_rx_errors' then
@@ -179,6 +169,8 @@
msg['Fields']['name'] = netlink_metric
msg['Fields']['interface'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'interface')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'processes' then
if processes_map[sample['type']] then
-- metrics related to a specific process
@@ -214,8 +206,12 @@
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'mysql' then
if sample['type'] == 'threads' then
msg['Fields']['name'] = 'mysql_' .. metric_name
@@ -230,6 +226,8 @@
else
msg['Fields']['name'] = metric_name
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'ntpd' then
if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
@@ -238,15 +236,23 @@
msg['Fields']['server'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'server')
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'check_openstack_api' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
+ --
-- For OpenStack API metrics, plugin_instance = <service name>
msg['Fields']['name'] = 'openstack_check_api'
msg['Fields']['service'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
- if sample['meta'] then
- msg['Fields']['os_region'] = sample['meta']['region']
- end
+ msg['Fields']['os_region'] = meta['region']
elseif metric_source == 'hypervisor_stats' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
+ --
-- Metrics from the OpenStack hypervisor metrics where
-- type_instance = <metric name> which can end by _MB or _GB
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
@@ -258,34 +264,41 @@
else
msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
end
- if sample['meta'] and sample['meta']['host'] then
- msg['Fields']['hostname'] = sample['meta']['host']
+ if meta['host'] then
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
- if sample['meta'] and sample['meta']['aggregate'] then
- msg['Fields']['aggregate'] = sample['meta']['aggregate']
+ if meta['aggregate'] then
+ msg['Fields']['aggregate'] = meta['aggregate']
table.insert(msg['Fields']['tag_fields'], 'aggregate')
end
- if sample['meta'] and sample['meta']['aggregate_id'] then
- msg['Fields']['aggregate_id'] = sample['meta']['aggregate_id']
+ if meta['aggregate_id'] then
+ msg['Fields']['aggregate_id'] = meta['aggregate_id']
table.insert(msg['Fields']['tag_fields'], 'aggregate_id')
end
elseif metric_source == 'rabbitmq_info' then
msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
- if sample['meta'] and sample['meta']['queue'] then
- msg['Fields']['queue'] = sample['meta']['queue']
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
+ if meta['queue'] then
+ msg['Fields']['queue'] = meta['queue']
table.insert(msg['Fields']['tag_fields'], 'queue')
end
elseif metric_source == 'nova' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['plugin_instance'] == 'nova_services' or
sample['plugin_instance'] == 'nova_services_percent' or
sample['plugin_instance'] == 'nova_service' then
msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
- msg['Fields']['service'] = sample['meta']['service']
- msg['Fields']['state'] = sample['meta']['state']
+ msg['Fields']['service'] = meta['service']
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'nova_service' then
- msg['Fields']['hostname'] = sample['meta']['host']
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -293,16 +306,20 @@
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'cinder' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['plugin_instance'] == 'cinder_services' or
sample['plugin_instance'] == 'cinder_services_percent' or
sample['plugin_instance'] == 'cinder_service' then
msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
- msg['Fields']['service'] = sample['meta']['service']
- msg['Fields']['state'] = sample['meta']['state']
+ msg['Fields']['service'] = meta['service']
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'cinder_service' then
- msg['Fields']['hostname'] = sample['meta']['host']
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -310,18 +327,27 @@
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'glance' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
- msg['Fields']['state'] = sample['meta']['status']
- msg['Fields']['visibility'] = sample['meta']['visibility']
+ msg['Fields']['state'] = meta['status']
+ msg['Fields']['visibility'] = meta['visibility']
table.insert(msg['Fields']['tag_fields'], 'state')
table.insert(msg['Fields']['tag_fields'], 'visibility')
elseif metric_source == 'keystone' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
- if sample['meta']['state'] then
- msg['Fields']['state'] = sample['meta']['state']
+ if meta['state'] then
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'neutron' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
skip_it = true
elseif sample['type_instance'] == 'subnets' then
@@ -330,12 +356,13 @@
sample['type_instance'] == 'neutron_agents_percent' or
sample['type_instance'] == 'neutron_agent' then
msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
- msg['Fields']['service'] = sample['meta']['service']
- msg['Fields']['state'] = sample['meta']['state']
+ msg['Fields']['service'] = meta['service']
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['type_instance'] == 'neutron_agent' then
- msg['Fields']['hostname'] = sample['meta']['host']
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
elseif string.match(sample['type_instance'], '^ports') then
local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
@@ -352,28 +379,32 @@
end
elseif metric_source == 'memcached' then
msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'haproxy' then
msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
- if sample['meta'] then
- if sample['meta']['backend'] then
- msg['Fields']['backend'] = sample['meta']['backend']
- table.insert(msg['Fields']['tag_fields'], 'backend')
- if sample['meta']['state'] then
- msg['Fields']['state'] = sample['meta']['state']
- table.insert(msg['Fields']['tag_fields'], 'state')
- end
- if sample['meta']['server'] then
- msg['Fields']['server'] = sample['meta']['server']
- table.insert(msg['Fields']['tag_fields'], 'server')
- end
- elseif sample['meta']['frontend'] then
- msg['Fields']['frontend'] = sample['meta']['frontend']
- table.insert(msg['Fields']['tag_fields'], 'frontend')
+ if meta['backend'] then
+ msg['Fields']['backend'] = meta['backend']
+ table.insert(msg['Fields']['tag_fields'], 'backend')
+ if meta['state'] then
+ msg['Fields']['state'] = meta['state']
+ table.insert(msg['Fields']['tag_fields'], 'state')
end
+ if meta['server'] then
+ msg['Fields']['server'] = meta['server']
+ table.insert(msg['Fields']['tag_fields'], 'server')
+ end
+ elseif meta['frontend'] then
+ msg['Fields']['frontend'] = meta['frontend']
+ table.insert(msg['Fields']['tag_fields'], 'frontend')
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'apache' then
metric_name = string.gsub(metric_name, 'apache_', '')
msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'ceph_osd_perf' then
msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
@@ -381,6 +412,8 @@
msg['Fields']['osd'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'cluster')
table.insert(msg['Fields']['tag_fields'], 'osd')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source:match('^ceph') then
msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
if sample['dsnames'][i] ~= 'value' then
@@ -405,27 +438,31 @@
end
end
elseif metric_source == 'pacemaker' then
- if sample['meta'] and sample['meta']['host'] then
- msg['Fields']['hostname'] = sample['meta']['host']
+ if meta['host'] then
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
-- add dimension fields
for _, v in ipairs({'status', 'resource'}) do
- if sample['meta'] and sample['meta'][v] then
- msg['Fields'][v] = sample['meta'][v]
+ if meta[v] then
+ msg['Fields'][v] = meta[v]
table.insert(msg['Fields']['tag_fields'], v)
end
end
elseif metric_source == 'users' then
-- 'users' is a reserved name for InfluxDB v0.9
msg['Fields']['name'] = 'logged_users'
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'libvirt' then
-- collectd sends the instance's ID in the 'host' field
msg['Fields']['instance_id'] = sample['host']
table.insert(msg['Fields']['tag_fields'], 'instance_id')
msg['Fields']['hostname'] = hostname
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
msg['Hostname'] = hostname
if string.match(sample['type'], '^disk_') then
@@ -447,17 +484,30 @@
end
elseif metric_source == 'influxdb' then
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'check_local_endpoint' then
msg['Fields']['name'] = 'openstack_check_local_api'
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'nginx' then
msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
if sample['type_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
else
- -- generic metric name translation for 3rd-party sources
+ -- In the default case, the collectd payload is decoded in a
+ -- generic way.
+ --
+ -- name: <plugin>[_<plugin_instance>][_<type>][_<type_instance>]
+ --
+ -- Except for reserved names, all items in the 'meta' dict are
+ -- added to the Fields dict and keys are added to the
+ -- Fields['tag_fields'] array.
msg['Fields']['name'] = sample['plugin']
if sample['plugin_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
@@ -475,16 +525,23 @@
end
msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
- -- check if the hostname field should be kept or not (eg for
- -- cluster metrics, discard_hostname == true)
- if sample['meta'] and sample['meta']['discard_hostname'] then
- msg['Fields']['hostname'] = nil
- sample['meta']['discard_hostname'] = nil
+ if meta['unit'] then
+ msg.Fields['value'] = {
+ value = msg.Fields['value'],
+ representation = meta['unit']
+ }
+ end
+
+ -- if not set, check if the 'hostname' field should be added
+ -- (eg for cluster metrics, discard_hostname == true)
+ if msg['Fields']['hostname'] == nil and not meta['discard_hostname'] then
+ msg['Fields']['hostname'] = msg['Hostname']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
-- add meta fields as tag_fields
- for k, v in pairs(sample['meta'] or {}) do
- if tostring(k) ~= '0' then
+ for k, v in pairs(meta) do
+ if tostring(k) ~= '0' and k ~= 'unit' and k ~= 'discard_hostname' then
msg['Fields'][k] = v
table.insert(msg['Fields']['tag_fields'], k)
end
@@ -492,9 +549,6 @@
end
if not skip_it then
- if msg['Fields']['hostname'] then
- table.insert(msg['Fields']['tag_fields'], 'hostname')
- end
utils.inject_tags(msg)
-- Before injecting the message we need to check that tag_fields is not an
-- empty table otherwise the protobuf encoder fails to encode the table.