Merge changes from topic 'generate-consistent-metrics'

* changes:
  Simplify collectd.lua decoder plugin
  Enhance the collectd decoder for generic metrics
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 1d44349..7978bd2 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -45,21 +45,6 @@
     ['grafana-server'] = true,
 }
 
--- The following table keeps a list of metrics from plugin where the
--- Fields[hostname] shouldn't be set by default.
-local hostname_free = {
-    ceph_mon = true,
-    ceph_pool = true,
-    check_openstack_api = true,
-    cinder = true,
-    glance = true,
-    hypervisor_stats = true,
-    keystone = true,
-    neutron = true,
-    nova = true,
-    pacemaker = true,
-}
-
 -- this is needed for the libvirt metrics because in that case, collectd sends
 -- the instance's ID instead of the hostname in the 'host' attribute
 local hostname = read_config('hostname') or error('hostname must be specified')
@@ -83,6 +68,7 @@
         end
 
         local metric_source = sample['plugin']
+        local meta = sample['meta'] or {}
 
         for i, value in ipairs(sample['values']) do
             local skip_it = false
@@ -107,25 +93,23 @@
                 }
             }
 
-            -- Check if Fields[hostname] should be added or not to the metric message
-            if not hostname_free[metric_source] then
-                msg['Fields']['hostname'] = sample['host']
-            end
-
             -- Normalize metric name, unfortunately collectd plugins aren't
             -- always consistent on metric namespaces so we need a few if/else
             -- statements to cover all cases.
-            if sample['meta'] and sample['meta']['service_check'] then
-                msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
-                msg['Fields']['details'] = sample['meta']['failure']
-                if sample['meta']['local_check'] then
+            if meta['service_check'] then
+                msg['Fields']['name'] = meta['service_check'] .. sep .. 'check'
+                msg['Fields']['details'] = meta['failure']
+                if meta['local_check'] then
                     -- if the check is local to the node, add the hostname
                     msg['Fields']['hostname'] = sample['host']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
             elseif metric_source == 'memory' or metric_source == 'contextswitch' or
                    metric_source == 'entropy' or metric_source == 'load' or
                    metric_source == 'swap' or metric_source == 'uptime' then
                 msg['Fields']['name'] = metric_name
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'df' then
                 local entity
                 if sample['type'] == 'df_inodes' then
@@ -148,6 +132,8 @@
                 msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
                 msg['Fields']['fs'] = mount
                 table.insert(msg['Fields']['tag_fields'], 'fs')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'disk' then
                 if sample['type'] == 'disk_io_time' then
                     msg['Fields']['name'] = 'disk' .. sep .. sample['dsnames'][i]
@@ -156,10 +142,14 @@
                 end
                 msg['Fields']['device'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'device')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'cpu' then
                 msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
                 msg['Fields']['cpu_number'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'netlink' then
                 local netlink_metric = sample['type']
                 if netlink_metric == 'if_rx_errors' then
@@ -179,6 +169,8 @@
                 msg['Fields']['name'] = netlink_metric
                 msg['Fields']['interface'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'interface')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'processes' then
                 if processes_map[sample['type']] then
                     -- metrics related to a specific process
@@ -214,8 +206,12 @@
                         msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
                     end
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source ==  'dbi' and sample['plugin_instance'] == 'mysql_status' then
                 msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'mysql' then
                 if sample['type'] == 'threads' then
                     msg['Fields']['name'] = 'mysql_' .. metric_name
@@ -230,6 +226,8 @@
                 else
                     msg['Fields']['name'] = metric_name
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'ntpd' then
                 if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
                     msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
@@ -238,15 +236,23 @@
                     msg['Fields']['server'] = sample['type_instance']
                     table.insert(msg['Fields']['tag_fields'], 'server')
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'check_openstack_api' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
+                --
                 -- For OpenStack API metrics, plugin_instance = <service name>
                 msg['Fields']['name'] = 'openstack_check_api'
                 msg['Fields']['service'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
-                if sample['meta'] then
-                    msg['Fields']['os_region'] = sample['meta']['region']
-                end
+                msg['Fields']['os_region'] = meta['region']
             elseif metric_source == 'hypervisor_stats' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
+                --
                 -- Metrics from the OpenStack hypervisor metrics where
                 -- type_instance = <metric name> which can end by _MB or _GB
                 msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
@@ -258,34 +264,41 @@
                 else
                     msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
                 end
-                if sample['meta'] and sample['meta']['host'] then
-                    msg['Fields']['hostname'] = sample['meta']['host']
+                if meta['host'] then
+                    msg['Fields']['hostname'] = meta['host']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
-                if sample['meta'] and sample['meta']['aggregate'] then
-                    msg['Fields']['aggregate'] = sample['meta']['aggregate']
+                if meta['aggregate'] then
+                    msg['Fields']['aggregate'] = meta['aggregate']
                     table.insert(msg['Fields']['tag_fields'], 'aggregate')
                 end
-                if sample['meta'] and sample['meta']['aggregate_id'] then
-                    msg['Fields']['aggregate_id'] = sample['meta']['aggregate_id']
+                if meta['aggregate_id'] then
+                    msg['Fields']['aggregate_id'] = meta['aggregate_id']
                     table.insert(msg['Fields']['tag_fields'], 'aggregate_id')
                 end
             elseif metric_source == 'rabbitmq_info' then
                 msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
-                if sample['meta'] and sample['meta']['queue'] then
-                    msg['Fields']['queue'] = sample['meta']['queue']
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
+                if meta['queue'] then
+                    msg['Fields']['queue'] = meta['queue']
                     table.insert(msg['Fields']['tag_fields'], 'queue')
                 end
             elseif metric_source == 'nova' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
                 if sample['plugin_instance'] == 'nova_services' or
                    sample['plugin_instance'] == 'nova_services_percent' or
                    sample['plugin_instance'] == 'nova_service'  then
                     msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
-                    msg['Fields']['service'] = sample['meta']['service']
-                    msg['Fields']['state'] = sample['meta']['state']
+                    msg['Fields']['service'] = meta['service']
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'service')
                     table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['plugin_instance'] == 'nova_service'  then
-                        msg['Fields']['hostname'] = sample['meta']['host']
+                        msg['Fields']['hostname'] = meta['host']
+                        table.insert(msg['Fields']['tag_fields'], 'hostname')
                     end
                 else
                     msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -293,16 +306,20 @@
                     table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'cinder' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
                 if sample['plugin_instance'] == 'cinder_services' or
                    sample['plugin_instance'] == 'cinder_services_percent' or
                    sample['plugin_instance'] == 'cinder_service' then
                     msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
-                    msg['Fields']['service'] = sample['meta']['service']
-                    msg['Fields']['state'] = sample['meta']['state']
+                    msg['Fields']['service'] = meta['service']
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'service')
                     table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['plugin_instance'] == 'cinder_service' then
