Improve inject_bulk_metric()
This change ensures that bulk metric messages carry an explicit Logger field,
as well as Fields[source] and Fields[type], like regular metric messages.
It also makes the configuration of the input metric plugin on the
metric_collector side more explicit about which metrics to deserialize.
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index aa89f29..b310ec4 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
end
-- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, source, metric_type)
+function inject_bulk_metric(ts, hostname, logger, source, m_type)
if #bulk_datapoints == 0 then
return
end
@@ -106,6 +106,7 @@
end
local msg = {
+ Logger = logger,
Hostname = hostname,
Timestamp = ts,
Payload = payload,
@@ -114,7 +115,7 @@
Fields = {
hostname = hostname,
source = source,
- type = metric_type
+ type = m_type or metric_type['GAUGE']
}
}
-- reset the local table storing the datapoints
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index fcd5fe1..6e5146c 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -31,6 +31,8 @@
-- grace_interval parameter allows to take into account log messages that are
-- received in the current interval but emitted before it.
local grace_interval = (read_config('grace_interval') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
local error_counters = {}
local enter_at
@@ -82,7 +84,7 @@
end
enter_at = ns
- utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
+ utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
return 0
end
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
index 35efcad..c40f0c3 100644
--- a/heka/files/lua/filters/heka_monitoring.lua
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -80,6 +80,6 @@
end
end
- utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
+ utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
return 0
end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
index 1756a59..81eaf43 100644
--- a/heka/files/lua/filters/http_metrics_aggregator.lua
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -34,11 +34,12 @@
-- and also to compensate the delay introduced by log parsing/decoding
-- which leads to arrive too late in its interval.
local grace_time = (read_config('grace_time') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
local percentile_field_name = string.format('upper_%s', percentile_thresh)
-local msg_source = 'http_metric_filter'
local last_tick = os.time() * 1e9
local interval_in_ns = interval * 1e9
@@ -84,7 +85,7 @@
-- keep only the first 2 tokens because some services like Neutron report
-- themselves as 'openstack.<service>.server'
- local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
+ local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
if service == nil then
return -1, "Cannot match any service from " .. logger
end
@@ -166,7 +167,7 @@
num_metrics = num_metrics - 1
if num >= bulk_size then
if msg_injected < max_timer_inject then
- utils.inject_bulk_metric(ns, hostname, msg_source)
+ utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
msg_injected = msg_injected + 1
num = 0
num_metrics = 0
@@ -178,7 +179,7 @@
all_times[service] = nil
end
if num > 0 then
- utils.inject_bulk_metric(ns, hostname, msg_source)
+ utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
num = 0
num_metrics = 0
end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 371ff8c..c22749b 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -25,6 +25,8 @@
-- the filter will take into account log messages that are at most 10 seconds
-- older than the current time.
local grace_interval = (read_config('grace_interval') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
local discovered_services = {}
local logs_counters = {}
@@ -76,7 +78,7 @@
last_timer_event = ns
- ok, err = utils.inject_bulk_metric(ns, hostname, 'logs_counter', utils.metric_type['DERIVE'])
+ ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
if ok ~= 0 then
return -1, err
end
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index e8138dd..c7a7fdf 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -20,6 +20,8 @@
bulk_size: 523
percentile: 90
grace_time: 5
+ logger: aggregated_http_metrics_filter
+ source: log_collector
log_counter:
engine: sandbox
module_file: /usr/share/lma_collector/filters/logs_counter.lua
@@ -30,6 +32,8 @@
config:
hostname: '{{ grains.host }}'
grace_interval: 30
+ logger: log_counter_filter
+ source: log_collector
{%- if log_collector.elasticsearch_host is defined %}
encoder:
elasticsearch:
@@ -62,7 +66,7 @@
module_file: /usr/share/lma_collector/decoders/metric.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
- deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter logs_counter_filter'
+ deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter log_counter_filter'
input:
heka_collectd:
engine: http