Send log_messages metrics as a bulk metric

Using bulk metrics for the log counters greatly reduces the
likelihood of blocking the Heka pipeline. Instead of injecting
(x services * y levels) metric messages, the filter now injects a
single big message.
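
For readability, here is a condensed sketch of the new timer_event(),
reassembled from the logs_counter.lua hunk below (all names are taken
from the diff; only minor cleanups such as the local declaration of
'ok, err' differ from the actual code):

    function timer_event(ns)
        for service, counters in pairs(logs_counters) do
            local delta_sec = (ns - last_timer_event) / 1e9
            for level, val in pairs(counters) do
                -- one datapoint per (service, level) pair, accumulated locally
                utils.add_to_bulk_metric(
                    'log_messages',
                    val / delta_sec,
                    {hostname=hostname, service=service, level=string.lower(level)})
                -- reset the counter
                counters[level] = 0
            end
        end
        last_timer_event = ns

        -- a single injected message carries all the datapoints
        local ok, err = utils.inject_bulk_metric(
            ns, hostname, 'logs_counter', utils.metric_type['DERIVE'])
        if ok ~= 0 then
            return -1, err
        end
        return 0
    end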

This change also updates the configuration of the metric_collector
service to deserialize the bulk metric, which is needed to keep
supporting alarms on log counters.
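
For reference, a hedged sketch of how the injected bulk message looks
for this filter, limited to what the hunks below show (the message
Type and the serialized datapoints are not visible in the diff and are
omitted; the surrounding variables are only there for illustration):

    -- The metric decoder deserializes this single message back into
    -- individual 'log_messages' metrics.
    local msg = {
        Fields = {
            hostname = hostname,                    -- the filter's 'hostname' setting
            source   = 'logs_counter',              -- third argument of inject_bulk_metric()
            type     = utils.metric_type['DERIVE']  -- new fourth argument
        }
    }
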
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index e79f934..1afe372 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
 end
 
 -- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, source)
+function inject_bulk_metric(ts, hostname, source, metric_type)
     if #bulk_datapoints == 0 then
         return
     end
@@ -113,14 +113,15 @@
         Severity = label_to_severity_map.INFO,
         Fields = {
             hostname = hostname,
-            source = source
+            source = source,
+            type = metric_type
       }
     }
     -- reset the local table storing the datapoints
     bulk_datapoints = {}
 
     inject_tags(msg)
-    safe_inject_message(msg)
+    return safe_inject_message(msg)
 end
 
 -- Encode a Lua variable as JSON without raising an exception if the encoding
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 7da74e9..c1d0391 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -18,24 +18,17 @@
 local utils = require 'lma_utils'
 
 local hostname = read_config('hostname') or error('hostname must be specified')
-local interval = (read_config('interval') or error('interval must be specified')) + 0
--- Heka cannot guarantee that logs are processed in real-time so the
--- grace_interval parameter allows to take into account log messages that are
--- received in the current interval but emitted before it.
+-- The filter can receive messages that should be discarded because they are
+-- way too old (Heka cannot guarantee that logs are processed in real-time).
+-- The 'grace_interval' parameter defines which log messages should be kept
+-- and which should be discarded. For instance, a value of '10' means that
+-- the filter takes into account log messages that are at most 10 seconds
+-- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
 
 local discovered_services = {}
 local logs_counters = {}
-local last_timer_events = {}
-local current_service = 1
-local enter_at
-local interval_in_ns = interval * 1e9
-local start_time = os.time()
-local msg = {
-    Type = "metric",
-    Timestamp = nil,
-    Severity = 6,
-}
+local last_timer_event = os.time() * 1e9
 
 function convert_to_sec(ns)
     return math.floor(ns/1e9)
@@ -47,12 +40,12 @@
 
     local service = string.match(logger, "^openstack%.(%a+)$")
     if service == nil then
-        return -1, "Cannot match any services from " .. logger
+        return -1, "Cannot match any service from " .. logger
     end
 
     -- timestamp values should be converted to seconds because log timestamps
     -- have a precision of one second (or millisecond sometimes)
-    if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
+    if convert_to_sec(read_message('Timestamp')) + grace_interval < convert_to_sec(last_timer_event) then
         -- skip the log message if it doesn't fall into the current interval
         return 0
     end
@@ -67,67 +60,29 @@
     end
 
     logs_counters[service][severity] = logs_counters[service][severity] + 1
-
     return 0
 end
 
 function timer_event(ns)
-
-    -- We can only send a maximum of ten events per call.
-    -- So we send all metrics about one service and we will proceed with
-    -- the following services at the next ticker event.
-
-    if #discovered_services == 0 then
-        return 0
-    end
-
-    -- Initialize enter_at during the first call to timer_event
-    if not enter_at then
-        enter_at = ns
-    end
-
-    -- To be able to send a metric we need to check if we are within the
-    -- interval specified in the configuration and if we haven't already sent
-    -- all metrics.
-    if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
-        local service_name = discovered_services[current_service]
-        local last_timer_event = last_timer_events[service_name] or 0
+    for service, counters in pairs(logs_counters) do
         local delta_sec = (ns - last_timer_event) / 1e9
 
-        for level, val in pairs(logs_counters[service_name]) do
+        for level, val in pairs(counters) do
+            utils.add_to_bulk_metric(
+                'log_messages',
+                val / delta_sec,
+                {hostname=hostname, service=service, level=string.lower(level)})
 
-           -- We don't send the first value
-           if last_timer_event ~= 0 and delta_sec ~= 0 then
-               msg.Timestamp = ns
-               msg.Fields = {
-                   name = 'log_messages',
-                   type = utils.metric_type['DERIVE'],
-                   value = val / delta_sec,
-                   service = service_name,
-                   level = string.lower(level),
-                   hostname = hostname,
-                   tag_fields = {'service', 'level', 'hostname'},
-               }
-
-               utils.inject_tags(msg)
-               ok, err = utils.safe_inject_message(msg)
-               if ok ~= 0 then
-                 return -1, err
-               end
-           end
-
-           -- reset the counter
-           logs_counters[service_name][level] = 0
-
+            -- reset the counter
+            counters[level] = 0
         end
-
-        last_timer_events[service_name] = ns
-        current_service = current_service + 1
     end
 
-    if ns - enter_at >= interval_in_ns then
-        enter_at = ns
-        current_service = 1
+    last_timer_event = ns
+
+    ok, err = utils.inject_bulk_metric(ns, hostname, 'logs_counter', utils.metric_type['DERIVE'])
+    if ok ~= 0 then
+        return -1, err
     end
 
     return 0
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 293f0ea..e8138dd 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -26,10 +26,9 @@
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: true
       message_matcher: "Type == 'log' && Logger =~ /^openstack\\\\./"
-      ticker_interval: 1
+      ticker_interval: 60
       config:
         hostname: '{{ grains.host }}'
-        interval: 60
         grace_interval: 30
 {%- if log_collector.elasticsearch_host is defined %}
   encoder:
@@ -63,7 +62,7 @@
       module_file: /usr/share/lma_collector/decoders/metric.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
+        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter logs_counter_filter'
   input:
     heka_collectd:
       engine: http