Merge pull request #111 from simonpasquier/restore-block-policy

Restore block policy
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index e79f934..b310ec4 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
 end
 
 -- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, source)
+function inject_bulk_metric(ts, hostname, logger, source, m_type)
     if #bulk_datapoints == 0 then
         return
     end
@@ -106,6 +106,7 @@
     end
 
     local msg = {
+        Logger = logger,
         Hostname = hostname,
         Timestamp = ts,
         Payload = payload,
@@ -113,14 +114,15 @@
         Severity = label_to_severity_map.INFO,
         Fields = {
             hostname = hostname,
-            source = source
+            source = source,
+            type = m_type or metric_type['GAUGE']
       }
     }
     -- reset the local table storing the datapoints
     bulk_datapoints = {}
 
     inject_tags(msg)
-    safe_inject_message(msg)
+    return safe_inject_message(msg)
 end
 
 -- Encode a Lua variable as JSON without raising an exception if the encoding
@@ -321,4 +323,9 @@
     return true, value
 end
 
+-- Convert a nanosecond value to seconds
+function convert_to_sec(ns)
+    return math.floor(ns/1e9)
+end
+
 return M
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index 66980bc..6e5146c 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -31,19 +31,17 @@
 -- grace_interval parameter allows to take into account log messages that are
 -- received in the current interval but emitted before it.
 local grace_interval = (read_config('grace_interval') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
 
 local error_counters = {}
 local enter_at
 local start_time = os.time()
 
-local function convert_to_sec(ns)
-    return math.floor(ns/1e9)
-end
-
 function process_message ()
     -- timestamp values should be converted to seconds because log timestamps
     -- have a precision of one second (or millisecond sometimes)
-    if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
+    if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(utils.convert_to_sec(enter_at or 0), start_time) then
         -- skip the log message if it doesn't fall into the current interval
         return 0
     end
@@ -86,7 +84,7 @@
     end
 
     enter_at = ns
-    utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
+    utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
 
     return 0
-end
\ No newline at end of file
+end
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
index 35efcad..c40f0c3 100644
--- a/heka/files/lua/filters/heka_monitoring.lua
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -80,6 +80,6 @@
         end
     end
 
-    utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
+    utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
     return 0
 end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
index 1756a59..81eaf43 100644
--- a/heka/files/lua/filters/http_metrics_aggregator.lua
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -34,11 +34,12 @@
 -- and also to compensate the delay introduced by log parsing/decoding
 -- which leads to arrive too late in its interval.
 local grace_time = (read_config('grace_time') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
 
 local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
 
 local percentile_field_name = string.format('upper_%s', percentile_thresh)
-local msg_source = 'http_metric_filter'
 local last_tick = os.time() * 1e9
 local interval_in_ns = interval * 1e9
 
@@ -84,7 +85,7 @@
 
     -- keep only the first 2 tokens because some services like Neutron report
     -- themselves as 'openstack.<service>.server'
-    local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
+    local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
     if service == nil then
         return -1, "Cannot match any service from " .. logger
     end
@@ -166,7 +167,7 @@
                 num_metrics = num_metrics - 1
                 if num >= bulk_size then
                     if msg_injected < max_timer_inject then
-                        utils.inject_bulk_metric(ns, hostname, msg_source)
+                        utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
                         msg_injected = msg_injected + 1
                         num = 0
                         num_metrics = 0
@@ -178,7 +179,7 @@
         all_times[service] = nil
     end
     if num > 0 then
-        utils.inject_bulk_metric(ns, hostname, msg_source)
+        utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
         num = 0
         num_metrics = 0
     end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 7da74e9..5199f25 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -18,41 +18,33 @@
 local utils = require 'lma_utils'
 
 local hostname = read_config('hostname') or error('hostname must be specified')
-local interval = (read_config('interval') or error('interval must be specified')) + 0
--- Heka cannot guarantee that logs are processed in real-time so the
--- grace_interval parameter allows to take into account log messages that are
--- received in the current interval but emitted before it.
+-- The filter can receive messages that should be discarded because they are
+-- way too old (Heka cannot guarantee that logs are processed in real-time).
+-- The 'grace_interval' parameter allows to define which log messages should be
+-- kept and which should be discarded. For instance, a value of '10' means that
+-- the filter will take into account log messages that are at most 10 seconds
+-- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
+local logger_matcher = read_config('logger_matcher') or '.*'
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
 
 local discovered_services = {}
 local logs_counters = {}
-local last_timer_events = {}
-local current_service = 1
-local enter_at
-local interval_in_ns = interval * 1e9
-local start_time = os.time()
-local msg = {
-    Type = "metric",
-    Timestamp = nil,
-    Severity = 6,
-}
-
-function convert_to_sec(ns)
-    return math.floor(ns/1e9)
-end
+local last_timer_event = os.time() * 1e9
 
 function process_message ()
     local severity = read_message("Fields[severity_label]")
     local logger = read_message("Logger")
 
-    local service = string.match(logger, "^openstack%.(%a+)$")
+    local service = string.match(logger, logger_matcher)
     if service == nil then
-        return -1, "Cannot match any services from " .. logger
+        return -1, "Cannot match any service from " .. logger
     end
 
     -- timestamp values should be converted to seconds because log timestamps
     -- have a precision of one second (or millisecond sometimes)
-    if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
+    if utils.convert_to_sec(read_message('Timestamp')) + grace_interval < utils.convert_to_sec(last_timer_event) then
         -- skip the the log message if it doesn't fall into the current interval
         return 0
     end
@@ -67,67 +59,29 @@
     end
 
     logs_counters[service][severity] = logs_counters[service][severity] + 1
-
     return 0
 end
 
 function timer_event(ns)
-
-    -- We can only send a maximum of ten events per call.
-    -- So we send all metrics about one service and we will proceed with
-    -- the following services at the next ticker event.
-
-    if #discovered_services == 0 then
-        return 0
-    end
-
-    -- Initialize enter_at during the first call to timer_event
-    if not enter_at then
-        enter_at = ns
-    end
-
-    -- To be able to send a metric we need to check if we are within the
-    -- interval specified in the configuration and if we haven't already sent
-    -- all metrics.
-    if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
-        local service_name = discovered_services[current_service]
-        local last_timer_event = last_timer_events[service_name] or 0
+    for service, counters in pairs(logs_counters) do
         local delta_sec = (ns - last_timer_event) / 1e9
 
-        for level, val in pairs(logs_counters[service_name]) do
+        for level, val in pairs(counters) do
+            utils.add_to_bulk_metric(
+                'log_messages',
+                val / delta_sec,
+                {hostname=hostname, service=service, level=string.lower(level)})
 
-           -- We don't send the first value
-           if last_timer_event ~= 0 and delta_sec ~= 0 then
-               msg.Timestamp = ns
-               msg.Fields = {
-                   name = 'log_messages',
-                   type = utils.metric_type['DERIVE'],
-                   value = val / delta_sec,
-                   service = service_name,
-                   level = string.lower(level),
-                   hostname = hostname,
-                   tag_fields = {'service', 'level', 'hostname'},
-               }
-
-               utils.inject_tags(msg)
-               ok, err = utils.safe_inject_message(msg)
-               if ok ~= 0 then
-                 return -1, err
-               end
-           end
-
-           -- reset the counter
-           logs_counters[service_name][level] = 0
-
+            -- reset the counter
+            counters[level] = 0
         end
-
-        last_timer_events[service_name] = ns
-        current_service = current_service + 1
     end
 
-    if ns - enter_at >= interval_in_ns then
-        enter_at = ns
-        current_service = 1
+    last_timer_event = ns
+
+    ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
+    if ok ~= 0 then
+        return -1, err
     end
 
     return 0
diff --git a/heka/files/toml/output/elasticsearch.toml b/heka/files/toml/output/elasticsearch.toml
index 531e9bc..4356e5e 100644
--- a/heka/files/toml/output/elasticsearch.toml
+++ b/heka/files/toml/output/elasticsearch.toml
@@ -10,4 +10,4 @@
 [{{ output_name }}_output.buffering]
 max_buffer_size = 1073741824
 max_file_size = 134217728
-full_action = "drop"
+full_action = "{{ output.full_action|default("block") }}"
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 293f0ea..1d6c76f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -20,17 +20,36 @@
         bulk_size: 523
         percentile: 90
         grace_time: 5
+        logger: aggregated_http_metrics_filter
+        source: log_collector
+    # This filter monitors the OpenStack logs and sends the rate of processed
+    # log messages broken down by service and severity every minute
     log_counter:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/logs_counter.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: true
       message_matcher: "Type == 'log' && Logger =~ /^openstack\\\\./"
-      ticker_interval: 1
+      ticker_interval: 60
       config:
         hostname: '{{ grains.host }}'
-        interval: 60
         grace_interval: 30
+        logger_matcher: '^openstack%.(%a+)$'
+        logger: log_counter_filter
+        source: log_collector
+    hdd_errors:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/hdd_errors_counter.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "Type == 'log' && Logger == 'system.kern'"
+      ticker_interval: 10
+      config:
+        grace_interval: 10
+        patterns: "/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/"
+        hostname: '{{ grains.host }}'
+        logger: hdd_errors_filter
+        source: log_collector
 {%- if log_collector.elasticsearch_host is defined %}
   encoder:
     elasticsearch:
@@ -63,7 +82,7 @@
       module_file: /usr/share/lma_collector/decoders/metric.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
+        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_filter log_counter_filter'
   input:
     heka_collectd:
       engine: http
diff --git a/tests/lua/test_lma_utils.lua b/tests/lua/test_lma_utils.lua
index 8b6d198..7df99d7 100644
--- a/tests/lua/test_lma_utils.lua
+++ b/tests/lua/test_lma_utils.lua
@@ -91,6 +91,12 @@
         assertEquals(ret, 'foo<BR/>ba')
     end
 
+    function TestLmaUtils:test_convert_to_sec()
+        assertEquals(lma_utils.convert_to_sec(1000000001), 1)
+        assertEquals(lma_utils.convert_to_sec(1999999999), 1)
+        assertEquals(lma_utils.convert_to_sec(2000000001), 2)
+    end
+
 lu = LuaUnit
 lu:setVerbosity( 1 )
 os.exit( lu:run() )