Merge pull request #108 from simonpasquier/fix-afd-alarm-matcher

Fix AFD alarm message matcher
diff --git a/README.rst b/README.rst
index 6d463cb..2aaef14 100644
--- a/README.rst
+++ b/README.rst
@@ -155,10 +155,11 @@
         influxdb_time_precision: ms
         influxdb_username: lma
         resource_decoding: false
-        rabbit_host: 172.16.10.253
-        rabbit_port: 5672
-        rabbit_queue: metering.sample
-        rabbit_vhost: /openstack
+        amqp_exchange: ceilometer
+        amqp_host: 172.16.10.253
+        amqp_port: 5672
+        amqp_queue: metering.sample
+        amqp_vhost: /openstack
 
 Default values:
 
@@ -169,9 +170,10 @@
 * ``influxdb_time_precision: ms``
 * ``influxdb_timeout: 5000``
 * ``poolsize: 100``
-* ``rabbit_port: 5672``
-* ``rabbit_vhost: /openstack``
-* ``rabbit_queue: metering.sample``
+* ``amqp_exchange: ceilometer``
+* ``amqp_port: 5672``
+* ``amqp_queue: metering.sample``
+* ``amqp_vhost: /openstack``
 * ``resource_decoding: false``
 
 Read more
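
The README change above only renames the ceilometer_collector pillar keys (rabbit_* becomes amqp_*, plus the new amqp_exchange). As a hypothetical aid for existing deployments (not part of the formula), a one-pass translation of an old pillar section could look like the sketch below; the key mapping mirrors the renames in this commit, including the amqp_user/amqp_password rename applied further down in heka/meta/heka.yml:

    # Hypothetical helper (not part of this formula): translate the legacy
    # rabbit_* pillar keys of the ceilometer_collector section to the new
    # amqp_* names. 'amqp_exchange' has no legacy counterpart and defaults
    # to 'ceilometer' in the formula.
    RENAMED_KEYS = {
        'rabbit_host': 'amqp_host',
        'rabbit_port': 'amqp_port',
        'rabbit_queue': 'amqp_queue',
        'rabbit_vhost': 'amqp_vhost',
        'rabbit_user': 'amqp_user',
        'rabbit_password': 'amqp_password',
    }

    def migrate_ceilometer_collector(section):
        """Return a copy of the pillar section with renamed keys."""
        return {RENAMED_KEYS.get(key, key): value
                for key, value in section.items()}

    print(migrate_ceilometer_collector({
        'rabbit_host': '172.16.10.253',
        'rabbit_port': 5672,
        'rabbit_queue': 'metering.sample',
        'rabbit_vhost': '/openstack',
    }))
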
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index 8f86efd..efbd091 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -83,6 +83,8 @@
     """
     filtered_grains = {}
     for service_name, service_data in grains.items():
+        if not service_data:
+            continue
         alarm = service_data.get('alarm')
         if alarm:
             filtered_grains[service_name] = {'alarm': alarm}
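
For context, a minimal sketch of the loop being patched (the function name and the sample grains below are assumptions; only the loop body comes from _modules/heka_alarming.py): without the new guard, a grains entry whose value is empty or None raised AttributeError on .get('alarm'); with it, such entries are simply skipped.

    # Minimal sketch of the patched loop, assumed function name.
    def filter_alarm_grains(grains):
        filtered_grains = {}
        for service_name, service_data in grains.items():
            if not service_data:
                # A service declared in grains with no data attached
                # previously crashed on service_data.get('alarm').
                continue
            alarm = service_data.get('alarm')
            if alarm:
                filtered_grains[service_name] = {'alarm': alarm}
        return filtered_grains

    print(filter_alarm_grains({
        'nova': {'alarm': ['nova_api_check']},  # hypothetical alarm name
        'broken_service': None,                 # skipped instead of raising
    }))
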
diff --git a/heka/map.jinja b/heka/map.jinja
index 2e47090..1677ffc 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -39,11 +39,6 @@
 {% set default_nagios_host_alarm_clusters = '00-clusters' %}
 {% set default_automatic_starting = True %}
 {% set default_amqp_port = 5672 %}
-{% set default_amqp_vhost = '' %}
-
-{% set default_rabbit_port = 5672 %}
-{% set default_rabbit_vhost = '/openstack' %}
-{% set default_ceilometer_rabbit_queue = 'metering.sample' %}
 
 {% set log_collector = salt['grains.filter_by']({
   'default': {
@@ -70,7 +65,7 @@
 {% set remote_collector = salt['grains.filter_by']({
   'default': {
     'amqp_port': default_amqp_port,
-    'amqp_vhost': default_amqp_vhost,
+    'amqp_vhost': '',
     'elasticsearch_port': default_elasticsearch_port,
     'influxdb_port': default_influxdb_port,
     'influxdb_time_precision': default_influxdb_time_precision,
@@ -99,9 +94,9 @@
     'influxdb_timeout': default_influxdb_timeout,
     'influxdb_time_precision': default_influxdb_time_precision,
     'elasticsearch_port': default_elasticsearch_port,
-    'rabbit_port': default_rabbit_port,
-    'rabbit_vhost': default_rabbit_vhost,
-    'rabbit_queue': default_ceilometer_rabbit_queue,
+    'amqp_port': default_amqp_port,
+    'amqp_vhost': '/openstack',
+    'amqp_queue': 'metering.sample',
     'resource_decoding': False,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
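
The defaults removed from the top of heka/map.jinja now live inline, under their amqp_* names, in the filter_by block that uses them. Conceptually, pillar data overrides these defaults key by key; the snippet below is a plain dict merge used only to illustrate that layering, not Salt's actual grains.filter_by/pillar merge logic, and the '/ceilometer' vhost is a made-up override value:

    # Illustration only: formula defaults for the ceilometer settings
    # combined with a pillar override.
    defaults = {
        'amqp_port': 5672,
        'amqp_vhost': '/openstack',
        'amqp_queue': 'metering.sample',
        'resource_decoding': False,
        'poolsize': 100,
    }
    pillar = {'amqp_host': '172.16.10.253', 'amqp_vhost': '/ceilometer'}

    ceilometer_collector = {**defaults, **pillar}
    print(ceilometer_collector['amqp_vhost'])   # /ceilometer (pillar wins)
    print(ceilometer_collector['amqp_queue'])   # metering.sample (default)
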
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index d461152..d9df152 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -42,11 +42,6 @@
       host: {{ log_collector.metric_collector_host }}
       port: {{ log_collector.metric_collector_port }}
       message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
-    log_dashboard:
-      engine: dashboard
-      host: 127.0.0.1
-      port: 4352
-      ticker_interval: 30
 {%- if log_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
@@ -82,14 +77,8 @@
       port: 5567
       decoder: metric_decoder
       splitter: HekaFramingSplitter
-  filter:
-    heka_metric_collector:
-      engine: sandbox
-      module_file: /usr/share/lma_collector/filters/heka_monitoring.lua
-      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
-      preserve_data: false
-      message_matcher: "Type == 'heka.all-report'"
 {%- if metric_collector.influxdb_host is defined %}
+  filter:
     influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -101,26 +90,24 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ metric_collector.influxdb_time_precision }}"
 {%- endif %}
-{%- if metric_collector.influxdb_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined %}
   encoder:
+  {%- if metric_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if metric_collector.nagios_host is defined %}
+  {%- endif %}
+  {%- if metric_collector.nagios_host is defined %}
     nagios:
       engine: sandbox
       module_file: /usr/share/lma_collector/encoders/status_nagios.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+  {%- endif %}
 {%- endif %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined %}
   output:
-    metric_dashboard:
-      engine: dashboard
-      host: 127.0.0.1
-      port: 4353
-      ticker_interval: 30
-{%- if metric_collector.influxdb_host is defined %}
+  {%- if metric_collector.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ metric_collector.influxdb_host }}:{{ metric_collector.influxdb_port }}/write?db={{ metric_collector.influxdb_database }}&precision={{ metric_collector.influxdb_time_precision }}"
@@ -131,15 +118,15 @@
       message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
       encoder: influxdb_encoder
       timeout: {{ metric_collector.influxdb_timeout }}
-{%- endif %}
-{%- if metric_collector.aggregator_host is defined %}
+  {%- endif %}
+  {%- if metric_collector.aggregator_host is defined %}
     aggregator:
       engine: tcp
       host: "{{ metric_collector.aggregator_host }}"
       port: "{{ metric_collector.aggregator_port }}"
       message_matcher: "Type == 'heka.sandbox.afd_metric'"
-{%- endif %}
-{%- if metric_collector.nagios_host is defined %}
+  {%- endif %}
+  {%- if metric_collector.nagios_host is defined %}
     nagios_alarm:
       engine: http
       address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
@@ -152,6 +139,7 @@
       max_buffer_size: 1048576
       max_file_size: 524288
       full_action: drop
+  {%- endif %}
 {%- endif %}
 remote_collector:
   decoder:
@@ -198,7 +186,7 @@
 {%- endif %}
 {%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined %}
   filter:
-{%- if remote_collector.influxdb_host is defined %}
+  {%- if remote_collector.influxdb_host is defined %}
     influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -209,36 +197,32 @@
       config:
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ remote_collector.influxdb_time_precision }}"
-{%- endif %}
-{%- if remote_collector.amqp_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.amqp_host is defined %}
     resource_creation_time:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/resource_creation_time.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: false
       message_matcher: "Type == 'notification' && Fields[event_type] =~ /create.end$/"
-{%- endif %}
+  {%- endif %}
 {%- endif %}
 {%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
   encoder:
-{%- if remote_collector.influxdb_host is defined %}
+  {%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if remote_collector.elasticsearch_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
+  {%- endif %}
 {%- endif %}
-{%- endif %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined %}
   output:
-    remote_collector_dashboard:
-      engine: dashboard
-      host: 127.0.0.1
-      port: 4354
-      ticker_interval: 30
-{%- if remote_collector.influxdb_host is defined %}
+  {%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ remote_collector.influxdb_host }}:{{ remote_collector.influxdb_port }}/write?db={{ remote_collector.influxdb_database }}&precision={{ remote_collector.influxdb_time_precision }}"
@@ -249,25 +233,26 @@
       message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
       encoder: influxdb_encoder
       timeout: {{ remote_collector.influxdb_timeout }}
-{%- endif %}
-{%- if remote_collector.aggregator_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.aggregator_host is defined %}
     aggregator:
       engine: tcp
       host: "{{ remote_collector.aggregator_host }}"
       port: "{{ remote_collector.aggregator_port }}"
       message_matcher: "Type == 'heka.sandbox.afd_metric'"
-{%- endif %}
-{%- if remote_collector.elasticsearch_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
       server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
       encoder: elasticsearch_encoder
       message_matcher: "Type == 'notification'"
+  {%- endif %}
 {%- endif %}
 aggregator:
   policy:
-    # A policy defining that the cluster's status depends on the member with
-    # the highest severity, typically used for a cluster of services.
+    # A policy that is used to derive cluster status based
+    # on the highest severity status of its members.
     highest_severity:
     - status: down
       trigger:
@@ -302,41 +287,8 @@
           relational_operator: '>'
           threshold: 0
     - status: unknown
-    # A policy which is typically used for clusters managed by Pacemaker
-    # with the no-quorum-policy set to 'stop'.
-    majority_of_members:
-    - status: down
-      trigger:
-        logical_operator: or
-        rules:
-        - function: percent
-          arguments: [ down ]
-          relational_operator: '>'
-          threshold: 50
-    - status: critical
-      trigger:
-        logical_operator: and
-        rules:
-        - function: percent
-          arguments: [ down, critical ]
-          relational_operator: '>'
-          threshold: 20
-        - function: percent
-          arguments: [ okay ]
-          relational_operator: '<'
-          threshold: 50
-          function: percent
-    - status: warning
-      trigger:
-        logical_operator: or
-        rules:
-        - function: percent
-          arguments: [ okay ]
-          relational_operator: '<'
-          threshold: 50
-          function: percent
-    - status: okay
-    # A policy which is typically used for stateless clusters
+    # A policy that is used to derive a cluster status based
+    # on the okay or down status of its members.
     availability_of_members:
     - status: down
       trigger:
@@ -355,16 +307,60 @@
           relational_operator: '=='
           threshold: 1
         - function: count
-          arguments: [ critical, down ]
+          arguments: [ down ]
           relational_operator: '>'
+          threshold: 0
+    - status: warning
+      trigger:
+        logical_operator: and
+        rules:
+        - function: count
+          arguments: [ okay ]
+          relational_operator: '>='
+          threshold: 2
+        - function: count
+          arguments: [ down ]
+          relational_operator: '>'
+          threshold: 0
+    - status: okay
+      trigger:
+        logical_operator: or
+        rules:
+        - function: percent
+          arguments: [ okay ]
+          relational_operator: '=='
+          threshold: 100
+    - status: unknown
+    # A policy that is used to derive a cluster status based
+    # on the health status of its members.
+    status_of_members:
+    - status: down
+      trigger:
+        logical_operator: or
+        rules:
+        - function: percent
+          arguments: [ down ]
+          relational_operator: '=='
+          threshold: 100
+    - status: critical
+      trigger:
+        logical_operator: and
+        rules:
+        - function: count
+          arguments: [ okay, warning ]
+          relational_operator: '<='
           threshold: 1
+        - function: count
+          arguments: [ critical, down, unknown ]
+          relational_operator: '>'
+          threshold: 0
     - status: warning
       trigger:
         logical_operator: or
         rules:
         - function: percent
           arguments: [ okay ]
-          relational_operator: '<'
+          relational_operator: '!='
           threshold: 100
     - status: okay
       trigger:
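
The policies above all follow the same shape: an ordered list of candidate statuses, each with an optional trigger whose rules are combined with logical_operator and compare a count or percent of member statuses against a threshold. A rough reading, sketched below in plain Python (an illustration of the semantics, not the aggregator's actual filter code), is that entries are evaluated top to bottom, the first matching status wins, and an entry without a trigger acts as the unconditional fallback:

    import operator

    OPS = {'==': operator.eq, '!=': operator.ne, '>': operator.gt,
           '>=': operator.ge, '<': operator.lt, '<=': operator.le}

    def evaluate(policy, member_statuses):
        def rule_value(rule):
            matched = sum(1 for s in member_statuses if s in rule['arguments'])
            if rule['function'] == 'count':
                return matched
            return 100.0 * matched / len(member_statuses)  # 'percent'

        for entry in policy:
            trigger = entry.get('trigger')
            if trigger is None:
                return entry['status']  # unconditional fallback
            results = [OPS[r['relational_operator']](rule_value(r), r['threshold'])
                       for r in trigger['rules']]
            combine = all if trigger['logical_operator'] == 'and' else any
            if combine(results):
                return entry['status']

    # A minimal two-entry policy in the same format, purely for demonstration.
    policy = [
        {'status': 'down', 'trigger': {'logical_operator': 'or', 'rules': [
            {'function': 'percent', 'arguments': ['down'],
             'relational_operator': '==', 'threshold': 100}]}},
        {'status': 'okay', 'trigger': {'logical_operator': 'or', 'rules': [
            {'function': 'percent', 'arguments': ['okay'],
             'relational_operator': '==', 'threshold': 100}]}},
        {'status': 'unknown'},
    ]
    print(evaluate(policy, ['okay', 'okay']))   # okay
    print(evaluate(policy, ['down', 'down']))   # down
    print(evaluate(policy, ['okay', 'down']))   # unknown (fallback entry)
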
@@ -429,14 +425,15 @@
       preserve_data: false
       message_matcher: "Type == 'heka.sandbox.gse_metric'"
 {%- endif %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
   encoder:
-{%- if aggregator.influxdb_host is defined %}
+  {%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if aggregator.nagios_host is defined %}
+  {%- endif %}
+  {%- if aggregator.nagios_host is defined %}
     nagios:
       engine: sandbox
       module_file: /usr/share/lma_collector/encoders/status_nagios.lua
@@ -446,9 +443,11 @@
         {%- if aggregator.nagios_host_dimension_key is defined %}
         nagios_host_dimension_key: "{{ aggregator.nagios_host_dimension_key }}"
         {%- endif %}
+  {%- endif %}
 {%- endif %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
   output:
-{%- if aggregator.influxdb_host is defined %}
+  {%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -459,8 +458,8 @@
       message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
       encoder: influxdb_encoder
       timeout: {{ aggregator.influxdb_timeout }}
-{%- endif %}
-{%- if aggregator.nagios_host is defined %}
+  {%- endif %}
+  {%- if aggregator.nagios_host is defined %}
     nagios_alarm_cluster:
       engine: http
       address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
@@ -473,8 +472,10 @@
       max_buffer_size: 1048576
       max_file_size: 524288
       full_action: drop
+  {%- endif %}
 {%- endif %}
 ceilometer_collector:
+{%- if ceilometer_collector.amqp_host is defined %}
   decoder:
     sample:
       engine: sandbox
@@ -484,26 +485,27 @@
         decoder: 'ceilometer'
         decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
         metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
+{%- endif %}
+{%- if ceilometer_collector.amqp_host is defined %}
   input:
-{%- if ceilometer_collector.rabbit_host is defined %}
     openstack_sample_amqp:
       engine: amqp
-      user: {{ ceilometer_collector.rabbit_user }}
-      password: {{ ceilometer_collector.rabbit_password }}
-      port: {{ ceilometer_collector.rabbit_port }}
-      host: {{ ceilometer_collector.rabbit_host }}
-      vhost: {{ ceilometer_collector.rabbit_vhost }}
-      queue: {{ ceilometer_collector.rabbit_queue }}
-      routing_key: {{ ceilometer_collector.rabbit_queue }}
+      user: {{ ceilometer_collector.amqp_user }}
+      password: {{ ceilometer_collector.amqp_password }}
+      port: {{ ceilometer_collector.amqp_port }}
+      host: {{ ceilometer_collector.amqp_host }}
+      vhost: {{ ceilometer_collector.amqp_vhost }}
+      queue: {{ ceilometer_collector.amqp_queue }}
+      routing_key: {{ ceilometer_collector.amqp_queue }}
       decoder: sample_decoder
       splitter: NullSplitter
-      exchange: "ceilometer"
-      exchange_type: "topic"
+      exchange: {{ ceilometer_collector.get('amqp_exchange', 'ceilometer') }}
+      exchange_type: topic
       exchange_auto_delete: false
       queue_auto_delete: false
 {%- endif %}
-  filter:
 {%- if ceilometer_collector.influxdb_host is defined %}
+  filter:
     ceilometer_influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -517,14 +519,15 @@
         flush_count: 500
         bulk_metric_type_matcher: 'ceilometer_samples'
 {%- endif %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
   encoder:
-{%- if ceilometer_collector.influxdb_host is defined %}
+  {%- if ceilometer_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if ceilometer_collector.elasticsearch_host is defined %}
+  {%- endif %}
+  {%- if ceilometer_collector.elasticsearch_host is defined %}
     elasticsearch_resource:
       engine: sandbox
       module_file:  /usr/share/lma_collector/encoders/es_ceilometer_resources.lua
@@ -533,6 +536,7 @@
         index: "ceilometer_resource"
         type_name: "source"
         encoder: "elasticsearch_resources"
+  {%- endif %}
 {%- endif %}
 {%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
   output:
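
Most of the remaining churn in heka/meta/heka.yml is the same pattern repeated: the dashboard outputs are dropped and each filter/encoder/output section gains an outer "is defined" guard, so a section header is only emitted when at least one plugin will actually be rendered under it. A small standalone check of that guard behaviour, using jinja2 directly on a simplified fragment (mc stands in for the metric_collector settings; this is not lifted verbatim from the formula):

    # Illustration of the guard pattern added throughout heka.yml: the
    # 'output:' key is only rendered when at least one backend host is set.
    from jinja2 import Template

    fragment = Template(
        "{%- if mc.influxdb_host is defined or mc.nagios_host is defined %}\n"
        "  output:\n"
        "  {%- if mc.influxdb_host is defined %}\n"
        "    influxdb:\n"
        "      engine: http\n"
        "  {%- endif %}\n"
        "{%- endif %}\n"
    )

    # No InfluxDB or Nagios host: no dangling, empty 'output:' section.
    print(repr(fragment.render(mc={})))
    # With an InfluxDB host defined, the output section appears.
    print(fragment.render(mc={'influxdb_host': '172.16.10.101'}))
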