Merge pull request #40 from simonpasquier/collect-notifications

Enable collection of notifications
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
index e75596e..a6c0160 100644
--- a/heka/files/toml/input/amqp.toml
+++ b/heka/files/toml/input/amqp.toml
@@ -1,28 +1,33 @@
 [{{ input_name }}_input]
 type = "AMQPInput"
-url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
+url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}{% if input.port is defined %}:{{ input.port }}{% endif %}/{{ input.vhost }}"  # ":<port>" is emitted only when input.port is set
 exchange = "{{ input.exchange }}"
 exchange_type = "{{ input.exchange_type }}"
-{%- if input.prefetch_count is defined -%}
+{%- if input.prefetch_count is defined %}
 prefetch_count = {{ input.prefetch_count }}
 {%- endif %}
-{%- if input.exchange_durability is defined -%}
-exchange_durability = "{{ input.exchange_durability }}"
+{%- if input.exchange_durability is defined %}
+exchange_durability = {{ input.exchange_durability|lower }}
 {%- endif %}
-{%- if input.exchange_auto_delete is defined -%}
-exchange_auto_delete = "{{ input.exchange_auto_delete }}"
+{%- if input.exchange_auto_delete is defined %}
+exchange_auto_delete = {{ input.exchange_auto_delete|lower }}
 {%- endif %}
-{%- if input.queue_auto_delete is defined -%}
-queue_auto_delete = {{ input.queue_auto_delete }}
+{%- if input.queue_auto_delete is defined %}
+queue_auto_delete = {{ input.queue_auto_delete|lower }}
 {%- endif %}
-{%- if input.queue is defined -%}
+{%- if input.queue is defined %}
 queue = "{{ input.queue }}"
 {%- endif %}
-{%- if input.routing_key is defined -%}
+{%- if input.routing_key is defined %}
 routing_key = "{{ input.routing_key }}"
 {%- endif %}
+{%- if input.can_exit is defined %}
+can_exit = {{ input.can_exit|lower }}
+{%- endif %}
 decoder = "{{ input.decoder }}"
+{%- if input.splitter is defined %}
 splitter = "{{ input.splitter }}"
+{%- endif %}
 
 {%- if input.ssl is defined and input.ssl.get('enabled', True) %}
 [{{ input_name }}_input.tls]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index a217510..f1d906f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -50,7 +50,7 @@
       engine: elasticsearch
       server: "http://{{ log_collector.elasticsearch_host }}:{{ log_collector.elasticsearch_port }}"
       encoder: elasticsearch_encoder
-      message_matcher: "Type == 'log' || Type == 'notification'"
+      message_matcher: "Type == 'log'"
 {%- endif %}
 metric_collector:
   decoder:
@@ -159,6 +159,14 @@
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
         hostname: '{{ grains.host }}'
+{%- if remote_collector.amqp_host is defined %}
+    notification:  # rendered only when remote_collector.amqp_host is defined
+      engine: sandbox
+      module_file: /usr/share/lma_collector/decoders/notification.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        include_full_notification: false  # NOTE(review): presumably drops the raw notification payload; confirm in notification.lua
+{%- endif %}
   input:
     heka_collectd:
       engine: http
@@ -166,6 +174,26 @@
       port: 8326
       decoder: collectd_decoder
       splitter: NullSplitter
+{%- if remote_collector.amqp_host is defined %}
+{%- for notification_level in ('info', 'warn', 'error') %}
+    amqp_notification_{{ notification_level }}:  # one AMQP input per notification level
+      engine: amqp
+      host: "{{ remote_collector.amqp_host }}"
+      port: "{{ remote_collector.amqp_port }}"
+      user: "{{ remote_collector.amqp_user }}"
+      password: "{{ remote_collector.amqp_password }}"  # quoted so YAML never retypes or mis-parses the credential
+      vhost: "{{ remote_collector.get('amqp_vhost', '') }}"  # quoted so the empty default stays "" instead of YAML null
+      exchange: "{{ remote_collector.get('amqp_exchange', 'nova') }}"
+      exchange_type: topic
+      exchange_durability: false
+      exchange_auto_delete: false
+      queue_auto_delete: false
+      queue: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+      routing_key: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+      can_exit: false
+      decoder: notification_decoder
+{%- endfor %}
+{%- endif %}
 {%- if remote_collector.influxdb_host is defined %}
   filter:
     influxdb_accumulator:
@@ -179,13 +207,19 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ remote_collector.influxdb_time_precision }}"
 {%- endif %}
-{%- if remote_collector.influxdb_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
   encoder:
+{%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
 {%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+    elasticsearch:  # encoder rendered only when remote_collector.elasticsearch_host is defined
+      engine: elasticsearch
+{%- endif %}
+{%- endif %}
   output:
     remote_collector_dashboard:
       engine: dashboard
@@ -211,6 +245,13 @@
       port: "{{ remote_collector.aggregator_port }}"
       message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
 {%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+    elasticsearch:  # output rendered only when remote_collector.elasticsearch_host is defined
+      engine: elasticsearch
+      server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
+      encoder: elasticsearch_encoder
+      message_matcher: "Type == 'notification'"  # forwards only messages whose Type is 'notification'
+{%- endif %}
 aggregator:
   policy:
     # A policy defining that the cluster's status depends on the member with