Send log_messages metrics as a bulk metric
Using bulk metrics for the log counters greatly reduces the likelihood
of blocking the Heka pipeline: instead of injecting (x services *
y levels) metric messages, the filter injects only one big message.

This change also updates the configuration of the metric_collector
service to deserialize the bulk metric, so that alarms on log
counters are still supported.
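
To illustrate the flow (a minimal sketch with made-up hostname, rate
and timestamp values; the lma_utils calls are the ones used by the
rewritten filter below):

    local utils = require 'lma_utils'

    -- Buffer one datapoint per (service, level) pair in lma_utils'
    -- local bulk_datapoints table instead of injecting one message
    -- per pair...
    utils.add_to_bulk_metric('log_messages', 0.5,
        {hostname='node-1', service='nova', level='error'})
    utils.add_to_bulk_metric('log_messages', 12.3,
        {hostname='node-1', service='nova', level='info'})

    -- ...then flush all buffered datapoints as a single Heka message,
    -- mirroring the inject_bulk_metric() call added in logs_counter.lua.
    local ns = os.time() * 1e9  -- timestamp in nanoseconds
    local ok, err = utils.inject_bulk_metric(ns, 'node-1', 'logs_counter',
        utils.metric_type['DERIVE'])
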
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index e79f934..1afe372 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
end
-- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, source)
+function inject_bulk_metric(ts, hostname, source, metric_type)
if #bulk_datapoints == 0 then
return
end
@@ -113,14 +113,15 @@
Severity = label_to_severity_map.INFO,
Fields = {
hostname = hostname,
- source = source
+ source = source,
+ type = metric_type
}
}
-- reset the local table storing the datapoints
bulk_datapoints = {}
inject_tags(msg)
- safe_inject_message(msg)
+ return safe_inject_message(msg)
end
-- Encode a Lua variable as JSON without raising an exception if the encoding
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 7da74e9..c1d0391 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -18,24 +18,17 @@
local utils = require 'lma_utils'
local hostname = read_config('hostname') or error('hostname must be specified')
-local interval = (read_config('interval') or error('interval must be specified')) + 0
--- Heka cannot guarantee that logs are processed in real-time so the
--- grace_interval parameter allows to take into account log messages that are
--- received in the current interval but emitted before it.
+-- The filter can receive messages that should be discarded because they are
+-- far too old (Heka cannot guarantee that logs are processed in real-time).
+-- The 'grace_interval' parameter defines which log messages are kept and
+-- which are discarded. For instance, a value of '10' means that the filter
+-- takes into account log messages that are at most 10 seconds older than
+-- the current time.
local grace_interval = (read_config('grace_interval') or 0) + 0
local discovered_services = {}
local logs_counters = {}
-local last_timer_events = {}
-local current_service = 1
-local enter_at
-local interval_in_ns = interval * 1e9
-local start_time = os.time()
-local msg = {
- Type = "metric",
- Timestamp = nil,
- Severity = 6,
-}
+local last_timer_event = os.time() * 1e9
function convert_to_sec(ns)
return math.floor(ns/1e9)
@@ -47,12 +40,12 @@
local service = string.match(logger, "^openstack%.(%a+)$")
if service == nil then
- return -1, "Cannot match any services from " .. logger
+ return -1, "Cannot match any service from " .. logger
end
-- timestamp values should be converted to seconds because log timestamps
-- have a precision of one second (sometimes one millisecond)
- if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
+ if convert_to_sec(read_message('Timestamp')) + grace_interval < convert_to_sec(last_timer_event) then
-- skip the log message if it doesn't fall into the current interval
return 0
end
@@ -67,67 +60,29 @@
end
logs_counters[service][severity] = logs_counters[service][severity] + 1
-
return 0
end
function timer_event(ns)
-
- -- We can only send a maximum of ten events per call.
- -- So we send all metrics about one service and we will proceed with
- -- the following services at the next ticker event.
-
- if #discovered_services == 0 then
- return 0
- end
-
- -- Initialize enter_at during the first call to timer_event
- if not enter_at then
- enter_at = ns
- end
-
- -- To be able to send a metric we need to check if we are within the
- -- interval specified in the configuration and if we haven't already sent
- -- all metrics.
- if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
- local service_name = discovered_services[current_service]
- local last_timer_event = last_timer_events[service_name] or 0
+ for service, counters in pairs(logs_counters) do
local delta_sec = (ns - last_timer_event) / 1e9
- for level, val in pairs(logs_counters[service_name]) do
+ for level, val in pairs(counters) do
+ utils.add_to_bulk_metric(
+ 'log_messages',
+ val / delta_sec,
+ {hostname=hostname, service=service, level=string.lower(level)})
- -- We don't send the first value
- if last_timer_event ~= 0 and delta_sec ~= 0 then
- msg.Timestamp = ns
- msg.Fields = {
- name = 'log_messages',
- type = utils.metric_type['DERIVE'],
- value = val / delta_sec,
- service = service_name,
- level = string.lower(level),
- hostname = hostname,
- tag_fields = {'service', 'level', 'hostname'},
- }
-
- utils.inject_tags(msg)
- ok, err = utils.safe_inject_message(msg)
- if ok ~= 0 then
- return -1, err
- end
- end
-
- -- reset the counter
- logs_counters[service_name][level] = 0
-
+ -- reset the counter
+ counters[level] = 0
end
-
- last_timer_events[service_name] = ns
- current_service = current_service + 1
end
- if ns - enter_at >= interval_in_ns then
- enter_at = ns
- current_service = 1
+ last_timer_event = ns
+
+ ok, err = utils.inject_bulk_metric(ns, hostname, 'logs_counter', utils.metric_type['DERIVE'])
+ if ok ~= 0 then
+ return -1, err
end
return 0
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 293f0ea..e8138dd 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -26,10 +26,9 @@
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
preserve_data: true
message_matcher: "Type == 'log' && Logger =~ /^openstack\\\\./"
- ticker_interval: 1
+ ticker_interval: 60
config:
hostname: '{{ grains.host }}'
- interval: 60
grace_interval: 30
{%- if log_collector.elasticsearch_host is defined %}
encoder:
@@ -63,7 +62,7 @@
module_file: /usr/share/lma_collector/decoders/metric.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
- deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
+ deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter logs_counter_filter'
input:
heka_collectd:
engine: http