-                        msg['Fields']['hostname'] = sample['meta']['host']
+                        msg['Fields']['hostname'] = meta['host']
+                        table.insert(msg['Fields']['tag_fields'], 'hostname')
                     end
                 else
                     msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -310,18 +327,27 @@
                     table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'glance' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
                 msg['Fields']['name'] = 'openstack'  .. sep .. 'glance' .. sep .. sample['type_instance']
-                msg['Fields']['state'] = sample['meta']['status']
-                msg['Fields']['visibility'] = sample['meta']['visibility']
+                msg['Fields']['state'] = meta['status']
+                msg['Fields']['visibility'] = meta['visibility']
                 table.insert(msg['Fields']['tag_fields'], 'state')
                 table.insert(msg['Fields']['tag_fields'], 'visibility')
             elseif metric_source == 'keystone' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
                 msg['Fields']['name'] = 'openstack'  .. sep .. 'keystone' .. sep .. sample['type_instance']
-                if sample['meta']['state'] then
-                    msg['Fields']['state'] = sample['meta']['state']
+                if meta['state'] then
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'neutron' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload that
+                -- is compatible with the default decoding.
                 if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
                     skip_it = true
                 elseif sample['type_instance'] == 'subnets' then
@@ -330,12 +356,13 @@
                        sample['type_instance'] == 'neutron_agents_percent' or
                        sample['type_instance'] == 'neutron_agent' then
                     msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
-                    msg['Fields']['service'] = sample['meta']['service']
-                    msg['Fields']['state'] = sample['meta']['state']
+                    msg['Fields']['service'] = meta['service']
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'service')
                     table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['type_instance'] == 'neutron_agent'  then
-                        msg['Fields']['hostname'] = sample['meta']['host']
+                        msg['Fields']['hostname'] = meta['host']
+                        table.insert(msg['Fields']['tag_fields'], 'hostname')
                     end
                 elseif string.match(sample['type_instance'], '^ports') then
                     local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
@@ -352,28 +379,32 @@
                 end
             elseif metric_source == 'memcached' then
                 msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'haproxy' then
                 msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
-                if sample['meta'] then
-                    if sample['meta']['backend'] then
-                        msg['Fields']['backend'] = sample['meta']['backend']
-                        table.insert(msg['Fields']['tag_fields'], 'backend')
-                        if sample['meta']['state'] then
-                            msg['Fields']['state'] = sample['meta']['state']
-                            table.insert(msg['Fields']['tag_fields'], 'state')
-                        end
-                        if sample['meta']['server'] then
-                            msg['Fields']['server'] = sample['meta']['server']
-                            table.insert(msg['Fields']['tag_fields'], 'server')
-                        end
-                    elseif sample['meta']['frontend'] then
-                        msg['Fields']['frontend'] = sample['meta']['frontend']
-                        table.insert(msg['Fields']['tag_fields'], 'frontend')
+                if meta['backend'] then
+                    msg['Fields']['backend'] = meta['backend']
+                    table.insert(msg['Fields']['tag_fields'], 'backend')
+                    if meta['state'] then
+                        msg['Fields']['state'] = meta['state']
+                        table.insert(msg['Fields']['tag_fields'], 'state')
                     end
