Merge "Add watchdog for Heka services"
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 349e731..0ceeff6 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
end
-- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, logger, source, m_type)
+function inject_bulk_metric(ts, hostname, source, m_type)
if #bulk_datapoints == 0 then
return
end
@@ -106,13 +106,12 @@
end
local msg = {
- Logger = logger,
- Hostname = hostname,
Timestamp = ts,
Payload = payload,
Type = 'bulk_metric', -- prepended with 'heka.sandbox'
Severity = label_to_severity_map.INFO,
Fields = {
+ hostname = hostname, -- inject_message() can't set the top-level Hostname
source = source,
type = m_type or metric_type['GAUGE']
}
diff --git a/heka/files/lua/decoders/metric.lua b/heka/files/lua/decoders/metric.lua
index 9b26158..75cec5e 100644
--- a/heka/files/lua/decoders/metric.lua
+++ b/heka/files/lua/decoders/metric.lua
@@ -18,19 +18,19 @@
local l = require 'lpeg'
l.locale(l)
-local loggers_pattern = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
-local loggers_list = loggers_pattern:match(read_config('deserialize_bulk_metric_for_loggers') or '')
+local split_on_space = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
+local sources_list = split_on_space:match(read_config('deserialize_for_sources') or '') or {} -- match() yields nil when the option is unset/blank; fall back to an empty list so ipairs() below does not raise
-local loggers = {}
-for _, logger in ipairs(loggers_list) do
- loggers[logger] = true
+local sources = {}
+for _, s in ipairs(sources_list) do
+ sources[s] = true
end
local utils = require 'lma_utils'
function process_message ()
local msg = decode_message(read_message("raw"))
- if string.match(msg.Type, 'bulk_metric$') and loggers[msg.Logger] ~= nil then
+ if string.match(msg.Type, 'bulk_metric$') and sources[msg.Fields.source] then
local ok, metrics = pcall(cjson.decode, msg.Payload)
if not ok then
diff --git a/heka/files/lua/filters/audit_authentications.lua b/heka/files/lua/filters/audit_authentications.lua
index a2eba0d..1ad9ae8 100644
--- a/heka/files/lua/filters/audit_authentications.lua
+++ b/heka/files/lua/filters/audit_authentications.lua
@@ -15,7 +15,6 @@
require 'os'
local utils = require 'lma_utils'
-local hostname = read_config('hostname') or error('hostname must be specified')
-- The filter can receive messages that should be discarded because they are
-- way too old (Heka cannot guarantee that messages are processed in real-time).
-- The 'grace_interval' parameter allows to define which messages should be
@@ -23,14 +22,11 @@
-- the filter will take into account messages that are at most 10 seconds
-- older than the current time.
local grace_interval = (read_config('grace_interval') or 0) + 0
-local metric_logger = read_config('logger')
local metric_source = read_config('source')
local msg = {
Type = "multivalue_metric", -- will be prefixed by "heka.sandbox."
- Hostname = hostname,
Severity = 6,
- Logger = read_config('logger') or error('logger must be specified')
}
local global_counters = {
total=0,
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index f34da24..1da62ab 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -31,7 +31,6 @@
-- grace_interval parameter allows to take into account log messages that are
-- received in the current interval but emitted before it.
local grace_interval = (read_config('grace_interval') or 0) + 0
-local metric_logger = read_config('logger')
local metric_source = read_config('source')
local error_counters = {}
@@ -87,7 +86,7 @@
end
enter_at = ns
- utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
+ utils.inject_bulk_metric(ns, hostname, metric_source)
return 0
end
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
index c61251f..c622619 100644
--- a/heka/files/lua/filters/heka_monitoring.lua
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -78,6 +78,6 @@
end
end
- utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
+ utils.inject_bulk_metric(ts, hostname, 'internal')
return 0
end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
index 81eaf43..6abe3b7 100644
--- a/heka/files/lua/filters/http_metrics_aggregator.lua
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -34,7 +34,6 @@
-- and also to compensate the delay introduced by log parsing/decoding
-- which leads to arrive too late in its interval.
local grace_time = (read_config('grace_time') or 0) + 0
-local metric_logger = read_config('logger')
local metric_source = read_config('source')
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
@@ -167,7 +166,7 @@
num_metrics = num_metrics - 1
if num >= bulk_size then
if msg_injected < max_timer_inject then
- utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
+ utils.inject_bulk_metric(ns, hostname, metric_source)
msg_injected = msg_injected + 1
num = 0
num_metrics = 0
@@ -179,7 +178,7 @@
all_times[service] = nil
end
if num > 0 then
- utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
+ utils.inject_bulk_metric(ns, hostname, metric_source)
num = 0
num_metrics = 0
end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 5199f25..e6ef229 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -26,7 +26,6 @@
-- older than the current time.
local grace_interval = (read_config('grace_interval') or 0) + 0
local logger_matcher = read_config('logger_matcher') or '.*'
-local metric_logger = read_config('logger')
local metric_source = read_config('source')
local discovered_services = {}
@@ -79,7 +78,7 @@
last_timer_event = ns
- ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
+ ok, err = utils.inject_bulk_metric(ns, hostname, metric_source, utils.metric_type['DERIVE'])
if ok ~= 0 then
return -1, err
end
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 0e3f9ba..1b2a3ca 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -19,7 +19,6 @@
bulk_size: 523
percentile: 90
grace_time: 5
- logger: aggregated_http_metrics_filter
source: log_collector
# This filter monitors the OpenStack logs and sends the rate of processed
# log messages broken down by service and severity every minute
@@ -34,7 +33,6 @@
hostname: '{{ grains.host }}'
grace_interval: 30
logger_matcher: '^openstack%.(%a+)$'
- logger: log_counter_filter
source: log_collector
hdd_errors:
engine: sandbox
@@ -47,7 +45,6 @@
grace_interval: 10
patterns: "/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/"
hostname: '{{ grains.host }}'
- logger: hdd_errors_filter
source: log_collector
{%- if log_collector.sensu_host is defined %}
watchdog:
@@ -111,7 +108,7 @@
module_file: /usr/share/lma_collector/decoders/metric.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
- deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_filter log_counter_filter'
+ deserialize_for_sources: 'log_collector'
input:
heka_collectd:
engine: http
@@ -303,9 +300,7 @@
message_matcher: "Type == 'audit' && Fields[action] == 'authenticate'"
ticker_interval: 60
config:
- hostname: '{{ grains.host }}'
grace_interval: 30
- logger: authentication_filter
source: remote_collector
{%- endif %}
{%- endif %}