Compute log-based metrics as rates or counters

By default, the metrics are still sent as rates; for Prometheus-based
monitoring, they are sent as counters instead.

Change-Id: Ia9e64c35b32a1fa49071d698f91fdb2a7665a225
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 7488ba4..9d039bc 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -326,4 +326,18 @@
     return math.floor(ns/1e9)
 end
 
+-- Coerce a value to a boolean: booleans pass through, numbers are true when > 0, strings only when they spell "true" (any case)
+function convert_to_bool(v, default)
+    local kind = type(v)
+    if kind == "boolean" then
+        return v
+    elseif kind == "number" then
+        return v > 0
+    elseif kind == "string" then
+        return string.lower(v) == "true"
+    end
+    -- every other type (nil included) falls back to the default, forced to a real boolean
+    return not not default
+end
+
 return M
diff --git a/heka/files/lua/filters/audit_authentications.lua b/heka/files/lua/filters/audit_authentications.lua
index 1ad9ae8..44b41fc 100644
--- a/heka/files/lua/filters/audit_authentications.lua
+++ b/heka/files/lua/filters/audit_authentications.lua
@@ -23,6 +23,7 @@
 -- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
 local metric_source = read_config('source')
+local emit_rates = utils.convert_to_bool(read_config('emit_rates'), true) -- fallback is true, so rates are emitted when read_config yields nil
 
 local msg = {
     Type = "multivalue_metric", -- will be prefixed by "heka.sandbox."
@@ -82,24 +83,26 @@
     msg.Fields.failed = global_counters.failed
     utils.safe_inject_message(msg)
 
-    -- send the rates
-    msg.Fields.name = 'authentications_rate'
-    msg.Fields.type = utils.metric_type['DERIVE']
-    local delta_sec = (ns - last_timer_event) / 1e9
-    msg.Fields.all = ticker_counters.total / delta_sec
-    msg.Fields.success = ticker_counters.success / delta_sec
-    msg.Fields.failed = ticker_counters.failed / delta_sec
-    utils.safe_inject_message(msg)
-
-    -- send the percentages
-    if ticker_counters.total > 0 then
-        msg.Fields.name = 'authentications_percent'
-        msg.Fields.type = utils.metric_type['GAUGE']
-        msg.Fields.value_fields = {'success', 'failed'}
-        msg.Fields.all = nil
-        msg.Fields.success = 100.0 * ticker_counters.success / ticker_counters.total
-        msg.Fields.failed = 100.0 * ticker_counters.failed / ticker_counters.total
+    if emit_rates then -- the rate and percent metrics are only emitted when 'emit_rates' is enabled
+        -- send the per-second rates computed over the time elapsed since the last timer event
+        msg.Fields.name = 'authentications_rate'
+        msg.Fields.type = utils.metric_type['DERIVE']
+        local delta_sec = (ns - last_timer_event) / 1e9 -- elapsed time, nanoseconds converted to seconds
+        msg.Fields.all = ticker_counters.total / delta_sec
+        msg.Fields.success = ticker_counters.success / delta_sec
+        msg.Fields.failed = ticker_counters.failed / delta_sec
         utils.safe_inject_message(msg)
+
+        -- send the success/failed percentages; skipped when nothing was counted to avoid dividing by zero
+        if ticker_counters.total > 0 then
+            msg.Fields.name = 'authentications_percent'
+            msg.Fields.type = utils.metric_type['GAUGE']
+            msg.Fields.value_fields = {'success', 'failed'}
+            msg.Fields.all = nil -- the percent metric only carries 'success' and 'failed'
+            msg.Fields.success = 100.0 * ticker_counters.success / ticker_counters.total
+            msg.Fields.failed = 100.0 * ticker_counters.failed / ticker_counters.total
+            utils.safe_inject_message(msg)
+        end
     end
 
     -- reset the variables
diff --git a/heka/files/lua/filters/failed_logins.lua b/heka/files/lua/filters/failed_logins.lua
index ecf259f..309d15d 100644
--- a/heka/files/lua/filters/failed_logins.lua
+++ b/heka/files/lua/filters/failed_logins.lua
@@ -24,6 +24,7 @@
 -- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
 local metric_source = read_config('source')
+local emit_rates = utils.convert_to_bool(read_config('emit_rates'), true) -- fallback is true, so the rate is emitted when read_config yields nil
 
 local msg = {
     Type = "metric", -- will be prefixed by "heka.sandbox."
@@ -56,14 +57,16 @@
     msg.Timestamp = ns
     msg.Fields.name = 'failed_logins_total'
     msg.Fields.value = global_counter
-    msg.Fields.type = utils.metric_type['GAUGE']
+    msg.Fields.type = utils.metric_type['COUNTER']
     utils.inject_tags(msg)
     utils.safe_inject_message(msg)
 
-    msg.Fields.name = 'failed_logins_rate'
-    msg.Fields.type = utils.metric_type['DERIVE']
-    msg.Fields.value = ticker_counter / ((ns - last_timer_event) / 1e9)
-    utils.safe_inject_message(msg)
+    if emit_rates then -- the rate metric is optional; the 'failed_logins_total' counter above is always sent
+        msg.Fields.name = 'failed_logins_rate'
+        msg.Fields.type = utils.metric_type['DERIVE']
+        msg.Fields.value = ticker_counter / ((ns - last_timer_event) / 1e9) -- events per second since the last timer event
+        utils.safe_inject_message(msg)
+    end
 
     ticker_counter = 0
     last_timer_event = ns
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index 1da62ab..ab9e952 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -18,6 +18,7 @@
 local utils = require 'lma_utils'
 
 local hostname = read_config('hostname') or error('hostname must be specified')
+local emit_rates = utils.convert_to_bool(read_config('emit_rates'), true) -- fallback is true, so rates are emitted when read_config yields nil
 local patterns_config = read_config('patterns') or error('patterns must be specified')
 local patterns = {}
 for pattern in string.gmatch(patterns_config, "/(%S+)/") do
@@ -32,6 +33,12 @@
 -- received in the current interval but emitted before it.
 local grace_interval = (read_config('grace_interval') or 0) + 0
 local metric_source = read_config('source')
+local metric_name = "hdd_errors" -- cumulative per-device error count, used when rates are disabled
+local metric_type = utils.metric_type['COUNTER']
+if emit_rates then
+    metric_name = "hdd_errors_rate" -- errors per second over the ticker interval
+    metric_type = utils.metric_type['DERIVE'] -- same type the other filters use for their rate metrics
+end
 
 local error_counters = {}
 local enter_at
@@ -76,17 +83,23 @@
     local delta_sec = (ns - (enter_at or 0)) / 1e9
     for dev, value in pairs(error_counters) do
         -- Don`t send values at the first ticker interval
-        if enter_at ~= nil then
+        if enter_at ~= nil or not emit_rates then -- a rate needs a previous tick for delta_sec; a raw counter does not
+            if emit_rates then
+                value = value / delta_sec
+            end
+
             utils.add_to_bulk_metric(
-                "hdd_errors_rate",
-                value / delta_sec,
+                metric_name,
+                value,
                 {device=dev, hostname=hostname})
         end
-        error_counters[dev] = 0
+        if emit_rates then -- counters stay cumulative when rates are disabled
+            error_counters[dev] = 0
+        end
     end
 
     enter_at = ns
-    utils.inject_bulk_metric(ns, hostname, metric_source)
+    utils.inject_bulk_metric(ns, hostname, metric_source, metric_type)
 
     return 0
 end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index e6ef229..703e4dd 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -27,6 +27,11 @@
 local grace_interval = (read_config('grace_interval') or 0) + 0
 local logger_matcher = read_config('logger_matcher') or '.*'
 local metric_source = read_config('source')
+local emit_rates = utils.convert_to_bool(read_config('emit_rates'), true) -- fallback is true, so rates are emitted when read_config yields nil
+local metric_type = utils.metric_type['COUNTER'] -- type used when the raw, never-reset counts are sent
+if emit_rates then
+    metric_type = utils.metric_type['DERIVE'] -- type used when per-second rates are sent
+end
 
 local discovered_services = {}
 local logs_counters = {}
@@ -66,19 +71,24 @@
         local delta_sec = (ns - last_timer_event) / 1e9
 
         for level, val in pairs(counters) do
+            if emit_rates then -- convert the interval count into a per-second rate
+                val = val / delta_sec
+            end
             utils.add_to_bulk_metric(
                 'log_messages',
-                val / delta_sec,
+                val,
                 {hostname=hostname, service=service, level=string.lower(level)})
 
             -- reset the counter
-            counters[level] = 0
+            if emit_rates then -- only in rate mode; otherwise the count keeps accumulating across intervals
+                counters[level] = 0
+            end
         end
     end
 
     last_timer_event = ns
 
-    ok, err = utils.inject_bulk_metric(ns, hostname, metric_source, utils.metric_type['DERIVE'])
+    ok, err = utils.inject_bulk_metric(ns, hostname, metric_source, metric_type)
     if ok ~= 0 then
         return -1, err
     end
diff --git a/heka/map.jinja b/heka/map.jinja
index 827f184..297705f 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -51,6 +51,7 @@
   'default': {
     'container_mode': False,
     'alarms_enabled': True,
+    'emit_rates': True,
     'prefix_dir': default_prefix_dir,
     'elasticsearch_port': default_elasticsearch_port,
     'poolsize': 100,
@@ -88,6 +89,7 @@
   'default': {
     'container_mode': False,
     'alarms_enabled': True,
+    'emit_rates': True,
     'prefix_dir': default_prefix_dir,
     'amqp_port': default_amqp_port,
     'amqp_vhost': '',
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 5ad045a..cd3f657 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -45,6 +45,7 @@
         grace_interval: 30
         logger_matcher: '^openstack%.(%a+)$'
         source: log_collector
+        emit_rates: {{ log_collector.emit_rates }}
     hdd_errors:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/hdd_errors_counter.lua
@@ -57,6 +58,7 @@
         patterns: "/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/"
         hostname: '{{ grains.host }}'
         source: log_collector
+        emit_rates: {{ log_collector.emit_rates }}
 {%- if log_collector.sensu_host is defined %}
     watchdog:
       engine: sandbox
@@ -335,6 +337,7 @@
       config:
         grace_interval: 30
         source: remote_collector
+        emit_rates: {{ remote_collector.emit_rates }}
   {%- endif %}
 {%- endif %}
 {%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
diff --git a/tests/lua/test_lma_utils.lua b/tests/lua/test_lma_utils.lua
index 7df99d7..1dd6e14 100644
--- a/tests/lua/test_lma_utils.lua
+++ b/tests/lua/test_lma_utils.lua
@@ -97,6 +97,17 @@
         assertEquals(lma_utils.convert_to_sec(2000000001), 2)
     end
 
+    function TestLmaUtils:test_convert_to_bool() -- exercises boolean, number, string and nil inputs, with and without a default
+        assertEquals(lma_utils.convert_to_bool(true), true)
+        assertEquals(lma_utils.convert_to_bool(0), false) -- numbers are true only when > 0
+        assertEquals(lma_utils.convert_to_bool(1), true)
+        assertEquals(lma_utils.convert_to_bool("false"), false)
+        assertEquals(lma_utils.convert_to_bool("tRue"), true) -- string comparison is case-insensitive
+        assertEquals(lma_utils.convert_to_bool(nil), false) -- no default given
+        assertEquals(lma_utils.convert_to_bool(nil, true), true) -- nil falls back to the default
+        assertEquals(lma_utils.convert_to_bool("false", true), false) -- a string value wins over the default
+    end
+
 lu = LuaUnit
 lu:setVerbosity( 1 )
 os.exit( lu:run() )