Merge pull request #27 from elemoine/stacklight-aggregator

Fix and improve the aggregator
diff --git a/heka/_service.sls b/heka/_service.sls
index 63516c6..e873f2e 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -1,3 +1,4 @@
+{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
 
 {%- set server = salt['pillar.get']('heka:'+service_name) %}
 
@@ -124,8 +125,6 @@
 {%- for service_name, service in pillar.iteritems() %}
 {%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
 
-{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
-
 {%- set grains_fragment_file = service_name+'/meta/heka.yml' %}
 {%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
 {%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
@@ -136,15 +135,19 @@
 {%- endif %}
 
 
-{# Loading the other services' support metadata from salt-mine #}
-
 {%- if service_name in ['remote_collector', 'aggregator'] %}
 
+{# Load the support metadata from heka/meta/heka.yml #}
+
+{%- set grains_fragment_file = 'heka/meta/heka.yml' %}
+{%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
+
+{# Load the other services' support metadata from salt-mine #}
+
 {%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').iteritems() %}
 {%- if node_grains.heka is defined %}
-
-{%- do service_grains.update(node_grains.heka) %}
-
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=node_grains.heka) %}{# NOTE(review): Jinja scopes a "set" made inside a for loop to that loop, so the merged result only survives if grains.filter_by updates service_grains in place - confirm. #}
 {%- endif %}
 {%- endfor %}
 
diff --git a/heka/files/toml/output/aggregator.toml b/heka/files/toml/output/aggregator.toml
new file mode 100644
index 0000000..1eedea7
--- /dev/null
+++ b/heka/files/toml/output/aggregator.toml
@@ -0,0 +1,7 @@
+{%- extends "heka/files/toml/output/tcp.toml" %}{# TcpOutput variant for the aggregator: overrides only the parent's address and message_matcher blocks. #}
+{%- block address -%}
+address = "{{ output.host }}:5565"
+{%- endblock %}{# NOTE(review): 5565 is hard-coded; presumably it must match the aggregator's heka_metric input port in heka/meta/heka.yml - confirm. #}
+{%- block message_matcher -%}
+message_matcher = "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
+{%- endblock %}{# Matches only afd_metric messages that carry no "aggregator" field. #}
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
index dee7fa0..c4115df 100644
--- a/heka/files/toml/output/tcp.toml
+++ b/heka/files/toml/output/tcp.toml
@@ -1,8 +1,12 @@
 [{{ output_name }}_output]
 type="TcpOutput"
+{% block address %}
 address = "{{ output.host }}:{{ output.port }}"
+{% endblock %}
 encoder = "ProtobufEncoder"
+{% block message_matcher %}
 message_matcher = "{{ output.message_matcher }}"
+{% endblock %}
 use_buffering = true
 
 [{{ output_name }}_output.buffering]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 7fee988..a6e8568 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -108,9 +108,26 @@
       port: 4354
       ticker_interval: 30 
 aggregator:
-  decoder: {}
-  input: {}
-  filter: {}
-  output: {}
-  splitter: {}
-  encoder: {}
+  input:
+    heka_metric:  # TCP listener on all interfaces; port 5565 is also hard-coded in heka/files/toml/output/aggregator.toml
+      engine: tcp
+      address: 0.0.0.0
+      port: 5565
+      decoder: ProtobufDecoder
+      splitter: HekaFramingSplitter
+  filter:
+    influxdb_accumulator:  # sandbox filter that only receives 'heka.sandbox.gse_metric' messages (see message_matcher)
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "Type == 'heka.sandbox.gse_metric'"
+      ticker_interval: 1
+      config:
+        tag_fields: "deployment_id environment_label tenant_id user_id"
+        time_precision: "{{ server.influxdb_time_precision }}"  # Jinja expression; presumably rendered when heka/_service.sls includes this file - confirm
+  encoder:
+    influxdb:
+      engine: payload
+      append_newlines: false
+      prefix_ts: false