Merge "Add Nagios class to the metric_collector metadata"
diff --git a/heka/_common.sls b/heka/_common.sls
index 1753840..9f71c31 100644
--- a/heka/_common.sls
+++ b/heka/_common.sls
@@ -7,15 +7,24 @@
/usr/share/lma_collector:
file.recurse:
- source: salt://heka/files/lua
+ - user: root
+ - group: heka
+ - file_mode: 640
+ - dir_mode: 750
+ - require:
+ - user: heka_user
/usr/share/lma_collector/common/extra_fields.lua:
file.managed:
- source: salt://heka/files/extra_fields.lua
- user: root
- - mode: 644
+ - group: heka
+ - mode: 640
- defaults:
extra_fields: {{ server.extra_fields }}
- template: jinja
+ - require:
+ - user: heka_user
heka_user:
user.present:
diff --git a/heka/_service.sls b/heka/_service.sls
index 359e716..70d80ce 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -13,6 +13,7 @@
file.directory:
- name: /var/cache/{{ service_name }}
- user: heka
+ - group: heka
- mode: 750
- makedirs: true
diff --git a/heka/files/lua/common/influxdb.lua b/heka/files/lua/common/influxdb.lua
index d9c359d..9341399 100644
--- a/heka/files/lua/common/influxdb.lua
+++ b/heka/files/lua/common/influxdb.lua
@@ -26,8 +26,14 @@
setfenv(1, InfluxEncoder) -- Remove external access to contain everything in the module
-local function escape_string(str)
- return tostring(str):gsub("([ ,])", "\\%1")
+local function escape_string(str, preserve_quotes)
+    -- Returns str as one escaped string; gsub calls are parenthesized to drop the match count.
+    local v = (tostring(str):gsub("'", "")) -- single quotes are always forbidden
+    if not preserve_quotes then
+        v = (v:gsub('"', "")) -- double quotes are removed unless preserve_quotes is set
+    end
+    -- backslash-escape spaces, commas and equal signs
+    return (v:gsub("([ ,=])", "\\%1"))
end
local function encode_scalar_value(value)
@@ -40,7 +46,7 @@
-- string values need to be double quoted
return '"' .. value:gsub('"', '\\"') .. '"'
elseif type(value) == "boolean" then
- return '"' .. tostring(value) .. '"'
+ return tostring(value)
end
end
@@ -50,7 +56,7 @@
for k,v in pairs(value) do
table.insert(
values,
- string.format("%s=%s", escape_string(k), encode_scalar_value(v))
+ string.format("%s=%s", escape_string(k, true), encode_scalar_value(v))
)
end
return table.concat(values, ',')
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 1d44349..3988c17 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -45,21 +45,6 @@
['grafana-server'] = true,
}
--- The following table keeps a list of metrics from plugin where the
--- Fields[hostname] shouldn't be set by default.
-local hostname_free = {
- ceph_mon = true,
- ceph_pool = true,
- check_openstack_api = true,
- cinder = true,
- glance = true,
- hypervisor_stats = true,
- keystone = true,
- neutron = true,
- nova = true,
- pacemaker = true,
-}
-
-- this is needed for the libvirt metrics because in that case, collectd sends
-- the instance's ID instead of the hostname in the 'host' attribute
local hostname = read_config('hostname') or error('hostname must be specified')
@@ -83,6 +68,7 @@
end
local metric_source = sample['plugin']
+ local meta = sample['meta'] or {}
for i, value in ipairs(sample['values']) do
local skip_it = false
@@ -107,25 +93,23 @@
}
}
- -- Check if Fields[hostname] should be added or not to the metric message
- if not hostname_free[metric_source] then
- msg['Fields']['hostname'] = sample['host']
- end
-
-- Normalize metric name, unfortunately collectd plugins aren't
-- always consistent on metric namespaces so we need a few if/else
-- statements to cover all cases.
- if sample['meta'] and sample['meta']['service_check'] then
- msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
- msg['Fields']['details'] = sample['meta']['failure']
- if sample['meta']['local_check'] then
+ if meta['service_check'] then
+ msg['Fields']['name'] = meta['service_check'] .. sep .. 'check'
+ msg['Fields']['details'] = meta['failure']
+ if meta['local_check'] then
-- if the check is local to the node, add the hostname
msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
elseif metric_source == 'memory' or metric_source == 'contextswitch' or
metric_source == 'entropy' or metric_source == 'load' or
metric_source == 'swap' or metric_source == 'uptime' then
msg['Fields']['name'] = metric_name
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'df' then
local entity
if sample['type'] == 'df_inodes' then
@@ -148,18 +132,26 @@
msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
msg['Fields']['fs'] = mount
table.insert(msg['Fields']['tag_fields'], 'fs')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'disk' then
if sample['type'] == 'disk_io_time' then
msg['Fields']['name'] = 'disk' .. sep .. sample['dsnames'][i]
+ elseif sample['type'] == 'pending_operations' then
+ msg['Fields']['name'] = 'disk' .. sep .. sample['type']
else
msg['Fields']['name'] = metric_name
end
msg['Fields']['device'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'device')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'cpu' then
msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
msg['Fields']['cpu_number'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'netlink' then
local netlink_metric = sample['type']
if netlink_metric == 'if_rx_errors' then
@@ -179,6 +171,8 @@
msg['Fields']['name'] = netlink_metric
msg['Fields']['interface'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'interface')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'processes' then
if processes_map[sample['type']] then
-- metrics related to a specific process
@@ -214,8 +208,12 @@
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'mysql' then
if sample['type'] == 'threads' then
msg['Fields']['name'] = 'mysql_' .. metric_name
@@ -230,6 +228,8 @@
else
msg['Fields']['name'] = metric_name
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'ntpd' then
if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
@@ -238,15 +238,23 @@
msg['Fields']['server'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'server')
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'check_openstack_api' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
+ --
-- For OpenStack API metrics, plugin_instance = <service name>
msg['Fields']['name'] = 'openstack_check_api'
msg['Fields']['service'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
- if sample['meta'] then
- msg['Fields']['os_region'] = sample['meta']['region']
- end
+ msg['Fields']['os_region'] = meta['region']
elseif metric_source == 'hypervisor_stats' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
+ --
-- Metrics from the OpenStack hypervisor metrics where
-- type_instance = <metric name> which can end by _MB or _GB
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
@@ -258,34 +266,41 @@
else
msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
end
- if sample['meta'] and sample['meta']['host'] then
- msg['Fields']['hostname'] = sample['meta']['host']
+ if meta['host'] then
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
- if sample['meta'] and sample['meta']['aggregate'] then
- msg['Fields']['aggregate'] = sample['meta']['aggregate']
+ if meta['aggregate'] then
+ msg['Fields']['aggregate'] = meta['aggregate']
table.insert(msg['Fields']['tag_fields'], 'aggregate')
end
- if sample['meta'] and sample['meta']['aggregate_id'] then
- msg['Fields']['aggregate_id'] = sample['meta']['aggregate_id']
+ if meta['aggregate_id'] then
+ msg['Fields']['aggregate_id'] = meta['aggregate_id']
table.insert(msg['Fields']['tag_fields'], 'aggregate_id')
end
elseif metric_source == 'rabbitmq_info' then
msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
- if sample['meta'] and sample['meta']['queue'] then
- msg['Fields']['queue'] = sample['meta']['queue']
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
+ if meta['queue'] then
+ msg['Fields']['queue'] = meta['queue']
table.insert(msg['Fields']['tag_fields'], 'queue')
end
elseif metric_source == 'nova' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['plugin_instance'] == 'nova_services' or
sample['plugin_instance'] == 'nova_services_percent' or
sample['plugin_instance'] == 'nova_service' then
msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
- msg['Fields']['service'] = sample['meta']['service']
- msg['Fields']['state'] = sample['meta']['state']
+ msg['Fields']['service'] = meta['service']
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'nova_service' then
- msg['Fields']['hostname'] = sample['meta']['host']
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -293,16 +308,20 @@
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'cinder' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['plugin_instance'] == 'cinder_services' or
sample['plugin_instance'] == 'cinder_services_percent' or
sample['plugin_instance'] == 'cinder_service' then
msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
- msg['Fields']['service'] = sample['meta']['service']
- msg['Fields']['state'] = sample['meta']['state']
+ msg['Fields']['service'] = meta['service']
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'cinder_service' then
- msg['Fields']['hostname'] = sample['meta']['host']
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
@@ -310,18 +329,27 @@
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'glance' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
- msg['Fields']['state'] = sample['meta']['status']
- msg['Fields']['visibility'] = sample['meta']['visibility']
+ msg['Fields']['state'] = meta['status']
+ msg['Fields']['visibility'] = meta['visibility']
table.insert(msg['Fields']['tag_fields'], 'state')
table.insert(msg['Fields']['tag_fields'], 'visibility')
elseif metric_source == 'keystone' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
- if sample['meta']['state'] then
- msg['Fields']['state'] = sample['meta']['state']
+ if meta['state'] then
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'neutron' then
+ -- This code is kept for backward compatibility with the old
+ -- collectd plugin. The new collectd plugin sends payload which
+ -- is compatible with the default decoding.
if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
skip_it = true
elseif sample['type_instance'] == 'subnets' then
@@ -330,12 +358,13 @@
sample['type_instance'] == 'neutron_agents_percent' or
sample['type_instance'] == 'neutron_agent' then
msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
- msg['Fields']['service'] = sample['meta']['service']
- msg['Fields']['state'] = sample['meta']['state']
+ msg['Fields']['service'] = meta['service']
+ msg['Fields']['state'] = meta['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['type_instance'] == 'neutron_agent' then
- msg['Fields']['hostname'] = sample['meta']['host']
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
elseif string.match(sample['type_instance'], '^ports') then
local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
@@ -352,28 +381,32 @@
end
elseif metric_source == 'memcached' then
msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'haproxy' then
msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
- if sample['meta'] then
- if sample['meta']['backend'] then
- msg['Fields']['backend'] = sample['meta']['backend']
- table.insert(msg['Fields']['tag_fields'], 'backend')
- if sample['meta']['state'] then
- msg['Fields']['state'] = sample['meta']['state']
- table.insert(msg['Fields']['tag_fields'], 'state')
- end
- if sample['meta']['server'] then
- msg['Fields']['server'] = sample['meta']['server']
- table.insert(msg['Fields']['tag_fields'], 'server')
- end
- elseif sample['meta']['frontend'] then
- msg['Fields']['frontend'] = sample['meta']['frontend']
- table.insert(msg['Fields']['tag_fields'], 'frontend')
+ if meta['backend'] then
+ msg['Fields']['backend'] = meta['backend']
+ table.insert(msg['Fields']['tag_fields'], 'backend')
+ if meta['state'] then
+ msg['Fields']['state'] = meta['state']
+ table.insert(msg['Fields']['tag_fields'], 'state')
end
+ if meta['server'] then
+ msg['Fields']['server'] = meta['server']
+ table.insert(msg['Fields']['tag_fields'], 'server')
+ end
+ elseif meta['frontend'] then
+ msg['Fields']['frontend'] = meta['frontend']
+ table.insert(msg['Fields']['tag_fields'], 'frontend')
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'apache' then
metric_name = string.gsub(metric_name, 'apache_', '')
msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'ceph_osd_perf' then
msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
@@ -381,6 +414,8 @@
msg['Fields']['osd'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'cluster')
table.insert(msg['Fields']['tag_fields'], 'osd')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source:match('^ceph') then
msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
if sample['dsnames'][i] ~= 'value' then
@@ -405,27 +440,31 @@
end
end
elseif metric_source == 'pacemaker' then
- if sample['meta'] and sample['meta']['host'] then
- msg['Fields']['hostname'] = sample['meta']['host']
+ if meta['host'] then
+ msg['Fields']['hostname'] = meta['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
-- add dimension fields
for _, v in ipairs({'status', 'resource'}) do
- if sample['meta'] and sample['meta'][v] then
- msg['Fields'][v] = sample['meta'][v]
+ if meta[v] then
+ msg['Fields'][v] = meta[v]
table.insert(msg['Fields']['tag_fields'], v)
end
end
elseif metric_source == 'users' then
-- 'users' is a reserved name for InfluxDB v0.9
msg['Fields']['name'] = 'logged_users'
- elseif metric_source == 'libvirt' then
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
+ elseif metric_source == 'libvirt' or metric_source == 'virt' then
-- collectd sends the instance's ID in the 'host' field
msg['Fields']['instance_id'] = sample['host']
table.insert(msg['Fields']['tag_fields'], 'instance_id')
msg['Fields']['hostname'] = hostname
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
msg['Hostname'] = hostname
if string.match(sample['type'], '^disk_') then
@@ -447,17 +486,30 @@
end
elseif metric_source == 'influxdb' then
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'check_local_endpoint' then
msg['Fields']['name'] = 'openstack_check_local_api'
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
elseif metric_source == 'nginx' then
msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
if sample['type_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
end
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
else
- -- generic metric name translation for 3rd-party sources
+ -- In the default case, the collectd payload is decoded in a
+ -- generic way.
+ --
+ -- name: <plugin>[_<plugin_instance>][_<type>][_<type_instance>]
+ --
+ -- Except for reserved names, all items in the 'meta' dict are
+ -- added to the Fields dict and keys are added to the
+ -- Fields['tag_fields'] array.
msg['Fields']['name'] = sample['plugin']
if sample['plugin_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
@@ -475,16 +527,23 @@
end
msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
- -- check if the hostname field should be kept or not (eg for
- -- cluster metrics, discard_hostname == true)
- if sample['meta'] and sample['meta']['discard_hostname'] then
- msg['Fields']['hostname'] = nil
- sample['meta']['discard_hostname'] = nil
+ if meta['unit'] then
+ msg.Fields['value'] = {
+ value = msg.Fields['value'],
+ representation = meta['unit']
+ }
+ end
+
+ -- if not set, check if the 'hostname' field should be added
+ -- (eg for cluster metrics, discard_hostname == true)
+ if msg['Fields']['hostname'] == nil and not meta['discard_hostname'] then
+ msg['Fields']['hostname'] = msg['Hostname']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
end
-- add meta fields as tag_fields
- for k, v in pairs(sample['meta'] or {}) do
- if tostring(k) ~= '0' then
+ for k, v in pairs(meta) do
+ if tostring(k) ~= '0' and k ~= 'unit' and k ~= 'discard_hostname' then
msg['Fields'][k] = v
table.insert(msg['Fields']['tag_fields'], k)
end
@@ -492,9 +551,6 @@
end
if not skip_it then
- if msg['Fields']['hostname'] then
- table.insert(msg['Fields']['tag_fields'], 'hostname')
- end
utils.inject_tags(msg)
-- Before injecting the message we need to check that tag_fields is not an
-- empty table otherwise the protobuf encoder fails to encode the table.
diff --git a/heka/files/lua/decoders/kubernetes.lua b/heka/files/lua/decoders/kubernetes.lua
new file mode 100644
index 0000000..44ff669
--- /dev/null
+++ b/heka/files/lua/decoders/kubernetes.lua
@@ -0,0 +1,58 @@
+-- Copyright 2017 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+
+local l = require 'lpeg'
+l.locale(l)
+
+local dt = require "date_time"
+local patt = require 'patterns'
+local syslog = require "syslog"
+local utils = require 'lma_utils'
+
+-- Skeleton of the message handed to utils.safe_inject_message()
+local msg = {
+    Type = 'log',
+    Severity = 6, -- INFO
+    Fields = {},
+    -- the keys below start unset; they are listed to document the message shape
+    Timestamp = nil, Hostname = nil, Payload = nil, Pid = nil,
+}
+-- mandatory setting: an error is raised at load time when it is missing
+local syslog_pattern = read_config("syslog_pattern")
+    or error("syslog_pattern configuration must be specified")
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+-- line layout: <I|W|E|F><4 or more hex digits> <time> <pid> <message>
+local k8s_severity = l.Cg(l.S'IWEF', 'Severity')
+local header = k8s_severity * l.xdigit^4 * patt.sp^1 * dt.rfc3339_partial_time * patt.sp^1 * patt.Pid
+local message = l.Cg(patt.Message, "Message")
+local k8s_pattern = l.Ct(header * patt.sp^1 * message)
+
+-- Parse the syslog envelope, then the Kubernetes header; returns the inject result.
+function process_message ()
+    local log = read_message("Payload")
+    -- msg is the module-level table; presumably filled in by parse_syslog_message()
+    if utils.parse_syslog_message(syslog_grammar, log, msg) then
+        local kube = k8s_pattern:match(msg.Payload) -- local: avoid creating a global
+        if kube then
+            msg.Payload = kube.Message
+            msg.Severity = utils.label_to_severity_map[kube.Severity] or 6 -- 6 = INFO
+        end
+    else
+        msg.Payload = log -- syslog parsing failed: forward the raw payload
+    end
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    return utils.safe_inject_message(msg)
+end
+
diff --git a/heka/files/lua/decoders/metric.lua b/heka/files/lua/decoders/metric.lua
index 75cec5e..963f827 100644
--- a/heka/files/lua/decoders/metric.lua
+++ b/heka/files/lua/decoders/metric.lua
@@ -15,22 +15,11 @@
require "cjson"
require "string"
-local l = require 'lpeg'
-l.locale(l)
-
-local split_on_space = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
-local sources_list = split_on_space:match(read_config('deserialize_for_sources') or '')
-
-local sources = {}
-for _, s in ipairs(sources_list) do
- sources[s] = true
-end
-
local utils = require 'lma_utils'
function process_message ()
local msg = decode_message(read_message("raw"))
- if string.match(msg.Type, 'bulk_metric$') and sources[msg.Fields.source] then
+ if string.match(msg.Type, 'bulk_metric$') then
local ok, metrics = pcall(cjson.decode, msg.Payload)
if not ok then
diff --git a/heka/files/toml/decoder/sandbox.toml b/heka/files/toml/decoder/sandbox.toml
index c073991..d472a3d 100644
--- a/heka/files/toml/decoder/sandbox.toml
+++ b/heka/files/toml/decoder/sandbox.toml
@@ -2,10 +2,8 @@
[{{ decoder_name }}_decoder]
type = "SandboxDecoder"
-filename = "{{ decoder.module_file }}"
-{%- if decoder.module_dir is defined %}
-module_directory = "{{ decoder.module_dir }}"
-{%- endif %}
+filename = "{{ decoder.module_file|default('/usr/share/lma_collector/decoders/' + decoder_name + '.lua' ) }}"
+module_directory = "{{ decoder.module_dir|default('/usr/share/lma_collector/common;/usr/share/heka/lua_modules') }}"
{%- if decoder.memory_limit is defined %}
memory_limit = "{{ decoder.memory_limit }}"
{%- endif %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 12c0c63..5ad8722 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -109,8 +109,6 @@
engine: sandbox
module_file: /usr/share/lma_collector/decoders/metric.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
- config:
- deserialize_for_sources: 'log_collector'
input:
heka_collectd:
engine: http
diff --git a/tests/lua/test_afd.lua b/tests/lua/test_afd.lua
index 478a36c..1016984 100644
--- a/tests/lua/test_afd.lua
+++ b/tests/lua/test_afd.lua
@@ -54,7 +54,7 @@
assertEquals(last_injected_msg.Type, 'afd_metric')
assertEquals(last_injected_msg.Fields.value, consts.OKAY)
assertEquals(last_injected_msg.Fields.hostname, 'node-1')
- assertEquals(last_injected_msg.Fields.notification_handler, 'mail')
+ assertEquals(last_injected_msg.Fields.notification_handler, nil)
assertEquals(last_injected_msg.Payload, '{"alarms":[]}')
end
diff --git a/tests/lua/test_influxdb.lua b/tests/lua/test_influxdb.lua
index 160c6b5..73d650d 100644
--- a/tests/lua/test_influxdb.lua
+++ b/tests/lua/test_influxdb.lua
@@ -21,11 +21,18 @@
TestInfluxDB = {}
+    function TestInfluxDB:test_escaping_characters()
+        local enc, ts = influxdb.new("s"), 1e9 * 1000 -- one encoder and timestamp for all cases
+        assertEquals(enc:encode_datapoint(ts, 'foo', 2, {tag1='"tag1"'}), 'foo,tag1=tag1 value=2.000000 1000') -- quotes removed from tag values
+        assertEquals(enc:encode_datapoint(ts, 'foo', 2, {tag1=",tag 1="}), 'foo,tag1=\\,tag\\ 1\\= value=2.000000 1000') -- separators escaped
+        assertEquals(enc:encode_datapoint(ts, 'foo', 'b"ar'), 'foo value="b\\"ar" 1000') -- string values keep escaped quotes
+        assertEquals(enc:encode_datapoint(ts, 'foo', true), 'foo value=true 1000') -- booleans are emitted unquoted
+    end
+
function TestInfluxDB:test_ms_precision_encoder()
local encoder = influxdb.new("ms")
assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 1), 'foo value=1.000000 1000000')
assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 'bar'), 'foo value="bar" 1000000')
- assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 'b"ar'), 'foo value="b\\"ar" 1000000')
assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', 1, {tag2="t2",tag1="t1"}), 'foo,tag1=t1,tag2=t2 value=1.000000 1000000')
assertEquals(encoder:encode_datapoint(1e9 * 1000, 'foo', {a=1, b=2}), 'foo a=1.000000,b=2.000000 1000000')
end