+                    if meta['server'] then
+                        msg['Fields']['server'] = meta['server']
+                        table.insert(msg['Fields']['tag_fields'], 'server')
+                    end
+                elseif meta['frontend'] then
+                    msg['Fields']['frontend'] = meta['frontend']
+                    table.insert(msg['Fields']['tag_fields'], 'frontend')
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'apache' then
                 metric_name = string.gsub(metric_name, 'apache_', '')
                 msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'ceph_osd_perf' then
                 msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
 
@@ -381,6 +412,8 @@
                 msg['Fields']['osd'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'cluster')
                 table.insert(msg['Fields']['tag_fields'], 'osd')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source:match('^ceph') then
                 msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
                 if sample['dsnames'][i] ~= 'value' then
@@ -405,27 +438,31 @@
                     end
                 end
             elseif metric_source == 'pacemaker' then
-                if sample['meta'] and sample['meta']['host'] then
-                    msg['Fields']['hostname'] = sample['meta']['host']
+                if meta['host'] then
+                    msg['Fields']['hostname'] = meta['host']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
 
                 msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
 
                 -- add dimension fields
                 for _, v in ipairs({'status', 'resource'}) do
-                    if sample['meta'] and sample['meta'][v] then
-                        msg['Fields'][v] = sample['meta'][v]
+                    if meta[v] then
+                        msg['Fields'][v] = meta[v]
                         table.insert(msg['Fields']['tag_fields'], v)
                     end
                 end
             elseif metric_source ==  'users' then
                 -- 'users' is a reserved name for InfluxDB v0.9
                 msg['Fields']['name'] = 'logged_users'
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source ==  'libvirt' then
                 -- collectd sends the instance's ID in the 'host' field
                 msg['Fields']['instance_id'] = sample['host']
                 table.insert(msg['Fields']['tag_fields'], 'instance_id')
                 msg['Fields']['hostname'] = hostname
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
                 msg['Hostname'] = hostname
 
                 if string.match(sample['type'], '^disk_') then
@@ -447,17 +484,30 @@
                 end
             elseif metric_source == 'influxdb' then
                 msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'check_local_endpoint' then
                 msg['Fields']['name'] = 'openstack_check_local_api'
                 msg['Fields']['service'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'nginx' then
                 msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
                 if sample['type_instance'] ~= "" then
                     msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             else
-                -- generic metric name translation for 3rd-party sources
+                -- In the default case, the collectd payload is decoded in a
+                -- generic way.
+                --
+                -- name:  <plugin>[_<plugin_instance>][_<type>][_<type_instance>]
+                --
+                -- Except for the reserved keys ('unit' and 'discard_hostname'),
+                -- all items in the 'meta' dict are added to the Fields dict and
+                -- their keys are added to the Fields['tag_fields'] array.
                 msg['Fields']['name'] = sample['plugin']
                 if sample['plugin_instance'] ~= "" then
                     msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
@@ -475,16 +525,23 @@
                 end
                 msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
 
-                -- check if the hostname field should be kept or not (eg for
-                -- cluster metrics, discard_hostname == true)
-                if sample['meta'] and sample['meta']['discard_hostname'] then
-                    msg['Fields']['hostname'] = nil
-                    sample['meta']['discard_hostname'] = nil
+                if meta['unit'] then
+                    msg.Fields['value'] = {
+                        value = msg.Fields['value'],
+                        representation = meta['unit']
+                    }
+                end
+
+                -- If no hostname has been set yet, add the 'hostname' field
+                -- unless meta['discard_hostname'] is set (e.g. cluster metrics).
+                if msg['Fields']['hostname'] == nil and not meta['discard_hostname'] then
+                    msg['Fields']['hostname'] = msg['Hostname']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
 
                 -- add meta fields as tag_fields
-                for k, v in pairs(sample['meta'] or {}) do
-                    if tostring(k) ~= '0' then
+                for k, v in pairs(meta) do
+                    if tostring(k) ~= '0' and k ~= 'unit' and k ~= 'discard_hostname' then
                         msg['Fields'][k] = v
                         table.insert(msg['Fields']['tag_fields'], k)
                    end
@@ -492,9 +549,6 @@
             end
 
             if not skip_it then
-                if msg['Fields']['hostname'] then
-                     table.insert(msg['Fields']['tag_fields'], 'hostname')
-                end
                 utils.inject_tags(msg)
                 -- Before injecting the message we need to check that tag_fields is not an
                 -- empty table otherwise the protobuf encoder fails to encode the table.