Enhance the collectd decoder for generic metrics
Change-Id: I7a56e3b2390b83bb6a2c72f1c33ab47498d4202c
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 1d44349..a3c497d 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -45,21 +45,6 @@
['grafana-server'] = true,
}
--- The following table keeps a list of metrics from plugin where the
--- Fields[hostname] shouldn't be set by default.
-local hostname_free = {
- ceph_mon = true,
- ceph_pool = true,
- check_openstack_api = true,
- cinder = true,
- glance = true,
- hypervisor_stats = true,
- keystone = true,
- neutron = true,
- nova = true,
- pacemaker = true,
-}
-
-- this is needed for the libvirt metrics because in that case, collectd sends
-- the instance's ID instead of the hostname in the 'host' attribute
local hostname = read_config('hostname') or error('hostname must be specified')
@@ -107,11 +92,6 @@
}
}
- -- Check if Fields[hostname] should be added or not to the metric message
- if not hostname_free[metric_source] then
- msg['Fields']['hostname'] = sample['host']
- end
-
-- Normalize metric name, unfortunately collectd plugins aren't
-- always consistent on metric namespaces so we need a few if/else
-- statements to cover all cases.
@@ -121,11 +101,14 @@
if sample['meta']['local_check'] then
-- if the check is local to the node, add the hostname
msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
elseif metric_source == 'memory' or metric_source == 'contextswitch' or
metric_source == 'entropy' or metric_source == 'load' or
metric_source == 'swap' or metric_source == 'uptime' then
msg['Fields']['name'] = metric_name
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'df' then
local entity
if sample['type'] == 'df_inodes' then
@@ -148,6 +131,8 @@
msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
msg['Fields']['fs'] = mount
table.insert(msg['Fields']['tag_fields'], 'fs')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'disk' then
if sample['type'] == 'disk_io_time' then
msg['Fields']['name'] = 'disk' .. sep .. sample['dsnames'][i]
@@ -156,10 +141,14 @@
end
msg['Fields']['device'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'device')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'cpu' then
msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
msg['Fields']['cpu_number'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'netlink' then
local netlink_metric = sample['type']
if netlink_metric == 'if_rx_errors' then
@@ -179,6 +168,8 @@
msg['Fields']['name'] = netlink_metric
msg['Fields']['interface'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'interface')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'processes' then
if processes_map[sample['type']] then
-- metrics related to a specific process
@@ -214,8 +205,12 @@
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'mysql' then
if sample['type'] == 'threads' then
msg['Fields']['name'] = 'mysql_' .. metric_name
@@ -230,6 +225,8 @@
else
msg['Fields']['name'] = metric_name
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'ntpd' then
if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
@@ -238,7 +235,13 @@
msg['Fields']['server'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'server')
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'check_openstack_api' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
+ --
-- For OpenStack API metrics, plugin_instance = <service name>
msg['Fields']['name'] = 'openstack_check_api'
msg['Fields']['service'] = sample['plugin_instance']
@@ -247,6 +250,10 @@
msg['Fields']['os_region'] = sample['meta']['region']
end
elseif metric_source == 'hypervisor_stats' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
+ --
-- Metrics from the OpenStack hypervisor metrics where
-- type_instance = <metric name> which can end by _MB or _GB
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
@@ -260,6 +267,7 @@
end
if sample['meta'] and sample['meta']['host'] then
msg['Fields']['hostname'] = sample['meta']['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
if sample['meta'] and sample['meta']['aggregate'] then
msg['Fields']['aggregate'] = sample['meta']['aggregate']
@@ -271,11 +279,16 @@
end
elseif metric_source == 'rabbitmq_info' then
msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
if sample['meta'] and sample['meta']['queue'] then
msg['Fields']['queue'] = sample['meta']['queue']
table.insert(msg['Fields']['tag_fields'], 'queue')
end
elseif metric_source == 'nova' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['plugin_instance'] == 'nova_services' or
sample['plugin_instance'] == 'nova_services_percent' or
sample['plugin_instance'] == 'nova_service' then
@@ -286,6 +299,7 @@
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'nova_service' then
msg['Fields']['hostname'] = sample['meta']['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -293,6 +307,9 @@
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'cinder' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['plugin_instance'] == 'cinder_services' or
sample['plugin_instance'] == 'cinder_services_percent' or
sample['plugin_instance'] == 'cinder_service' then
@@ -303,6 +320,7 @@
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'cinder_service' then
msg['Fields']['hostname'] = sample['meta']['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -310,18 +328,27 @@
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'glance' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
msg['Fields']['state'] = sample['meta']['status']
msg['Fields']['visibility'] = sample['meta']['visibility']
table.insert(msg['Fields']['tag_fields'], 'state')
table.insert(msg['Fields']['tag_fields'], 'visibility')
elseif metric_source == 'keystone' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
if sample['meta']['state'] then
msg['Fields']['state'] = sample['meta']['state']
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'neutron' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
skip_it = true
elseif sample['type_instance'] == 'subnets' then
@@ -336,6 +363,7 @@
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['type_instance'] == 'neutron_agent' then
msg['Fields']['hostname'] = sample['meta']['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
elseif string.match(sample['type_instance'], '^ports') then
local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
@@ -352,6 +380,8 @@
end
elseif metric_source == 'memcached' then
msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'haproxy' then
msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
if sample['meta'] then
@@ -371,9 +401,13 @@
table.insert(msg['Fields']['tag_fields'], 'frontend')
end
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'apache' then
metric_name = string.gsub(metric_name, 'apache_', '')
msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'ceph_osd_perf' then
msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
@@ -381,6 +415,8 @@
msg['Fields']['osd'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'cluster')
table.insert(msg['Fields']['tag_fields'], 'osd')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source:match('^ceph') then
msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
if sample['dsnames'][i] ~= 'value' then
@@ -407,6 +443,7 @@
elseif metric_source == 'pacemaker' then
if sample['meta'] and sample['meta']['host'] then
msg['Fields']['hostname'] = sample['meta']['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
@@ -421,11 +458,14 @@
elseif metric_source == 'users' then
-- 'users' is a reserved name for InfluxDB v0.9
msg['Fields']['name'] = 'logged_users'
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'libvirt' then
-- collectd sends the instance's ID in the 'host' field
msg['Fields']['instance_id'] = sample['host']
table.insert(msg['Fields']['tag_fields'], 'instance_id')
msg['Fields']['hostname'] = hostname
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
msg['Hostname'] = hostname
if string.match(sample['type'], '^disk_') then
@@ -447,17 +487,30 @@
end
elseif metric_source == 'influxdb' then
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'check_local_endpoint' then
msg['Fields']['name'] = 'openstack_check_local_api'
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'nginx' then
msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
if sample['type_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
else
- -- generic metric name translation for 3rd-party sources
+ -- In the default case, the collectd payload is decoded in a
+ -- generic way.
+ --
+ -- name: <plugin>[_<plugin_instance>][_<type>][_<type_instance]
+ --
+ -- Except for reserved names, all items in the 'meta' dict are
+ -- added to the Fields dict and keys are added to the
+ -- Fields['tag_fields'] array.
msg['Fields']['name'] = sample['plugin']
if sample['plugin_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
@@ -475,16 +528,24 @@
end
msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
- -- check if the hostname field should be kept or not (eg for
- -- cluster metrics, discard_hostname == true)
- if sample['meta'] and sample['meta']['discard_hostname'] then
- msg['Fields']['hostname'] = nil
- sample['meta']['discard_hostname'] = nil
+ if sample['meta'] and sample['meta']['unit'] then
+ msg.Fields['value'] = {
+ value = msg.Fields['value'],
+ representation = sample['meta']['unit']
+ }
+ end
+
+ -- if not set, check if the 'hostname' field should be added
+ -- (eg for cluster metrics, discard_hostname == true)
+ if msg['Fields']['hostname'] == nil and
+ sample['meta'] and not sample['meta']['discard_hostname'] then
+ msg['Fields']['hostname'] = msg['Hostname']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
-- add meta fields as tag_fields
for k, v in pairs(sample['meta'] or {}) do
- if tostring(k) ~= '0' then
+ if tostring(k) ~= '0' and k ~= 'unit' and k ~= 'discard_hostname' then
msg['Fields'][k] = v
table.insert(msg['Fields']['tag_fields'], k)
end
@@ -492,9 +553,6 @@
end
if not skip_it then
- if msg['Fields']['hostname'] then
- table.insert(msg['Fields']['tag_fields'], 'hostname')
- end
utils.inject_tags(msg)
-- Before injecting the message we need to check that tag_fields is not an
-- empty table otherwise the protobuf encoder fails to encode the table.