Merge pull request #27 from elemoine/stacklight-aggregator
Fix and improve the aggregator
diff --git a/heka/_service.sls b/heka/_service.sls
index 63516c6..e873f2e 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -1,3 +1,4 @@
+{#- Render the template named by grains_fragment_file (skipped silently when the file is missing) and return the rendered text; callers parse it with |load_yaml. -#}{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
{%- set server = salt['pillar.get']('heka:'+service_name) %}
@@ -124,8 +125,6 @@
{%- for service_name, service in pillar.iteritems() %}
{%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
-{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
-
{%- set grains_fragment_file = service_name+'/meta/heka.yml' %}
{%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
@@ -136,15 +135,19 @@
{%- endif %}
-{# Loading the other services' support metadata from salt-mine #}
-
{%- if service_name in ['remote_collector', 'aggregator'] %}
+{# Load the support metadata from heka/meta/heka.yml and merge it into service_grains #}
+
+{%- set grains_fragment_file = 'heka/meta/heka.yml' %}
+{%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
+
+{# Merge the "heka" grains of every minion returned by mine.get('*', 'grains.items') into service_grains #}
+
{%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').iteritems() %}
{%- if node_grains.heka is defined %}
-
-{%- do service_grains.update(node_grains.heka) %}
-
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=node_grains.heka) %}{# NOTE(review): a Jinja set inside a for loop is local to the loop body, so this only accumulates across nodes if grains.filter_by merges into service_grains in place; confirm #}
{%- endif %}
{%- endfor %}
diff --git a/heka/files/toml/output/aggregator.toml b/heka/files/toml/output/aggregator.toml
new file mode 100644
index 0000000..1eedea7
--- /dev/null
+++ b/heka/files/toml/output/aggregator.toml
@@ -0,0 +1,7 @@
+{%- extends "heka/files/toml/output/tcp.toml" %}{# TcpOutput variant: reuses tcp.toml and overrides only its address and message_matcher blocks #}
+{%- block address -%}
+address = "{{ output.host }}:5565"{# fixed port, output.port is not used here; 5565 is the port of the aggregator heka_metric input in heka/meta/heka.yml #}
+{%- endblock %}
+{%- block message_matcher -%}
+message_matcher = "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"{# matches only afd_metric messages that carry no aggregator field #}
+{%- endblock %}
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
index dee7fa0..c4115df 100644
--- a/heka/files/toml/output/tcp.toml
+++ b/heka/files/toml/output/tcp.toml
@@ -1,8 +1,12 @@
[{{ output_name }}_output]
type="TcpOutput"
+{% block address %}{# overridable block; heka/files/toml/output/aggregator.toml replaces it #}
address = "{{ output.host }}:{{ output.port }}"
+{% endblock %}
encoder = "ProtobufEncoder"
+{% block message_matcher %}{# overridable block; heka/files/toml/output/aggregator.toml replaces it #}
message_matcher = "{{ output.message_matcher }}"
+{% endblock %}
use_buffering = true
[{{ output_name }}_output.buffering]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 7fee988..a6e8568 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -108,9 +108,26 @@
port: 4354
ticker_interval: 30
aggregator:
- decoder: {}
- input: {}
- filter: {}
- output: {}
- splitter: {}
- encoder: {}
+ input:
+ heka_metric:  # TCP input; see decoder and splitter below
+ engine: tcp
+ address: 0.0.0.0
+ port: 5565  # same port that heka/files/toml/output/aggregator.toml sends to
+ decoder: ProtobufDecoder  # heka/files/toml/output/tcp.toml encodes with ProtobufEncoder
+ splitter: HekaFramingSplitter
+ filter:
+ influxdb_accumulator:  # sandbox filter applied to gse_metric messages (see message_matcher)
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ preserve_data: false
+ message_matcher: "Type == 'heka.sandbox.gse_metric'"
+ ticker_interval: 1
+ config:
+ tag_fields: "deployment_id environment_label tenant_id user_id"
+ time_precision: "{{ server.influxdb_time_precision }}"  # NOTE(review): server is presumably the 'heka:<service_name>' pillar set in heka/_service.sls; confirm it is in scope when this file is rendered
+ encoder:
+ influxdb:
+ engine: payload
+ append_newlines: false
+ prefix_ts: false