Configure outputs via the support metadata

Stop hard-coding the ElasticSearch, InfluxDB and aggregator outputs in
the TOML templates. The elasticsearch.toml template now takes its
server, encoder and message_matcher from the output definition, the
influxdb.toml and aggregator.toml templates are replaced by a generic
http.toml (the aggregator output uses tcp.toml directly), and the
now-unused Jinja block overrides are dropped from tcp.toml. The outputs
are declared in the support metadata (heka/meta/heka.yml) from per-role
pillar data (heka:log_collector, heka:metric_collector,
heka:remote_collector and heka:aggregator), with defaults for the
ports, time precision and timeout coming from heka/map.jinja. Each
output, together with its related encoder and filter, is only rendered
when the corresponding *_host key is defined in pillar.
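For example, a pillar along the lines of the sketch below would enable
the ElasticSearch output on the log collector and the InfluxDB and
aggregator outputs on the metric collector (the host names, database
and credentials are placeholders; ports, time precision and timeout
fall back to the defaults defined in heka/map.jinja):

    heka:
      log_collector:
        elasticsearch_host: 172.16.10.101
      metric_collector:
        influxdb_host: 172.16.10.101
        influxdb_database: lma
        influxdb_username: lma
        influxdb_password: secret
        aggregator_host: 172.16.10.102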
diff --git a/heka/files/toml/output/aggregator.toml b/heka/files/toml/output/aggregator.toml
deleted file mode 100644
index 1eedea7..0000000
--- a/heka/files/toml/output/aggregator.toml
+++ /dev/null
@@ -1,7 +0,0 @@
-{%- extends "heka/files/toml/output/tcp.toml" %}
-{%- block address -%}
-address = "{{ output.host }}:5565"
-{%- endblock %}
-{%- block message_matcher -%}
-message_matcher = "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
-{%- endblock %}
diff --git a/heka/files/toml/output/elasticsearch.toml b/heka/files/toml/output/elasticsearch.toml
index 0e8d03b..8240f5f 100644
--- a/heka/files/toml/output/elasticsearch.toml
+++ b/heka/files/toml/output/elasticsearch.toml
@@ -1,8 +1,8 @@
[{{ output_name }}_output]
type = "ElasticSearchOutput"
-message_matcher = "Type == 'log' || Type == 'notification'"
-encoder = "elasticsearch_encoder"
-server = "http://{{ output.host }}:{{ output.port }}"
+message_matcher = "{{ output.message_matcher }}"
+encoder = "{{ output.encoder }}"
+server = "{{ output.server }}"
flush_interval = 5000
flush_count = 100
use_buffering = true
diff --git a/heka/files/toml/output/http.toml b/heka/files/toml/output/http.toml
new file mode 100644
index 0000000..1ec3672
--- /dev/null
+++ b/heka/files/toml/output/http.toml
@@ -0,0 +1,20 @@
+[{{ output_name }}_output]
+type = "HttpOutput"
+message_matcher = "{{ output.message_matcher }}"
+encoder = "{{ output.encoder }}"
+address = "{{ output.address }}"
+{%- if output.username and output.password %}
+username = "{{ output.username }}"
+password = "{{ output.password }}"
+{%- endif %}
+http_timeout = {{ output.timeout }}
+method = "POST"
+use_buffering = true
+
+[{{ output_name }}_output.buffering]
+max_buffer_size = 1610612736
+max_file_size = 134217728
+full_action = "drop"
+
+[{{ output_name }}_output.headers]
+Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/files/toml/output/influxdb.toml b/heka/files/toml/output/influxdb.toml
deleted file mode 100644
index 3c1f6b9..0000000
--- a/heka/files/toml/output/influxdb.toml
+++ /dev/null
@@ -1,22 +0,0 @@
-{%- from "heka/map.jinja" import server with context %}
-
-[{{ output_name }}_output]
-type = "HttpOutput"
-message_matcher = "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
-encoder = "influxdb_encoder"
-address = "http://{{ output.host }}:{{ output.port }}/write?db={{ output.database }}&precision={{ server.influxdb_time_precision }}"
-{%- if output.username and output.password %}
-username = "{{ output.username }}"
-password = "{{ output.password }}"
-{%- endif %}
-http_timeout = 5000
-method = "POST"
-use_buffering = true
-
-[influxdb_output.buffering]
-max_buffer_size = 1610612736
-max_file_size = 134217728
-full_action = "drop"
-
-[influxdb_output.headers]
-Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
index c4115df..dee7fa0 100644
--- a/heka/files/toml/output/tcp.toml
+++ b/heka/files/toml/output/tcp.toml
@@ -1,12 +1,8 @@
[{{ output_name }}_output]
type="TcpOutput"
-{% block address %}
address = "{{ output.host }}:{{ output.port }}"
-{% endblock %}
encoder = "ProtobufEncoder"
-{% block message_matcher %}
message_matcher = "{{ output.message_matcher }}"
-{% endblock %}
use_buffering = true
[{{ output_name }}_output.buffering]
diff --git a/heka/map.jinja b/heka/map.jinja
index f7e9d6e..c7de276 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -16,7 +16,6 @@
decoder: {}
extra_fields:
environment_label: {{ grains.domain }}
- influxdb_time_precision: ms
{%- if pillar.get('linux', {}).get('system', {}).timezone is defined %}
timezone: "{{ pillar.linux.system.timezone }}"
{%- endif %}
@@ -29,5 +28,30 @@
- acl
- heka
{%- endload %}
-
{%- set server = salt['grains.filter_by'](server_defaults, merge=salt['pillar.get']('heka:server')) %}
+
+{%- load_yaml as elasticsearch_defaults %}
+default:
+ elasticsearch_port: 9200
+{%- endload %}
+{% set log_collector = salt['grains.filter_by'](elasticsearch_defaults, merge=salt['pillar.get']('heka:log_collector')) %}
+
+{%- load_yaml as influxdb_defaults %}
+default:
+ influxdb_port: 8086
+ influxdb_time_precision: ms
+ influxdb_timeout: 5000
+{%- endload %}
+
+{%- load_yaml as metric_collector_defaults %}
+default:
+ aggregator_port: 5565
+{%- endload %}
+{% set metric_collector = salt['grains.filter_by'](
+ metric_collector_defaults,
+ merge=salt['grains.filter_by'](
+ influxdb_defaults,
+ merge=salt['pillar.get']('heka:metric_collector'))) %}
+
+{% set remote_collector = salt['grains.filter_by'](influxdb_defaults, merge=salt['pillar.get']('heka:remote_collector')) %}
+{% set aggregator = salt['grains.filter_by'](influxdb_defaults, merge=salt['pillar.get']('heka:aggregator')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 51d9ff1..08b99ea 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -1,4 +1,7 @@
-{%- from "heka/map.jinja" import server with context %}
+{%- from "heka/map.jinja" import log_collector with context %}
+{%- from "heka/map.jinja" import metric_collector with context %}
+{%- from "heka/map.jinja" import remote_collector with context %}
+{%- from "heka/map.jinja" import aggregator with context %}
log_collector:
filter:
@@ -26,9 +29,11 @@
hostname: '{{ grains.host }}'
interval: 60
grace_interval: 30
+{%- if log_collector.elasticsearch_host is defined %}
encoder:
elasticsearch:
engine: elasticsearch
+{%- endif %}
output:
metric_collector:
engine: tcp
@@ -40,6 +45,13 @@
host: 127.0.0.1
port: 4352
ticker_interval: 30
+{%- if log_collector.elasticsearch_host is defined %}
+ elasticsearch:
+ engine: elasticsearch
+ server: "http://{{ log_collector.elasticsearch_host }}:{{ log_collector.elasticsearch_port }}"
+ encoder: elasticsearch_encoder
+ message_matcher: "Type == 'log' || Type == 'notification'"
+{%- endif %}
metric_collector:
decoder:
collectd:
@@ -75,6 +87,7 @@
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
preserve_data: false
message_matcher: "Type == 'heka.all-report'"
+{%- if metric_collector.influxdb_host is defined %}
influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -84,18 +97,40 @@
ticker_interval: 1
config:
tag_fields: "deployment_id environment_label tenant_id user_id"
- time_precision: "{{ server.influxdb_time_precision }}"
+ time_precision: "{{ metric_collector.influxdb_time_precision }}"
+{%- endif %}
+{%- if metric_collector.influxdb_host is defined %}
encoder:
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
+{%- endif %}
output:
metric_dashboard:
engine: dashboard
host: 127.0.0.1
port: 4353
ticker_interval: 30
+{%- if metric_collector.influxdb_host is defined %}
+ influxdb:
+ engine: http
+ address: "http://{{ metric_collector.influxdb_host }}:{{ metric_collector.influxdb_port }}/write?db={{ metric_collector.influxdb_database }}&precision={{ metric_collector.influxdb_time_precision }}"
+ {%- if metric_collector.influxdb_username and metric_collector.influxdb_password %}
+ username: "{{ metric_collector.influxdb_username }}"
+ password: "{{ metric_collector.influxdb_password }}"
+ {%- endif %}
+ message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
+ encoder: influxdb_encoder
+ timeout: {{ metric_collector.influxdb_timeout }}
+{%- endif %}
+{%- if metric_collector.aggregator_host is defined %}
+ aggregator:
+ engine: tcp
+ host: "{{ metric_collector.aggregator_host }}"
+ port: "{{ metric_collector.aggregator_port }}"
+ message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
+{%- endif %}
remote_collector:
decoder:
collectd:
@@ -111,6 +146,7 @@
port: 8326
decoder: collectd_decoder
splitter: NullSplitter
+{%- if remote_collector.influxdb_host is defined %}
filter:
influxdb_accumulator:
engine: sandbox
@@ -121,18 +157,33 @@
ticker_interval: 1
config:
tag_fields: "deployment_id environment_label tenant_id user_id"
- time_precision: "{{ server.influxdb_time_precision }}"
+ time_precision: "{{ remote_collector.influxdb_time_precision }}"
+{%- endif %}
+{%- if remote_collector.influxdb_host is defined %}
encoder:
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
+{%- endif %}
output:
remote_collector_dashboard:
engine: dashboard
host: 127.0.0.1
port: 4354
ticker_interval: 30
+{%- if remote_collector.influxdb_host is defined %}
+ influxdb:
+ engine: http
+ address: "http://{{ remote_collector.influxdb_host }}:{{ remote_collector.influxdb_port }}/write?db={{ remote_collector.influxdb_database }}&precision={{ remote_collector.influxdb_time_precision }}"
+ {%- if remote_collector.influxdb_username and remote_collector.influxdb_password %}
+ username: "{{ remote_collector.influxdb_username }}"
+ password: "{{ remote_collector.influxdb_password }}"
+ {%- endif %}
+ message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
+ encoder: influxdb_encoder
+ timeout: {{ remote_collector.influxdb_timeout }}
+{%- endif %}
aggregator:
policy:
# A policy defining that the cluster's status depends on the member with
@@ -251,6 +302,7 @@
port: 5565
decoder: ProtobufDecoder
splitter: HekaFramingSplitter
+{%- if aggregator.influxdb_host is defined %}
filter:
influxdb_accumulator:
engine: sandbox
@@ -261,9 +313,25 @@
ticker_interval: 1
config:
tag_fields: "deployment_id environment_label tenant_id user_id"
- time_precision: "{{ server.influxdb_time_precision }}"
+ time_precision: "{{ aggregator.influxdb_time_precision }}"
+{%- endif %}
+{%- if aggregator.influxdb_host is defined %}
encoder:
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
+{%- endif %}
+{%- if aggregator.influxdb_host is defined %}
+ output:
+ influxdb:
+ engine: http
+ address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
+ {%- if aggregator.influxdb_username and aggregator.influxdb_password %}
+ username: "{{ aggregator.influxdb_username }}"
+ password: "{{ aggregator.influxdb_password }}"
+ {%- endif %}
+ message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
+ encoder: influxdb_encoder
+ timeout: {{ aggregator.influxdb_timeout }}
+{%- endif %}