Improve inject_bulk_metric()

This change ensures that bulk metrics have an explicit Logger field as
well as Fields[source] and Fields[type] like regular metric messages.
It also makes the configuration of the input metric plugin on the
metric_collector side more explicit about which metrics to deserialize.
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index aa89f29..b310ec4 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -91,7 +91,7 @@
 end
 
 -- Send the bulk metric message to the Heka pipeline
-function inject_bulk_metric(ts, hostname, source, metric_type)
+function inject_bulk_metric(ts, hostname, logger, source, m_type)
     if #bulk_datapoints == 0 then
         return
     end
@@ -106,6 +106,7 @@
     end
 
     local msg = {
+        Logger = logger,
         Hostname = hostname,
         Timestamp = ts,
         Payload = payload,
@@ -114,7 +115,7 @@
         Fields = {
             hostname = hostname,
             source = source,
-            type = metric_type
+            type = m_type or metric_type['GAUGE']
       }
     }
     -- reset the local table storing the datapoints
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
index fcd5fe1..6e5146c 100644
--- a/heka/files/lua/filters/hdd_errors_counter.lua
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -31,6 +31,8 @@
 -- grace_interval parameter allows to take into account log messages that are
 -- received in the current interval but emitted before it.
 local grace_interval = (read_config('grace_interval') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
 
 local error_counters = {}
 local enter_at
@@ -82,7 +84,7 @@
     end
 
     enter_at = ns
-    utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
+    utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
 
     return 0
 end
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
index 35efcad..c40f0c3 100644
--- a/heka/files/lua/filters/heka_monitoring.lua
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -80,6 +80,6 @@
         end
     end
 
-    utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
+    utils.inject_bulk_metric(ts, hostname, 'heka_monitoring', 'internal')
     return 0
 end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
index 1756a59..81eaf43 100644
--- a/heka/files/lua/filters/http_metrics_aggregator.lua
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -34,11 +34,12 @@
 -- and also to compensate the delay introduced by log parsing/decoding
 -- which leads to arrive too late in its interval.
 local grace_time = (read_config('grace_time') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
 
 local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
 
 local percentile_field_name = string.format('upper_%s', percentile_thresh)
-local msg_source = 'http_metric_filter'
 local last_tick = os.time() * 1e9
 local interval_in_ns = interval * 1e9
 
@@ -84,7 +85,7 @@
 
     -- keep only the first 2 tokens because some services like Neutron report
     -- themselves as 'openstack.<service>.server'
-    local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
+    local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
     if service == nil then
         return -1, "Cannot match any service from " .. logger
     end
@@ -166,7 +167,7 @@
                 num_metrics = num_metrics - 1
                 if num >= bulk_size then
                     if msg_injected < max_timer_inject then
-                        utils.inject_bulk_metric(ns, hostname, msg_source)
+                        utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
                         msg_injected = msg_injected + 1
                         num = 0
                         num_metrics = 0
@@ -178,7 +179,7 @@
         all_times[service] = nil
     end
     if num > 0 then
-        utils.inject_bulk_metric(ns, hostname, msg_source)
+        utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source)
         num = 0
         num_metrics = 0
     end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
index 371ff8c..c22749b 100644
--- a/heka/files/lua/filters/logs_counter.lua
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -25,6 +25,8 @@
 -- the filter will take into account log messages that are at most 10 seconds
 -- older than the current time.
 local grace_interval = (read_config('grace_interval') or 0) + 0
+local metric_logger = read_config('logger')
+local metric_source = read_config('source')
 
 local discovered_services = {}
 local logs_counters = {}
@@ -76,7 +78,7 @@
 
     last_timer_event = ns
 
-    ok, err = utils.inject_bulk_metric(ns, hostname, 'logs_counter', utils.metric_type['DERIVE'])
+    ok, err = utils.inject_bulk_metric(ns, hostname, metric_logger, metric_source, utils.metric_type['DERIVE'])
     if ok ~= 0 then
         return -1, err
     end
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index e8138dd..c7a7fdf 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -20,6 +20,8 @@
         bulk_size: 523
         percentile: 90
         grace_time: 5
+        logger: aggregated_http_metrics_filter
+        source: log_collector
     log_counter:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/logs_counter.lua
@@ -30,6 +32,8 @@
       config:
         hostname: '{{ grains.host }}'
         grace_interval: 30
+        logger: log_counter_filter
+        source: log_collector
 {%- if log_collector.elasticsearch_host is defined %}
   encoder:
     elasticsearch:
@@ -62,7 +66,7 @@
       module_file: /usr/share/lma_collector/decoders/metric.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter logs_counter_filter'
+        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter log_counter_filter'
   input:
     heka_collectd:
       engine: http