Merge pull request #40 from simonpasquier/collect-notifications
Enable collection of notifications
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
index e75596e..a6c0160 100644
--- a/heka/files/toml/input/amqp.toml
+++ b/heka/files/toml/input/amqp.toml
@@ -1,28 +1,33 @@
[{{ input_name }}_input]
type = "AMQPInput"
-url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
+url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}{% if input.port is defined %}:{{ input.port }}{% endif %}/{{ input.vhost }}"
exchange = "{{ input.exchange }}"
exchange_type = "{{ input.exchange_type }}"
-{%- if input.prefetch_count is defined -%}
+{%- if input.prefetch_count is defined %}
prefetch_count = {{ input.prefetch_count }}
{%- endif %}
-{%- if input.exchange_durability is defined -%}
-exchange_durability = "{{ input.exchange_durability }}"
+{%- if input.exchange_durability is defined %}
+exchange_durability = {{ input.exchange_durability|lower }}
{%- endif %}
-{%- if input.exchange_auto_delete is defined -%}
-exchange_auto_delete = "{{ input.exchange_auto_delete }}"
+{%- if input.exchange_auto_delete is defined %}
+exchange_auto_delete = {{ input.exchange_auto_delete|lower }}
{%- endif %}
-{%- if input.queue_auto_delete is defined -%}
-queue_auto_delete = {{ input.queue_auto_delete }}
+{%- if input.queue_auto_delete is defined %}
+queue_auto_delete = {{ input.queue_auto_delete|lower }}
{%- endif %}
-{%- if input.queue is defined -%}
+{%- if input.queue is defined %}
queue = "{{ input.queue }}"
{%- endif %}
-{%- if input.routing_key is defined -%}
+{%- if input.routing_key is defined %}
routing_key = "{{ input.routing_key }}"
{%- endif %}
+{%- if input.can_exit is defined %}
+can_exit = {{ input.can_exit|lower }}
+{%- endif %}
decoder = "{{ input.decoder }}"
+{%- if input.splitter is defined %}
splitter = "{{ input.splitter }}"
+{%- endif %}
{%- if input.ssl is defined and input.ssl.get('enabled', True) %}
[{{ input_name }}_input.tls]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index a217510..f1d906f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -50,7 +50,7 @@
engine: elasticsearch
server: "http://{{ log_collector.elasticsearch_host }}:{{ log_collector.elasticsearch_port }}"
encoder: elasticsearch_encoder
- message_matcher: "Type == 'log' || Type == 'notification'"
+ message_matcher: "Type == 'log'"
{%- endif %}
metric_collector:
decoder:
@@ -159,6 +159,14 @@
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
hostname: '{{ grains.host }}'
+{%- if remote_collector.amqp_host is defined %}
+ notification:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/decoders/notification.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ include_full_notification: false
+{%- endif %}
input:
heka_collectd:
engine: http
@@ -166,6 +174,26 @@
port: 8326
decoder: collectd_decoder
splitter: NullSplitter
+{%- if remote_collector.amqp_host is defined %}
+{%- for notification_level in ('info', 'warn', 'error') %}
+ amqp_notification_{{ notification_level }}:
+ engine: amqp
+ host: "{{ remote_collector.amqp_host }}"
+ port: {{ remote_collector.amqp_port }}
+ user: "{{ remote_collector.amqp_user }}"
+ password: "{{ remote_collector.amqp_password }}"  # quoted: may contain YAML-special characters
+ vhost: "{{ remote_collector.get('amqp_vhost', '') }}"  # quoted: an empty vhost must stay "" rather than YAML null
+ exchange: "{{ remote_collector.get('amqp_exchange', 'nova') }}"
+ exchange_type: topic
+ exchange_durability: false
+ exchange_auto_delete: false
+ queue_auto_delete: false
+ queue: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+ routing_key: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+ can_exit: false
+ decoder: notification_decoder
+{%- endfor %}
+{%- endif %}
{%- if remote_collector.influxdb_host is defined %}
filter:
influxdb_accumulator:
@@ -179,13 +207,19 @@
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ remote_collector.influxdb_time_precision }}"
{%- endif %}
-{%- if remote_collector.influxdb_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
encoder:
+{%- if remote_collector.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
{%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+ elasticsearch:
+ engine: elasticsearch
+{%- endif %}
+{%- endif %}
output:
remote_collector_dashboard:
engine: dashboard
@@ -211,6 +245,13 @@
port: "{{ remote_collector.aggregator_port }}"
message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
{%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+ elasticsearch:
+ engine: elasticsearch
+ server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
+ encoder: elasticsearch_encoder
+ message_matcher: "Type == 'notification'"
+{%- endif %}
aggregator:
policy:
# A policy defining that the cluster's status depends on the member with