-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "string"
require "cjson"
local utils = require 'lma_utils'
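-- This Heka sandbox decoder parses JSON samples emitted by collectd (typically
-- via the write_http plugin) and injects one Heka message of Type "metric" per
-- value, normalizing metric names and tag fields along the way.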
local sep = '_'
local processes_map = {
ps_code = 'memory_code',
ps_count = '',
ps_cputime = 'cputime',
ps_data = 'memory_data',
ps_disk_octets = 'disk_bytes',
ps_disk_ops = 'disk_ops',
ps_pagefaults = 'pagefaults',
ps_rss = 'memory_rss',
ps_stacksize = 'stacksize',
ps_vm = 'memory_virtual',
}
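-- Maps the types reported by the collectd 'processes' plugin to the suffix
-- appended to the metric name (an empty string means no suffix, eg ps_count).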
-- Processes belonging to the legacy LMA toolchain; their metrics are reported
-- under the 'lma_components' namespace instead of 'process'.
local lma_components = {
collectd = true,
remote_collectd = true,
metric_collector = true,
log_collector = true,
aggregator = true,
remote_collector = true,
elasticsearch = true,
influxd = true,
kibana = true,
['grafana-server'] = true,
}
-- The following table lists the plugins whose metrics shouldn't get the
-- Fields['hostname'] attribute by default.
local hostname_free = {
ceph_mon = true,
ceph_pool = true,
check_openstack_api = true,
cinder = true,
glance = true,
http_check = true,
hypervisor_stats = true,
keystone = true,
neutron = true,
nova = true,
pacemaker = true,
}
-- This is needed for the libvirt metrics because, in that case, collectd sends
-- the instance's ID instead of the hostname in the 'host' attribute.
local hostname = read_config('hostname') or error('hostname must be specified')
local swap_size = (read_config('swap_size') or 0) + 0
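-- swap_size is the node's total amount of swap (in the same unit as the
-- 'swap_used' values reported by collectd); it is used below to derive the
-- swap_percent_used metric. The '+ 0' coerces the config value to a number.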
function replace_dot_by_sep (str)
return string.gsub(str, '%.', sep)
end
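-- For illustration (values are made up), a single collectd sample decoded from
-- the Payload typically looks like:
--   {
--     "host": "node-1", "plugin": "memory", "plugin_instance": "",
--     "type": "memory", "type_instance": "used",
--     "values": [1234567], "dstypes": ["gauge"], "dsnames": ["value"],
--     "time": 1436357723.2, "interval": 10.0
--   }
-- which this decoder would turn into a metric message with
-- Fields['name'] = 'memory_used' and Fields['value'] = 1234567.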
function process_message ()
local ok, samples = pcall(cjson.decode, read_message("Payload"))
if not ok then
-- TODO: log error
return -1
end
for _, sample in ipairs(samples) do
local metric_prefix = sample['type']
if sample['type_instance'] ~= "" then
metric_prefix = metric_prefix .. sep .. sample['type_instance']
end
local metric_source = sample['plugin']
for i, value in ipairs(sample['values']) do
local skip_it = false
local metric_name = metric_prefix
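-- Multi-value types (eg 'load' with its 'shortterm', 'midterm' and 'longterm'
-- data sources) get the data source name appended to keep metric names unique.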
if sample['dsnames'][i] ~= "value" then
metric_name = metric_name .. sep .. sample['dsnames'][i]
end
local msg = {
Timestamp = sample['time'] * 1e9, -- Heka expects nanoseconds
Hostname = sample['host'],
Logger = "collectd",
Payload = utils.safe_json_encode(sample) or '',
Severity = 6,
Type = "metric",
Fields = {
interval = sample['interval'],
source = metric_source,
type = sample['dstypes'][i],
value = value,
tag_fields = {},
}
}
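-- Field names listed in 'tag_fields' are treated as dimensions (eg InfluxDB
-- tags) rather than values by the downstream encoders.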
-- Check whether Fields['hostname'] should be added to the metric message
if not hostname_free[metric_source] then
msg['Fields']['hostname'] = sample['host']
end
-- Normalize the metric name. Unfortunately collectd plugins aren't
-- always consistent about metric namespaces, so we need a few if/else
-- statements to cover all the cases.
if sample['meta'] and sample['meta']['service_check'] then
msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
msg['Fields']['details'] = sample['meta']['failure']
if sample['meta']['local_check'] then
-- if the check is local to the node, add the hostname
msg['Fields']['hostname'] = sample['host']
end
elseif metric_source == 'memory' or metric_source == 'contextswitch' or
metric_source == 'entropy' or metric_source == 'load' or
metric_source == 'swap' or metric_source == 'uptime' then
msg['Fields']['name'] = metric_name
elseif metric_source == 'df' then
local entity
if sample['type'] == 'df_inodes' then
entity = 'inodes'
elseif sample['type'] == 'percent_inodes' then
entity = 'inodes_percent'
elseif sample['type'] == 'percent_bytes' then
entity = 'space_percent'
else -- sample['type'] == 'df_complex'
entity = 'space'
end
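-- collectd's df plugin reports the mount point with '/' replaced by '-'
-- ('root' for the root filesystem), so the real path is rebuilt here.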
local mount = sample['plugin_instance']
if mount == 'root' then
mount = '/'
else
mount = '/' .. mount:gsub('-', '/')
end
msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
msg['Fields']['fs'] = mount
table.insert(msg['Fields']['tag_fields'], 'fs')
elseif metric_source == 'disk' then
if sample['type'] == 'disk_io_time' then
msg['Fields']['name'] = 'disk' .. sep .. sample['dsnames'][i]
else
msg['Fields']['name'] = metric_name
end
msg['Fields']['device'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'device')
elseif metric_source == 'cpu' then
msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
msg['Fields']['cpu_number'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'cpu_number')
elseif metric_source == 'netlink' then
local netlink_metric = sample['type']
if netlink_metric == 'if_rx_errors' then
netlink_metric = 'if_errors_rx'
elseif netlink_metric == 'if_tx_errors' then
netlink_metric = 'if_errors_tx'
end
-- The netlink plugin can send one or two values. Use dsnames only when needed.
if sample['dsnames'][i] ~= 'value' then
netlink_metric = netlink_metric .. sep .. sample['dsnames'][i]
end
-- and the type of error is set in type_instance
if sample['type_instance'] ~= '' then
netlink_metric = netlink_metric .. sep .. sample['type_instance']
end
msg['Fields']['name'] = netlink_metric
msg['Fields']['interface'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'interface')
elseif metric_source == 'processes' then
if processes_map[sample['type']] then
-- metrics related to a specific process
local service = sample['plugin_instance']
msg['Fields']['service'] = service
table.insert(msg['Fields']['tag_fields'], 'service')
if lma_components[service] then
msg['Fields']['name'] = 'lma_components'
else
msg['Fields']['name'] = 'process'
end
if processes_map[sample['type']] ~= '' then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']]
end
if sample['dsnames'][i] ~= 'value' then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
end
-- For ps_cputime, convert the value to a percentage: collectd reports
-- the CPU time allocated to the process as a rate, in microseconds
-- per second.
if sample['type'] == 'ps_cputime' then
msg['Fields']['value'] = 100 * value / 1e6
end
else
-- metrics related to all processes
msg['Fields']['name'] = 'processes'
if sample['type'] == 'ps_state' then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'count'
msg['Fields']['state'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'state')
else
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
end
elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
elseif metric_source == 'mysql' then
if sample['type'] == 'threads' then
msg['Fields']['name'] = 'mysql_' .. metric_name
elseif sample['type'] == 'mysql_commands' then
msg['Fields']['name'] = sample['type']
msg['Fields']['statement'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'statement')
elseif sample['type'] == 'mysql_handler' then
msg['Fields']['name'] = sample['type']
msg['Fields']['handler'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'handler')
else
msg['Fields']['name'] = metric_name
end
elseif metric_source == 'ntpd' then
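-- The ntpd plugin reports one value per peer plus the special 'error' and
-- 'loop' type instances.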
if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
else
msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. 'peer'
msg['Fields']['server'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'server')
end
elseif metric_source == 'check_openstack_api' then
-- For OpenStack API metrics, plugin_instance = <service name>
msg['Fields']['name'] = 'openstack_check_api'
msg['Fields']['service'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
if sample['meta'] then
msg['Fields']['os_region'] = sample['meta']['region']
end
elseif metric_source == 'hypervisor_stats' then
-- Metrics from the OpenStack hypervisor stats where
-- type_instance = <metric name>, which may end with _MB or _GB
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
local name, unit
name, unit = string.match(sample['type_instance'], '^(.+)_(.B)$')
if name then
msg['Fields']['name'] = msg['Fields']['name'] .. name
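-- Keep the original unit (eg MB or GB) as the value's representation.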
msg['Fields']['value'] = {value = msg['Fields']['value'], representation = unit}
else
msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
end
if sample['meta'] and sample['meta']['host'] then
msg['Fields']['hostname'] = sample['meta']['host']
end
if sample['meta'] and sample['meta']['aggregate'] then
msg['Fields']['aggregate'] = sample['meta']['aggregate']
table.insert(msg['Fields']['tag_fields'], 'aggregate')
end
if sample['meta'] and sample['meta']['aggregate_id'] then
msg['Fields']['aggregate_id'] = sample['meta']['aggregate_id']
table.insert(msg['Fields']['tag_fields'], 'aggregate_id')
end
elseif metric_source == 'rabbitmq_info' then
msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
if sample['meta'] and sample['meta']['queue'] then
msg['Fields']['queue'] = sample['meta']['queue']
table.insert(msg['Fields']['tag_fields'], 'queue')
end
elseif metric_source == 'nova' then
if sample['plugin_instance'] == 'nova_services' or
sample['plugin_instance'] == 'nova_services_percent' or
sample['plugin_instance'] == 'nova_service' then
msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
msg['Fields']['service'] = sample['meta']['service']
msg['Fields']['state'] = sample['meta']['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'nova_service' then
msg['Fields']['hostname'] = sample['meta']['host']
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
msg['Fields']['state'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'cinder' then
if sample['plugin_instance'] == 'cinder_services' or
sample['plugin_instance'] == 'cinder_services_percent' or
sample['plugin_instance'] == 'cinder_service' then
msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
msg['Fields']['service'] = sample['meta']['service']
msg['Fields']['state'] = sample['meta']['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['plugin_instance'] == 'cinder_service' then
msg['Fields']['hostname'] = sample['meta']['host']
end
else
msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
msg['Fields']['state'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'glance' then
msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
msg['Fields']['state'] = sample['meta']['status']
msg['Fields']['visibility'] = sample['meta']['visibility']
table.insert(msg['Fields']['tag_fields'], 'state')
table.insert(msg['Fields']['tag_fields'], 'visibility')
elseif metric_source == 'keystone' then
msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
if sample['meta']['state'] then
msg['Fields']['state'] = sample['meta']['state']
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'neutron' then
if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
skip_it = true
elseif sample['type_instance'] == 'subnets' then
msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. 'subnets'
elseif sample['type_instance'] == 'neutron_agents' or
sample['type_instance'] == 'neutron_agents_percent' or
sample['type_instance'] == 'neutron_agent' then
msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
msg['Fields']['service'] = sample['meta']['service']
msg['Fields']['state'] = sample['meta']['state']
table.insert(msg['Fields']['tag_fields'], 'service')
table.insert(msg['Fields']['tag_fields'], 'state')
if sample['type_instance'] == 'neutron_agent' then
msg['Fields']['hostname'] = sample['meta']['host']
end
elseif string.match(sample['type_instance'], '^ports') then
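-- here type_instance encodes '<resource>.<owner>.<state>'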
local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
msg['Fields']['owner'] = owner
msg['Fields']['state'] = state
table.insert(msg['Fields']['tag_fields'], 'owner')
table.insert(msg['Fields']['tag_fields'], 'state')
else
local resource, state = string.match(sample['type_instance'], '^([^.]+)%.(.+)$')
msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
msg['Fields']['state'] = state
table.insert(msg['Fields']['tag_fields'], 'state')
end
elseif metric_source == 'memcached' then
msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
elseif metric_source == 'haproxy' then
msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
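-- The backend/frontend, server and state dimensions are carried in the
-- sample's 'meta' table.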
if sample['meta'] then
if sample['meta']['backend'] then
msg['Fields']['backend'] = sample['meta']['backend']
table.insert(msg['Fields']['tag_fields'], 'backend')
if sample['meta']['state'] then
msg['Fields']['state'] = sample['meta']['state']
table.insert(msg['Fields']['tag_fields'], 'state')
end
if sample['meta']['server'] then
msg['Fields']['server'] = sample['meta']['server']
table.insert(msg['Fields']['tag_fields'], 'server')
end
elseif sample['meta']['frontend'] then
msg['Fields']['frontend'] = sample['meta']['frontend']
table.insert(msg['Fields']['tag_fields'], 'frontend')
end
end
elseif metric_source == 'apache' then
metric_name = string.gsub(metric_name, 'apache_', '')
msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
elseif metric_source == 'ceph_osd_perf' then
msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
msg['Fields']['cluster'] = sample['plugin_instance']
msg['Fields']['osd'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'cluster')
table.insert(msg['Fields']['tag_fields'], 'osd')
elseif metric_source:match('^ceph') then
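-- Generic handling for the other ceph plugins (eg ceph_mon, ceph_pool).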
msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
if sample['dsnames'][i] ~= 'value' then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
end
msg['Fields']['cluster'] = sample['plugin_instance']
table.insert(msg['Fields']['tag_fields'], 'cluster')
if sample['type_instance'] ~= '' then
local additional_tag
if string.match(sample['type'], '^pool_') then
additional_tag = 'pool'
elseif string.match(sample['type'], '^pg_state') then
additional_tag = 'state'
elseif string.match(sample['type'], '^osd_') then
additional_tag = 'osd'
end
if additional_tag then
msg['Fields'][additional_tag] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], additional_tag)
end
end
elseif metric_source == 'pacemaker' then
if sample['meta'] and sample['meta']['host'] then
msg['Fields']['hostname'] = sample['meta']['host']
end
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
-- add dimension fields
for _, v in ipairs({'status', 'resource'}) do
if sample['meta'] and sample['meta'][v] then
msg['Fields'][v] = sample['meta'][v]
table.insert(msg['Fields']['tag_fields'], v)
end
end
elseif metric_source == 'users' then
-- 'users' is a reserved name for InfluxDB v0.9
msg['Fields']['name'] = 'logged_users'
elseif metric_source == 'libvirt' then
-- collectd sends the instance's ID in the 'host' field
msg['Fields']['instance_id'] = sample['host']
table.insert(msg['Fields']['tag_fields'], 'instance_id')
msg['Fields']['hostname'] = hostname
msg['Hostname'] = hostname
if string.match(sample['type'], '^disk_') then
msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
msg['Fields']['device'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'device')
elseif string.match(sample['type'], '^if_') then
msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
msg['Fields']['interface'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'interface')
elseif sample['type'] == 'virt_cpu_total' then
msg['Fields']['name'] = 'virt_cpu_time'
elseif sample['type'] == 'virt_vcpu' then
msg['Fields']['name'] = 'virt_vcpu_time'
msg['Fields']['vcpu_number'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'vcpu_number')
else
msg['Fields']['name'] = 'virt' .. sep .. metric_name
end
elseif metric_source == 'influxdb' then
msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
elseif metric_source == 'http_check' then
msg['Fields']['name'] = metric_source
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
elseif metric_source == 'check_local_endpoint' then
msg['Fields']['name'] = 'openstack_check_local_api'
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
elseif metric_source == 'nginx' then
msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
if sample['type_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
end
else
-- generic metric name translation for 3rd-party sources
msg['Fields']['name'] = sample['plugin']
if sample['plugin_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
end
if sample['type'] ~= 'gauge' and sample['type'] ~= 'derive' and
sample['type'] ~= 'counter' and sample['type'] ~= 'absolute' then
-- only for custom DS types
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
end
if sample['type_instance'] ~= "" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
end
if sample['dsnames'][i] ~= "value" then
msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
end
msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
-- check if the hostname field should be kept or not (eg for
-- cluster metrics, discard_hostname == true)
if sample['meta'] and sample['meta']['discard_hostname'] then
msg['Fields']['hostname'] = nil
sample['meta']['discard_hostname'] = nil
end
-- add meta fields as tag_fields
for k, v in pairs(sample['meta'] or {}) do
if tostring(k) ~= '0' then
msg['Fields'][k] = v
table.insert(msg['Fields']['tag_fields'], k)
end
end
end
if not skip_it then
if msg['Fields']['hostname'] then
table.insert(msg['Fields']['tag_fields'], 'hostname')
end
utils.inject_tags(msg)
-- Before injecting the message, we need to check that tag_fields is not an
-- empty table, otherwise the protobuf encoder fails to encode the table.
if #msg['Fields']['tag_fields'] == 0 then
msg['Fields']['tag_fields'] = nil
end
utils.safe_inject_message(msg)
if metric_source == 'swap' and metric_name == 'swap_used' and swap_size > 0 then
-- collectd 5.4.0 doesn't report the used swap as a percentage,
-- which is why this metric is computed and injected by this plugin.
msg['Fields']['name'] = 'swap_percent_used'
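-- Note: despite the name, the value is a ratio between 0 and 1.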
msg['Fields']['value'] = value / swap_size
utils.safe_inject_message(msg)
end
end
end
end
return 0
end