Merge "Add Nagios class to the metric_collector metadata"
diff --git a/heka/_common.sls b/heka/_common.sls
index 1753840..9f71c31 100644
--- a/heka/_common.sls
+++ b/heka/_common.sls
@@ -7,15 +7,24 @@
 /usr/share/lma_collector:
   file.recurse:
   - source: salt://heka/files/lua
+  - user: root
+  - group: heka
+  - file_mode: 640
+  - dir_mode: 750
+  - require:
+    - user: heka_user
 
 /usr/share/lma_collector/common/extra_fields.lua:
   file.managed:
   - source: salt://heka/files/extra_fields.lua
   - user: root
-  - mode: 644
+  - group: heka
+  - mode: 640
   - defaults:
       extra_fields: {{ server.extra_fields }}
   - template: jinja
+  - require:
+    - user: heka_user
 
 heka_user:
   user.present:
diff --git a/heka/_service.sls b/heka/_service.sls
index 359e716..70d80ce 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -13,6 +13,7 @@
   file.directory:
   - name: /var/cache/{{ service_name }}
   - user: heka
+  - group: heka
   - mode: 750
   - makedirs: true
 
diff --git a/heka/files/lua/common/influxdb.lua b/heka/files/lua/common/influxdb.lua
index d9c359d..9341399 100644
--- a/heka/files/lua/common/influxdb.lua
+++ b/heka/files/lua/common/influxdb.lua
@@ -26,8 +26,14 @@
 
 setfenv(1, InfluxEncoder) -- Remove external access to contain everything in the module
 
-local function escape_string(str)
-    return tostring(str):gsub("([ ,])", "\\%1")
+local function escape_string(str, preserve_quotes)
+    local v = tostring(str)
+    -- single quotes are always forbidden
+    v = v:gsub("'", "")
+    if not preserve_quotes then
+        v = v:gsub('"', "")
+    end
+    return v:gsub("([ ,=])", "\\%1")
 end
 
 local function encode_scalar_value(value)
@@ -40,7 +46,7 @@
         -- string values need to be double quoted
         return '"' .. value:gsub('"', '\\"') .. '"'
     elseif type(value) == "boolean" then
-        return '"' .. tostring(value) .. '"'
+        return tostring(value)
     end
 end
 
@@ -50,7 +56,7 @@
         for k,v in pairs(value) do
             table.insert(
                 values,
-                string.format("%s=%s", escape_string(k), encode_scalar_value(v))
+                string.format("%s=%s", escape_string(k, true), encode_scalar_value(v))
             )
         end
         return table.concat(values, ',')
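
For reference, a minimal standalone sketch (not part of the patch) of the escaping rules the reworked escape_string() now applies when building InfluxDB line-protocol output: single quotes are always dropped, double quotes are dropped unless preserve_quotes is set (used for multi-value field keys), and space, comma and '=' are backslash-escaped. Boolean field values are now emitted unquoted.

local function escape_string(str, preserve_quotes)
    local v = tostring(str)
    v = v:gsub("'", "")                 -- single quotes are always removed
    if not preserve_quotes then
        v = v:gsub('"', "")             -- double quotes removed except for field keys
    end
    return (v:gsub("([ ,=])", "\\%1"))  -- escape space, comma and '='
end

assert(escape_string('"tag1"') == "tag1")
assert(escape_string(",tag 1=") == "\\,tag\\ 1\\=")
assert(escape_string('key"1', true) == 'key"1')
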
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 1d44349..3988c17 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -45,21 +45,6 @@
     ['grafana-server'] = true,
 }
 
--- The following table keeps a list of metrics from plugin where the
--- Fields[hostname] shouldn't be set by default.
-local hostname_free = {
-    ceph_mon = true,
-    ceph_pool = true,
-    check_openstack_api = true,
-    cinder = true,
-    glance = true,
-    hypervisor_stats = true,
-    keystone = true,
-    neutron = true,
-    nova = true,
-    pacemaker = true,
-}
-
 -- this is needed for the libvirt metrics because in that case, collectd sends
 -- the instance's ID instead of the hostname in the 'host' attribute
 local hostname = read_config('hostname') or error('hostname must be specified')
@@ -83,6 +68,7 @@
         end
 
         local metric_source = sample['plugin']
+        local meta = sample['meta'] or {}
 
         for i, value in ipairs(sample['values']) do
             local skip_it = false
@@ -107,25 +93,23 @@
                 }
             }
 
-            -- Check if Fields[hostname] should be added or not to the metric message
-            if not hostname_free[metric_source] then
-                msg['Fields']['hostname'] = sample['host']
-            end
-
             -- Normalize metric name, unfortunately collectd plugins aren't
             -- always consistent on metric namespaces so we need a few if/else
             -- statements to cover all cases.
-            if sample['meta'] and sample['meta']['service_check'] then
-                msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
-                msg['Fields']['details'] = sample['meta']['failure']
-                if sample['meta']['local_check'] then
+            if meta['service_check'] then
+                msg['Fields']['name'] = meta['service_check'] .. sep .. 'check'
+                msg['Fields']['details'] = meta['failure']
+                if meta['local_check'] then
                     -- if the check is local to the node, add the hostname
                     msg['Fields']['hostname'] = sample['host']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
             elseif metric_source == 'memory' or metric_source == 'contextswitch' or
                    metric_source == 'entropy' or metric_source == 'load' or
                    metric_source == 'swap' or metric_source == 'uptime' then
                 msg['Fields']['name'] = metric_name
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'df' then
                 local entity
                 if sample['type'] == 'df_inodes' then
@@ -148,18 +132,26 @@
                 msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
                 msg['Fields']['fs'] = mount
                 table.insert(msg['Fields']['tag_fields'], 'fs')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'disk' then
                 if sample['type'] == 'disk_io_time' then
                     msg['Fields']['name'] = 'disk' .. sep .. sample['dsnames'][i]
+                elseif sample['type'] == 'pending_operations' then
+                    msg['Fields']['name'] = 'disk' .. sep .. sample['type']
                 else
                     msg['Fields']['name'] = metric_name
                 end
                 msg['Fields']['device'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'device')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'cpu' then
                 msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
                 msg['Fields']['cpu_number'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'netlink' then
                 local netlink_metric = sample['type']
                 if netlink_metric == 'if_rx_errors' then
@@ -179,6 +171,8 @@
                 msg['Fields']['name'] = netlink_metric
                 msg['Fields']['interface'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'interface')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'processes' then
                 if processes_map[sample['type']] then
                     -- metrics related to a specific process
@@ -214,8 +208,12 @@
                         msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
                     end
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source ==  'dbi' and sample['plugin_instance'] == 'mysql_status' then
                 msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'mysql' then
                 if sample['type'] == 'threads' then
                     msg['Fields']['name'] = 'mysql_' .. metric_name
@@ -230,6 +228,8 @@
                 else
                     msg['Fields']['name'] = metric_name
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'ntpd' then
                 if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
                     msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
@@ -238,15 +238,23 @@
                     msg['Fields']['server'] = sample['type_instance']
                     table.insert(msg['Fields']['tag_fields'], 'server')
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'check_openstack_api' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
+                --
                 -- For OpenStack API metrics, plugin_instance = <service name>
                 msg['Fields']['name'] = 'openstack_check_api'
                 msg['Fields']['service'] = sample['plugin_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
-                if sample['meta'] then
-                    msg['Fields']['os_region'] = sample['meta']['region']
-                end
+                msg['Fields']['os_region'] = meta['region']
             elseif metric_source == 'hypervisor_stats' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
+                --
                 -- Metrics from the OpenStack hypervisor metrics where
                 -- type_instance = <metric name> which can end by _MB or _GB
                 msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
@@ -258,34 +266,41 @@
                 else
                     msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
                 end
-                if sample['meta'] and sample['meta']['host'] then
-                    msg['Fields']['hostname'] = sample['meta']['host']
+                if meta['host'] then
+                    msg['Fields']['hostname'] = meta['host']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
-                if sample['meta'] and sample['meta']['aggregate'] then
-                    msg['Fields']['aggregate'] = sample['meta']['aggregate']
+                if meta['aggregate'] then
+                    msg['Fields']['aggregate'] = meta['aggregate']
                     table.insert(msg['Fields']['tag_fields'], 'aggregate')
                 end
-                if sample['meta'] and sample['meta']['aggregate_id'] then
-                    msg['Fields']['aggregate_id'] = sample['meta']['aggregate_id']
+                if meta['aggregate_id'] then
+                    msg['Fields']['aggregate_id'] = meta['aggregate_id']
                     table.insert(msg['Fields']['tag_fields'], 'aggregate_id')
                 end
             elseif metric_source == 'rabbitmq_info' then
                 msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
-                if sample['meta'] and sample['meta']['queue'] then
-                    msg['Fields']['queue'] = sample['meta']['queue']
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
+                if meta['queue'] then
+                    msg['Fields']['queue'] = meta['queue']
                     table.insert(msg['Fields']['tag_fields'], 'queue')
                 end
             elseif metric_source == 'nova' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
                 if sample['plugin_instance'] == 'nova_services' or
                    sample['plugin_instance'] == 'nova_services_percent' or
                    sample['plugin_instance'] == 'nova_service'  then
                     msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
-                    msg['Fields']['service'] = sample['meta']['service']
-                    msg['Fields']['state'] = sample['meta']['state']
+                    msg['Fields']['service'] = meta['service']
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'service')
                     table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['plugin_instance'] == 'nova_service'  then
-                        msg['Fields']['hostname'] = sample['meta']['host']
+                        msg['Fields']['hostname'] = meta['host']
+                        table.insert(msg['Fields']['tag_fields'], 'hostname')
                     end
                 else
                     msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -293,16 +308,20 @@
                     table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'cinder' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
                 if sample['plugin_instance'] == 'cinder_services' or
                    sample['plugin_instance'] == 'cinder_services_percent' or
                    sample['plugin_instance'] == 'cinder_service' then
                     msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
-                    msg['Fields']['service'] = sample['meta']['service']
-                    msg['Fields']['state'] = sample['meta']['state']
+                    msg['Fields']['service'] = meta['service']
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'service')
                     table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['plugin_instance'] == 'cinder_service' then
-                        msg['Fields']['hostname'] = sample['meta']['host']
+                        msg['Fields']['hostname'] = meta['host']
+                        table.insert(msg['Fields']['tag_fields'], 'hostname')
                     end
                 else
                     msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -310,18 +329,27 @@
                     table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'glance' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
                 msg['Fields']['name'] = 'openstack'  .. sep .. 'glance' .. sep .. sample['type_instance']
-                msg['Fields']['state'] = sample['meta']['status']
-                msg['Fields']['visibility'] = sample['meta']['visibility']
+                msg['Fields']['state'] = meta['status']
+                msg['Fields']['visibility'] = meta['visibility']
                 table.insert(msg['Fields']['tag_fields'], 'state')
                 table.insert(msg['Fields']['tag_fields'], 'visibility')
             elseif metric_source == 'keystone' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
                 msg['Fields']['name'] = 'openstack'  .. sep .. 'keystone' .. sep .. sample['type_instance']
-                if sample['meta']['state'] then
-                    msg['Fields']['state'] = sample['meta']['state']
+                if meta['state'] then
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'state')
                 end
             elseif metric_source == 'neutron' then
+                -- This code is kept for backward compatibility with the old
+                -- collectd plugin. The new collectd plugin sends a payload
+                -- that is compatible with the default decoding.
                 if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
                     skip_it = true
                 elseif sample['type_instance'] == 'subnets' then
@@ -330,12 +358,13 @@
                        sample['type_instance'] == 'neutron_agents_percent' or
                        sample['type_instance'] == 'neutron_agent' then
                     msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
-                    msg['Fields']['service'] = sample['meta']['service']
-                    msg['Fields']['state'] = sample['meta']['state']
+                    msg['Fields']['service'] = meta['service']
+                    msg['Fields']['state'] = meta['state']
                     table.insert(msg['Fields']['tag_fields'], 'service')
                     table.insert(msg['Fields']['tag_fields'], 'state')
                     if sample['type_instance'] == 'neutron_agent'  then
-                        msg['Fields']['hostname'] = sample['meta']['host']
+                        msg['Fields']['hostname'] = meta['host']
+                        table.insert(msg['Fields']['tag_fields'], 'hostname')
                     end
                 elseif string.match(sample['type_instance'], '^ports') then
                     local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
@@ -352,28 +381,32 @@
                 end
             elseif metric_source == 'memcached' then
                 msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'haproxy' then
                 msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
-                if sample['meta'] then
-                    if sample['meta']['backend'] then
-                        msg['Fields']['backend'] = sample['meta']['backend']
-                        table.insert(msg['Fields']['tag_fields'], 'backend')
-                        if sample['meta']['state'] then
-                            msg['Fields']['state'] = sample['meta']['state']
-                            table.insert(msg['Fields']['tag_fields'], 'state')
-                        end
-                        if sample['meta']['server'] then
-                            msg['Fields']['server'] = sample['meta']['server']
-                            table.insert(msg['Fields']['tag_fields'], 'server')
-                        end
-                    elseif sample['meta']['frontend'] then
-                        msg['Fields']['frontend'] = sample['meta']['frontend']
-                        table.insert(msg['Fields']['tag_fields'], 'frontend')
+                if meta['backend'] then
+                    msg['Fields']['backend'] = meta['backend']
+                    table.insert(msg['Fields']['tag_fields'], 'backend')
+                    if meta['state'] then
+                        msg['Fields']['state'] = meta['state']
+                        table.insert(msg['Fields']['tag_fields'], 'state')
                     end
+                    if meta['server'] then
+                        msg['Fields']['server'] = meta['server']
+                        table.insert(msg['Fields']['tag_fields'], 'server')
+                    end
+                elseif meta['frontend'] then
+                    msg['Fields']['frontend'] = meta['frontend']
+                    table.insert(msg['Fields']['tag_fields'], 'frontend')
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'apache' then
                 metric_name = string.gsub(metric_name, 'apache_', '')
                 msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'ceph_osd_perf' then
                 msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
 
@@ -381,6 +414,8 @@
                 msg['Fields']['osd'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'cluster')
                 table.insert(msg['Fields']['tag_fields'], 'osd')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source:match('^ceph') then
                 msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
                 if sample['dsnames'][i] ~= 'value' then
@@ -405,27 +440,31 @@
                     end
                 end
             elseif metric_source == 'pacemaker' then
-                if sample['meta'] and sample['meta']['host'] then
-                    msg['Fields']['hostname'] = sample['meta']['host']
+                if meta['host'] then
+                    msg['Fields']['hostname'] = meta['host']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
 
                 msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
 
                 -- add dimension fields
                 for _, v in ipairs({'status', 'resource'}) do
-                    if sample['meta'] and sample['meta'][v] then
-                        msg['Fields'][v] = sample['meta'][v]
+                    if meta[v] then
+                        msg['Fields'][v] = meta[v]
                         table.insert(msg['Fields']['tag_fields'], v)
                     end
                 end
             elseif metric_source ==  'users' then
                 -- 'users' is a reserved name for InfluxDB v0.9
                 msg['Fields']['name'] = 'logged_users'
-            elseif metric_source ==  'libvirt' then
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
+            elseif metric_source ==  'libvirt' or metric_source == 'virt' then
                 -- collectd sends the instance's ID in the 'host' field
                 msg['Fields']['instance_id'] = sample['host']
                 table.insert(msg['Fields']['tag_fields'], 'instance_id')
                 msg['Fields']['hostname'] = hostname
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
                 msg['Hostname'] = hostname
 
                 if string.match(sample['type'], '^disk_') then
@@ -447,17 +486,30 @@
                 end
             elseif metric_source == 'influxdb' then
                 msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'check_local_endpoint' then
                 msg['Fields']['name'] = 'openstack_check_local_api'
                 msg['Fields']['service'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             elseif metric_source == 'nginx' then
                 msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
                 if sample['type_instance'] ~= "" then
                     msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
                 end
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
             else
-                -- generic metric name translation for 3rd-party sources
+                -- In the default case, the collectd payload is decoded in a
+                -- generic way.
+                --
+                -- name:  <plugin>[_<plugin_instance>][_<type>][_<type_instance>]
+                --
+                -- Except for reserved names, all items in the 'meta' dict are
+                -- added to the Fields dict and keys are added to the
+                -- Fields['tag_fields'] array.
                 msg['Fields']['name'] = sample['plugin']
                 if sample['plugin_instance'] ~= "" then
                     msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
@@ -475,16 +527,23 @@
                 end
                 msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
 
-                -- check if the hostname field should be kept or not (eg for
-                -- cluster metrics, discard_hostname == true)
-                if sample['meta'] and sample['meta']['discard_hostname'] then
-                    msg['Fields']['hostname'] = nil
-                    sample['meta']['discard_hostname'] = nil
+                if meta['unit'] then
+                    msg['Fields']['value'] = {
+                        value = msg['Fields']['value'],
+                        representation = meta['unit']
+                    }
+                end
+
+                -- if not set, check if the 'hostname' field should be added
+                -- (eg for cluster metrics, discard_hostname == true)
+                if msg['Fields']['hostname'] == nil and not meta['discard_hostname'] then
+                    msg['Fields']['hostname'] = msg['Hostname']
+                    table.insert(msg['Fields']['tag_fields'], 'hostname')
                 end
 
                 -- add meta fields as tag_fields
-                for k, v in pairs(sample['meta'] or {}) do
-                    if tostring(k) ~= '0' then
+                for k, v in pairs(meta) do
+                    if tostring(k) ~= '0' and k ~= 'unit' and k ~= 'discard_hostname' then
                         msg['Fields'][k] = v
                         table.insert(msg['Fields']['tag_fields'], k)
                    end
@@ -492,9 +551,6 @@
             end
 
             if not skip_it then
-                if msg['Fields']['hostname'] then
-                     table.insert(msg['Fields']['tag_fields'], 'hostname')
-                end
                 utils.inject_tags(msg)
                 -- Before injecting the message we need to check that tag_fields is not an
                 -- empty table otherwise the protobuf encoder fails to encode the table.
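
To make the new default branch concrete, here is a hypothetical example (plugin name and meta values invented, not taken from the patch) of a collectd sample that is not special-cased above and how it would now be decoded:

-- Hypothetical sample from a plugin with no dedicated branch:
local sample = {
    plugin = 'my_plugin', plugin_instance = 'inst0',
    type = 'queue_length', type_instance = '',
    host = 'node-1',
    meta = { role = 'primary', unit = 'ms' },
    values = { 42 }, dsnames = { 'value' },
}
-- With this change the generic branch would:
--   * build Fields['name'] from plugin, plugin_instance, type and
--     type_instance (here roughly 'my_plugin_inst0_queue_length')
--   * wrap the value as { value = 42, representation = 'ms' } because of
--     meta['unit']
--   * copy meta['role'] into Fields['role'] and append 'role' to
--     Fields['tag_fields']
--   * set Fields['hostname'] from msg['Hostname'] and tag it, since
--     meta['discard_hostname'] is not set
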
diff --git a/heka/files/lua/decoders/kubernetes.lua b/heka/files/lua/decoders/kubernetes.lua
new file mode 100644
index 0000000..44ff669
--- /dev/null
+++ b/heka/files/lua/decoders/kubernetes.lua
@@ -0,0 +1,58 @@
+-- Copyright 2017 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+
+local l      = require 'lpeg'
+l.locale(l)
+
+local dt     = require "date_time"
+local patt   = require 'patterns'
+local syslog = require "syslog"
+local utils  = require 'lma_utils'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = {},
+    Severity    = 6, -- INFO
+}
+
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+local k8s_severity = l.Cg(l.P'I' + l.P'W' + l.P'E' + l.P'F', 'Severity')
+local k8s_time = dt.rfc3339_partial_time
+local message = l.Cg(patt.Message, "Message")
+local k8s_pattern = l.Ct(k8s_severity * l.xdigit^4 * patt.sp^1 * k8s_time * patt.sp^1 * patt.Pid * patt.sp^1 * message)
+
+
+function process_message ()
+    local log = read_message("Payload")
+
+    if utils.parse_syslog_message(syslog_grammar, log, msg) then
+        local kube = k8s_pattern:match(msg.Payload)
+        if kube then
+            msg.Payload = kube.Message
+            msg.Severity = utils.label_to_severity_map[kube.Severity] or 6
+        end
+    else
+        msg.Payload = log
+    end
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    return utils.safe_inject_message(msg)
+end
+
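
The new decoder first parses the syslog envelope, then tries to extract the glog-style prefix emitted by Kubernetes components. A hypothetical input, for illustration only:

-- Assumed payload after syslog decoding (glog-style line, invented example):
--
--   E0117 09:43:12.345678 2481 reflector.go:199] watch of *v1.Pod ended
--
-- k8s_pattern captures the severity letter ('E'), the time, the PID and the
-- remaining text; the decoder then replaces msg.Payload with that text and
-- maps the captured letter through utils.label_to_severity_map, falling back
-- to INFO (6). Lines that do not match the pattern keep their syslog-decoded
-- Payload; lines that are not valid syslog at all are passed through as-is.
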
diff --git a/heka/files/lua/decoders/metric.lua b/heka/files/lua/decoders/metric.lua
index 75cec5e..963f827 100644
--- a/heka/files/lua/decoders/metric.lua
+++ b/heka/files/lua/decoders/metric.lua
@@ -15,22 +15,11 @@
 require "cjson"
 require "string"
 
-local l = require 'lpeg'
-l.locale(l)
-
-local split_on_space = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
-local sources_list = split_on_space:match(read_config('deserialize_for_sources') or '')
-
-local sources = {}
-for _, s in ipairs(sources_list) do
-    sources[s] = true
-end
-
 local utils = require 'lma_utils'
 
 function process_message ()
     local msg = decode_message(read_message("raw"))
-    if string.match(msg.Type, 'bulk_metric$') and sources[msg.Fields.source] then
+    if string.match(msg.Type, 'bulk_metric$') then
 
         local ok, metrics = pcall(cjson.decode, msg.Payload)
         if not ok then
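
With the source filter removed, any message whose Type ends in 'bulk_metric' is now deserialized. A hypothetical payload (contents invented) illustrates what process_message() decodes:

-- Hypothetical bulk_metric message (values invented for illustration):
--   Type    = 'heka.sandbox.bulk_metric'
--   Payload = '[{"name":"cpu_idle","value":97.5},{"name":"cpu_user","value":1.2}]'
--
-- Previously this was only decoded when Fields.source appeared in the
-- deserialize_for_sources setting (e.g. 'log_collector'); now the JSON
-- payload is decoded with cjson for every bulk_metric message.
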
diff --git a/heka/files/toml/decoder/sandbox.toml b/heka/files/toml/decoder/sandbox.toml
index c073991..d472a3d 100644
--- a/heka/files/toml/decoder/sandbox.toml
+++ b/heka/files/toml/decoder/sandbox.toml
@@ -2,10 +2,8 @@
 
 [{{ decoder_name }}_decoder]
 type = "SandboxDecoder"
-filename = "{{ decoder.module_file }}"
-{%- if decoder.module_dir is defined %}
-module_directory = "{{ decoder.module_dir }}"
-{%- endif %}
+filename = "{{ decoder.module_file|default('/usr/share/lma_collector/decoders/' + decoder_name + '.lua' ) }}"
+module_directory = "{{ decoder.module_dir|default('/usr/share/lma_collector/common;/usr/share/heka/lua_modules') }}"
 {%- if decoder.memory_limit is defined %}
 memory_limit = "{{ decoder.memory_limit }}"
 {%- endif %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 12c0c63..5ad8722 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -109,8 +109,6 @@
       engine: sandbox
       module_file: /usr/share/lma_collector/decoders/metric.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
-      config:
-        deserialize_for_sources: 'log_collector'
   input:
     heka_collectd:
       engine: http
diff --git a/tests/lua/test_afd.lua b/tests/lua/test_afd.lua
index 478a36c..1016984 100644
--- a/tests/lua/test_afd.lua
+++ b/tests/lua/test_afd.lua
@@ -54,7 +54,7 @@
         assertEquals(last_injected_msg.Type, 'afd_metric')
         assertEquals(last_injected_msg.Fields.value, consts.OKAY)
         assertEquals(last_injected_msg.Fields.hostname, 'node-1')
-        assertEquals(last_injected_msg.Fields.notification_handler, 'mail')
+        assertEquals(last_injected_msg.Fields.notification_handler, nil)
         assertEquals(last_injected_msg.Payload, '{"alarms":[]}')
     end
 
diff --git a/tests/lua/test_influxdb.lua b/tests/lua/test_influxdb.lua
index 160c6b5..73d650d 100644
--- a/tests/lua/test_influxdb.lua
+++ b/tests/lua/test_influxdb.lua
@@ -21,11 +21,18 @@
 
 TestInfluxDB = {}
 
+    function TestInfluxDB:test_escaping_characters()
+        local encoder = influxdb.new("s")
+        assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 2, {tag1='"tag1"'}), 'foo,tag1=tag1 value=2.000000 1000')
+        assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 2, {tag1=",tag 1="}), 'foo,tag1=\\,tag\\ 1\\= value=2.000000 1000')
+        assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 'b"ar'), 'foo value="b\\"ar" 1000')
+        assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', true), 'foo value=true 1000')
+    end
+
     function TestInfluxDB:test_ms_precision_encoder()
         local encoder = influxdb.new("ms")
         assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 1), 'foo value=1.000000 1000000')
         assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 'bar'), 'foo value="bar" 1000000')
-        assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 'b"ar'), 'foo value="b\\"ar" 1000000')
         assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 1, {tag2="t2",tag1="t1"}), 'foo,tag1=t1,tag2=t2 value=1.000000 1000000')
         assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', {a=1, b=2}), 'foo a=1.000000,b=2.000000 1000000')
     end