Merge pull request #46 from elemoine/stacklight-fix-policies

Fix the gse_policies structure
diff --git a/heka/_service.sls b/heka/_service.sls
index c1c85ed..8a09ea7 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -78,8 +78,6 @@
     'decoder': {},
     'input': {},
     'trigger': {},
-    'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -90,7 +88,6 @@
     'input': {},
     'trigger': {},
     'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -101,7 +98,6 @@
     'input': {},
     'trigger': {},
     'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -111,7 +107,6 @@
     'decoder': {},
     'input': {},
     'trigger': {},
-    'alarm': {},
     'alarm_cluster': {},
     'filter': {},
     'splitter': {},
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index f94a0e8..0befcf0 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -216,6 +216,14 @@
                 else
                     msg['Fields']['name'] = metric_name
                 end
+            elseif metric_source == 'ntpd' then
+                if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
+                    msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
+                else
+                    msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. 'peer'
+                    msg['Fields']['server'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'server')
+                end
             elseif metric_source == 'check_openstack_api' then
                 -- For OpenStack API metrics, plugin_instance = <service name>
                 msg['Fields']['name'] = 'openstack_check_api'
@@ -420,7 +428,31 @@
                 msg['Fields']['service'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
             else
-                msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+                -- generic metric name translation for 3rd-party sources
+                msg['Fields']['name'] = sample['plugin']
+                if sample['plugin_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
+                end
+                if sample['type'] ~= 'gauge' and sample['type'] ~= 'derive' and
+                   sample['type'] ~= 'counter' and sample['type'] ~= 'absolute' then
+                    -- append the type only when it is a custom DS type (not gauge/derive/counter/absolute)
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+                end
+                if sample['type_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+                end
+                if sample['dsnames'][i] ~= "value" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+                end
+                msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
+
+                -- copy every meta entry (except key '0') into Fields and register it as a tag field
+                for k, v in pairs(sample['meta'] or {}) do
+                    if tostring(k) ~= '0' then
+                        msg['Fields'][k] = v
+                        table.insert(msg['Fields']['tag_fields'], k)
+                    end
+                end
             end
 
             if not skip_it then
diff --git a/heka/map.jinja b/heka/map.jinja
index e6d8a90..300960e 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -56,6 +56,7 @@
     'influxdb_port': default_influxdb_port,
     'influxdb_time_precision': default_influxdb_time_precision,
     'influxdb_timeout': default_influxdb_timeout,
+    'aggregator_port': default_aggregator_port,
   }
 }, merge=salt['pillar.get']('heka:remote_collector')) %}
 
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 08b99ea..67254c8 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -153,7 +153,7 @@
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: false
-      message_matcher: "Type =~ /metric$/"
+      message_matcher: "Type == 'heka.sandbox.afd_metric'"
       ticker_interval: 1
       config:
         tag_fields: "deployment_id environment_label tenant_id user_id"
@@ -184,6 +184,13 @@
       encoder: influxdb_encoder
       timeout: {{ remote_collector.influxdb_timeout }}
 {%- endif %}
+{%- if remote_collector.aggregator_host is defined %}
+    aggregator:
+      engine: tcp
+      host: "{{ remote_collector.aggregator_host }}"
+      port: "{{ remote_collector.aggregator_port }}"
+      message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
+{%- endif %}
 aggregator:
   policy:
     # A policy defining that the cluster's status depends on the member with