Merge pull request #111 from simonpasquier/restore-block-policy
Restore block policy
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index e79f934..b310ec4 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
end
-- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, source)
+function inject_bulk_metric(ts, hostname, logger, source, m_type)
if #bulk_datapoints == 0 then
return
end
@@ -106,6 +106,7 @@
end
local msg = {
+ Logger = logger,
Hostname = hostname,
Timestamp = ts,
Payload = payload,
@@ -113,14 +114,15 @@
Severity = label_to_severity_map.INFO,
Fields = {
hostname = hostname,
- source = source
+ source = source,
+ type = m_type or metric_type['GAUGE']
}
}
-- reset the local table storing the datapoints
bulk_datapoints = {}
inject_tags(msg)
- safe_inject_message(msg)
+ return safe_inject_message(msg)
end
-- Encode a Lua variable as JSON without raising an exception if the encoding
@@ -321,4 +323,9 @@
return true, value
end
+-- Convert a value in nanoseconds to whole seconds (rounded down)
+function convert_to_sec(ns)
+ return math.floor(ns/1e9)
+end
+
return M
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index 66980bc..6e5146c 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -31,19 +31,17 @@
-- grace_interval parameter allows to take into account log messages that are
-- received in the current interval but emitted before it.
local grace_interval = (read_config('grace_interval') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
local error_counters = {}
local enter_at
local start_time = os.time()
-local function convert_to_sec(ns)
- return math.floor(ns/1e9)
-end
-
function process_message ()
-- timestamp values should be converted to seconds because log timestamps
-- have a precision of one second (or millisecond sometimes)
- if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
+ if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(utils.convert_to_sec(enter_at or 0), start_time) then
-- skip the log message if it doesn't fall into the current interval
return 0
end
@@ -86,7 +84,7 @@
end
enter_at = ns
- utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
+ utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
return 0
-end
\ No newline at end of file
+end
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
index 35efcad..c40f0c3 100644
--- a/heka/files/lua/filters/heka_monitoring.lua
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -80,6 +80,6 @@
end
end
- utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
+ utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
return 0
end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
index 1756a59..81eaf43 100644
--- a/heka/files/lua/filters/http_metrics_aggregator.lua
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -34,11 +34,12 @@
-- and also to compensate the delay introduced by log parsing/decoding
-- which leads to arrive too late in its interval.
local grace_time = (read_config('grace_time') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
local percentile_field_name = string.format('upper_%s', percentile_thresh)
-local msg_source = 'http_metric_filter'
local last_tick = os.time() * 1e9
local interval_in_ns = interval * 1e9
@@ -84,7 +85,7 @@
-- keep only the first 2 tokens because some services like Neutron report
-- themselves as 'openstack.<service>.server'
- local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
+ local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
if service == nil then
return -1, "Cannot match any service from " .. logger
end
@@ -166,7 +167,7 @@
num_metrics = num_metrics - 1
if num >= bulk_size then
if msg_injected < max_timer_inject then
- utils.inject_bulk_metric(ns, hostname, msg_source)
+ utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
msg_injected = msg_injected + 1
num = 0
num_metrics = 0
@@ -178,7 +179,7 @@
all_times[service] = nil
end
if num > 0 then
- utils.inject_bulk_metric(ns, hostname, msg_source)
+ utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
num = 0
num_metrics = 0
end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 7da74e9..5199f25 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -18,41 +18,33 @@
local utils = require 'lma_utils'
local hostname = read_config('hostname') or error('hostname must be specified')
-local interval = (read_config('interval') or error('interval must be specified')) + 0
--- Heka cannot guarantee that logs are processed in real-time so the
--- grace_interval parameter allows to take into account log messages that are
--- received in the current interval but emitted before it.
+-- The filter can receive messages that should be discarded because they are
+-- way too old (Heka cannot guarantee that logs are processed in real time).
+-- The 'grace_interval' parameter (in seconds) defines which log messages are
+-- kept and which are discarded. For instance, a value of '10' means that the
+-- filter takes into account log messages that are at most 10 seconds older
+-- than the last timer event, i.e. the start of the current interval.
local grace_interval = (read_config('grace_interval') or 0) + 0
+local logger_matcher = read_config('logger_matcher') or '.*'
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
local discovered_services = {}
local logs_counters = {}
-local last_timer_events = {}
-local current_service = 1
-local enter_at
-local interval_in_ns = interval * 1e9
-local start_time = os.time()
-local msg = {
- Type = "metric",
- Timestamp = nil,
- Severity = 6,
-}
-
-function convert_to_sec(ns)
- return math.floor(ns/1e9)
-end
+local last_timer_event = os.time() * 1e9
function process_message ()
local severity = read_message("Fields[severity_label]")
local logger = read_message("Logger")
- local service = string.match(logger, "^openstack%.(%a+)$")
+ local service = string.match(logger, logger_matcher)
if service == nil then
- return -1, "Cannot match any services from " .. logger
+ return -1, "Cannot match any service from " .. logger
end
-- timestamp values should be converted to seconds because log timestamps
-- have a precision of one second (or millisecond sometimes)
- if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
+ if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < utils.convert_to_sec(last_timer_event) then
-- skip the the log message if it doesn't fall into the current interval
return 0
end
@@ -67,67 +59,29 @@
end
logs_counters[service][severity] = logs_counters[service][severity] + 1
-
return 0
end
function timer_event(ns)
-
- -- We can only send a maximum of ten events per call.
- -- So we send all metrics about one service and we will proceed with
- -- the following services at the next ticker event.
-
- if #discovered_services == 0 then
- return 0
- end
-
- -- Initialize enter_at during the first call to timer_event
- if not enter_at then
- enter_at = ns
- end
-
- -- To be able to send a metric we need to check if we are within the
- -- interval specified in the configuration and if we haven't already sent
- -- all metrics.
- if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
- local service_name = discovered_services[current_service]
- local last_timer_event = last_timer_events[service_name] or 0
+ for service, counters in pairs(logs_counters) do
local delta_sec = (ns - last_timer_event) / 1e9
- for level, val in pairs(logs_counters[service_name]) do
+ for level, val in pairs(counters) do
+ utils.add_to_bulk_metric(
+ 'log_messages',
+ val / delta_sec,
+ {hostname=hostname, service=service, level=string.lower(level)})
- -- We don't send the first value
- if last_timer_event ~= 0 and delta_sec ~= 0 then
- msg.Timestamp = ns
- msg.Fields = {
- name = 'log_messages',
- type = utils.metric_type['DERIVE'],
- value = val / delta_sec,
- service = service_name,
- level = string.lower(level),
- hostname = hostname,
- tag_fields = {'service', 'level', 'hostname'},
- }
-
- utils.inject_tags(msg)
- ok, err = utils.safe_inject_message(msg)
- if ok ~= 0 then
- return -1, err
- end
- end
-
- -- reset the counter
- logs_counters[service_name][level] = 0
-
+ -- reset the counter
+ counters[level] = 0
end
-
- last_timer_events[service_name] = ns
- current_service = current_service + 1
end
- if ns - enter_at >= interval_in_ns then
- enter_at = ns
- current_service = 1
+ last_timer_event = ns
+
+ ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
+ if ok ~= 0 then
+ return -1, err
end
return 0
diff --git a/heka/files/toml/output/elasticsearch.toml b/heka/files/toml/output/elasticsearch.toml
index 531e9bc..4356e5e 100644
--- a/heka/files/toml/output/elasticsearch.toml
+++ b/heka/files/toml/output/elasticsearch.toml
@@ -10,4 +10,4 @@
[{{ output_name }}_output.buffering]
max_buffer_size = 1073741824
max_file_size = 134217728
-full_action = "drop"
+full_action = "{{ output.full_action|default("block") }}"
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 293f0ea..1d6c76f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -20,17 +20,36 @@
bulk_size: 523
percentile: 90
grace_time: 5
+ logger: aggregated_http_metrics_filter
+ source: log_collector
+ # This filter monitors the OpenStack logs and sends the rate of processed
+ # log messages broken down by service and severity every minute
log_counter:
engine: sandbox
module_file: /usr/share/lma_collector/filters/logs_counter.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
preserve_data: true
message_matcher: "Type == 'log' && Logger =~ /^openstack\\\\./"
- ticker_interval: 1
+ ticker_interval: 60
config:
hostname: '{{ grains.host }}'
- interval: 60
grace_interval: 30
+ logger_matcher: '^openstack%.(%a+)$'
+ logger: log_counter_filter
+ source: log_collector
+ hdd_errors:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/hdd_errors_counter.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ preserve_data: false
+ message_matcher: "Type == 'log' && Logger == 'system.kern'"
+ ticker_interval: 10
+ config:
+ grace_interval: 10
+ patterns: "/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/"
+ hostname: '{{ grains.host }}'
+ logger: hdd_errors_filter
+ source: log_collector
{%- if log_collector.elasticsearch_host is defined %}
encoder:
elasticsearch:
@@ -63,7 +82,7 @@
module_file: /usr/share/lma_collector/decoders/metric.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
- deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
+ deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_filter log_counter_filter'
input:
heka_collectd:
engine: http
diff --git a/tests/lua/test_lma_utils.lua b/tests/lua/test_lma_utils.lua
index 8b6d198..7df99d7 100644
--- a/tests/lua/test_lma_utils.lua
+++ b/tests/lua/test_lma_utils.lua
@@ -91,6 +91,12 @@
assertEquals(ret, 'foo<BR/>ba')
end
+ function TestLmaUtils:test_convert_to_sec()
+ assertEquals(lma_utils.convert_to_sec(1000000001), 1)
+ assertEquals(lma_utils.convert_to_sec(1999999999), 1)
+ assertEquals(lma_utils.convert_to_sec(2000000001), 2)
+ end
+
lu = LuaUnit
lu:setVerbosity( 1 )
os.exit( lu:run() )