Merge "Add watchdog for Heka services"
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 349e731..0ceeff6 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
 end
 
 -- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, logger, source, m_type)
+function inject_bulk_metric(ts, hostname, source, m_type)
     if #bulk_datapoints == 0 then
         return
     end
@@ -106,13 +106,12 @@
     end
 
     local msg = {
-        Logger = logger,
-        Hostname = hostname,
         Timestamp = ts,
         Payload = payload,
         Type = 'bulk_metric', -- prepended with 'heka.sandbox'
         Severity = label_to_severity_map.INFO,
         Fields = {
+            hostname = hostname, -- inject_message() can't set the top-level Hostname
             source = source,
             type = m_type or metric_type['GAUGE']
       }
diff --git a/heka/files/lua/decoders/metric.lua b/heka/files/lua/decoders/metric.lua
index 9b26158..75cec5e 100644
--- a/heka/files/lua/decoders/metric.lua
+++ b/heka/files/lua/decoders/metric.lua
@@ -18,19 +18,19 @@
 local l = require 'lpeg'
 l.locale(l)
 
-local loggers_pattern = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
-local loggers_list = loggers_pattern:match(read_config('deserialize_bulk_metric_for_loggers') or '')
+local split_on_space = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
+local sources_list = split_on_space:match(read_config('deserialize_for_sources') or '')
 
-local loggers = {}
-for _, logger in ipairs(loggers_list) do
-    loggers[logger] = true
+local sources = {}
+for _, s in ipairs(sources_list) do
+    sources[s] = true
 end
 
 local utils = require 'lma_utils'
 
 function process_message ()
     local msg = decode_message(read_message("raw"))
-    if string.match(msg.Type, 'bulk_metric$') and loggers[msg.Logger] ~= nil then
+    if string.match(msg.Type, 'bulk_metric$') and sources[msg.Fields.source] then
 
         local ok, metrics = pcall(cjson.decode, msg.Payload)
         if not ok then
diff --git a/heka/files/lua/filters/audit_authentications.lua b/heka/files/lua/filters/audit_authentications.lua
index a2eba0d..1ad9ae8 100644
--- a/heka/files/lua/filters/audit_authentications.lua
+++ b/heka/files/lua/filters/audit_authentications.lua
@@ -15,7 +15,6 @@
 require 'os'
 local utils = require 'lma_utils'
 
-local hostname = read_config('hostname') or error('hostname must be specified')
 -- The filter can receive messages that should be discarded because they are
 -- way too old (Heka cannot guarantee that messages are processed in real-time).
 -- The 'grace_interval' parameter allows to define which messages should be
@@ -23,14 +22,11 @@
 -- the filter will take into account messages that are at most 10 seconds
 -- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
-local metric_logger = read_config('logger')
 local metric_source = read_config('source')
 
 local msg = {
     Type = "multivalue_metric", -- will be prefixed by "heka.sandbox."
-    Hostname = hostname,
     Severity = 6,
-    Logger = read_config('logger') or error('logger must be specified')
 }
 local global_counters = {
     total=0,
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index f34da24..1da62ab 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -31,7 +31,6 @@
 -- grace_interval parameter allows to take into account log messages that are
 -- received in the current interval but emitted before it.
 local grace_interval = (read_config('grace_interval') or 0) + 0
-local metric_logger = read_config('logger')
 local metric_source = read_config('source')
 
 local error_counters = {}
@@ -87,7 +86,7 @@
     end
 
     enter_at = ns
-    utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
+    utils.inject_bulk_metric(ns, hostname, metric_source)
 
     return 0
 end
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
index c61251f..c622619 100644
--- a/heka/files/lua/filters/heka_monitoring.lua
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -78,6 +78,6 @@
         end
     end
 
-    utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
+    utils.inject_bulk_metric(ts, hostname, 'internal')
     return 0
 end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
index 81eaf43..6abe3b7 100644
--- a/heka/files/lua/filters/http_metrics_aggregator.lua
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -34,7 +34,6 @@
 -- and also to compensate the delay introduced by log parsing/decoding
 -- which leads to arrive too late in its interval.
 local grace_time = (read_config('grace_time') or 0) + 0
-local metric_logger = read_config('logger')
 local metric_source = read_config('source')
 
 local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
@@ -167,7 +166,7 @@
                 num_metrics = num_metrics - 1
                 if num >= bulk_size then
                     if msg_injected < max_timer_inject then
-                        utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
+                        utils.inject_bulk_metric(ns, hostname, metric_source)
                         msg_injected = msg_injected + 1
                         num = 0
                         num_metrics = 0
@@ -179,7 +178,7 @@
         all_times[service] = nil
     end
     if num > 0 then
-        utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
+        utils.inject_bulk_metric(ns, hostname, metric_source)
         num = 0
         num_metrics = 0
     end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 5199f25..e6ef229 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -26,7 +26,6 @@
 -- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
 local logger_matcher = read_config('logger_matcher') or '.*'
-local metric_logger = read_config('logger')
 local metric_source = read_config('source')
 
 local discovered_services = {}
@@ -79,7 +78,7 @@
 
     last_timer_event = ns
 
-    ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
+    ok, err = utils.inject_bulk_metric(ns, hostname, metric_source, utils.metric_type['DERIVE'])
     if ok ~= 0 then
         return -1, err
     end
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 0e3f9ba..1b2a3ca 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -19,7 +19,6 @@
         bulk_size: 523
         percentile: 90
         grace_time: 5
-        logger: aggregated_http_metrics_filter
         source: log_collector
     # This filter monitors the OpenStack logs and sends the rate of processed
     # log messages broken down by service and severity every  minute
@@ -34,7 +33,6 @@
         hostname: '{{ grains.host }}'
         grace_interval: 30
         logger_matcher: '^openstack%.(%a+)$'
-        logger: log_counter_filter
         source: log_collector
     hdd_errors:
       engine: sandbox
@@ -47,7 +45,6 @@
         grace_interval: 10
         patterns: "/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/"
         hostname: '{{ grains.host }}'
-        logger: hdd_errors_filter
         source: log_collector
 {%- if log_collector.sensu_host is defined %}
     watchdog:
@@ -111,7 +108,7 @@
       module_file: /usr/share/lma_collector/decoders/metric.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_filter log_counter_filter'
+        deserialize_for_sources: 'log_collector'
   input:
     heka_collectd:
       engine: http
@@ -303,9 +300,7 @@
       message_matcher: "Type == 'audit' &&  Fields[action] == 'authenticate'"
       ticker_interval: 60
       config:
-        hostname: '{{ grains.host }}'
         grace_interval: 30
-        logger: authentication_filter
         source: remote_collector
   {%- endif %}
 {%- endif %}