Merge pull request #109 from elemoine/elasticsearch-drop

Use full action "drop" for logs
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index 8f86efd..efbd091 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -83,6 +83,8 @@
     """
     filtered_grains = {}
     for service_name, service_data in grains.items():
+        if not service_data:
+            continue
         alarm = service_data.get('alarm')
         if alarm:
             filtered_grains[service_name] = {'alarm': alarm}
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
index 9957f70..bf3b9aa 100644
--- a/heka/files/lua/encoders/status_nagios.lua
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -19,9 +19,17 @@
 local lma = require 'lma_utils'
 local interp = require "msg_interpolate"
 
+local host_suffix_dimension_field
+if read_config('host_suffix_dimension_key') then
+    host_suffix_dimension_field = string.format('Fields[%s]', read_config('host_suffix_dimension_key'))
+end
+
 -- These 2 configurations are used only to encode GSE messages
 local default_host = read_config('default_nagios_host')
-local host_dimension_key = read_config('nagios_host_dimension_key')
+local host_dimension_field
+if read_config('nagios_host_dimension_key') then
+    host_dimension_field = string.format('Fields[%s]', read_config('nagios_host_dimension_key'))
+end
 
 -- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
 -- See bug #1517917 for details.
@@ -64,12 +72,21 @@
         return -1
     end
 
-    if host_dimension_key then
-        data['host'] = read_message(string.format('Fields[%s]', host_dimension_key)) or default_host
+    local host
+    if host_dimension_field then
+        host = read_message(host_dimension_field) or default_host
     else
-        data['host'] = read_message('Fields[hostname]') or read_message('Hostname')
+        host = read_message('Fields[hostname]') or read_message('Hostname')
     end
 
+    if host_suffix_dimension_field then  -- optional: append a dimension value to the host name
+        local suffix = read_message(host_suffix_dimension_field)
+        if host and suffix then  -- host may be nil here; '..' on nil would raise a runtime error
+            host = host .. '.' .. suffix
+        end
+    end
+    data['host'] = host
+
     data['service'] = service_name
     data['plugin_state'] = nagios_state_map[status]
 
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index 5fefb2d..9da7248 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -2,7 +2,7 @@
 type = "SandboxFilter"
 filename = "/usr/share/lma_collector/filters/afd.lua"
 preserve_data = {{ alarm.preserve_data|default(False)|lower }}
-message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.alarm_message_matcher'](alarm, trigger) }})"
+message_matcher = "Type =~ /metric$/ && ({{ salt['heka_alarming.alarm_message_matcher'](alarm, trigger) }})"
 module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
 ticker_interval = 10
 
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 4a92245..293f0ea 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -77,8 +77,8 @@
       port: 5567
       decoder: metric_decoder
       splitter: HekaFramingSplitter
-  filter:
 {%- if metric_collector.influxdb_host is defined %}
+  filter:
     influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -90,21 +90,26 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ metric_collector.influxdb_time_precision }}"
 {%- endif %}
-{%- if metric_collector.influxdb_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined %}
   encoder:
+  {%- if metric_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if metric_collector.nagios_host is defined %}
+  {%- endif %}
+  {%- if metric_collector.nagios_host is defined %}
     nagios:
       engine: sandbox
       module_file: /usr/share/lma_collector/encoders/status_nagios.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        host_suffix_dimension_key: environment_label  # field whose value status_nagios.lua appends to the host name
+  {%- endif %}
 {%- endif %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined %}
   output:
-{%- if metric_collector.influxdb_host is defined %}
+  {%- if metric_collector.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ metric_collector.influxdb_host }}:{{ metric_collector.influxdb_port }}/write?db={{ metric_collector.influxdb_database }}&precision={{ metric_collector.influxdb_time_precision }}"
@@ -115,15 +120,15 @@
       message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
       encoder: influxdb_encoder
       timeout: {{ metric_collector.influxdb_timeout }}
-{%- endif %}
-{%- if metric_collector.aggregator_host is defined %}
+  {%- endif %}
+  {%- if metric_collector.aggregator_host is defined %}
     aggregator:
       engine: tcp
       host: "{{ metric_collector.aggregator_host }}"
       port: "{{ metric_collector.aggregator_port }}"
       message_matcher: "Type == 'heka.sandbox.afd_metric'"
-{%- endif %}
-{%- if metric_collector.nagios_host is defined %}
+  {%- endif %}
+  {%- if metric_collector.nagios_host is defined %}
     nagios_alarm:
       engine: http
       address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
@@ -136,6 +141,7 @@
       max_buffer_size: 1048576
       max_file_size: 524288
       full_action: drop
+  {%- endif %}
 {%- endif %}
 remote_collector:
   decoder:
@@ -182,7 +188,7 @@
 {%- endif %}
 {%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined %}
   filter:
-{%- if remote_collector.influxdb_host is defined %}
+  {%- if remote_collector.influxdb_host is defined %}
     influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -193,31 +199,32 @@
       config:
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ remote_collector.influxdb_time_precision }}"
-{%- endif %}
-{%- if remote_collector.amqp_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.amqp_host is defined %}
     resource_creation_time:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/resource_creation_time.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: false
       message_matcher: "Type == 'notification' && Fields[event_type] =~ /create.end$/"
-{%- endif %}
+  {%- endif %}
 {%- endif %}
 {%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
   encoder:
-{%- if remote_collector.influxdb_host is defined %}
+  {%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if remote_collector.elasticsearch_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
+  {%- endif %}
 {%- endif %}
-{%- endif %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined %}
   output:
-{%- if remote_collector.influxdb_host is defined %}
+  {%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ remote_collector.influxdb_host }}:{{ remote_collector.influxdb_port }}/write?db={{ remote_collector.influxdb_database }}&precision={{ remote_collector.influxdb_time_precision }}"
@@ -228,20 +235,21 @@
       message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
       encoder: influxdb_encoder
       timeout: {{ remote_collector.influxdb_timeout }}
-{%- endif %}
-{%- if remote_collector.aggregator_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.aggregator_host is defined %}
     aggregator:
       engine: tcp
       host: "{{ remote_collector.aggregator_host }}"
       port: "{{ remote_collector.aggregator_port }}"
       message_matcher: "Type == 'heka.sandbox.afd_metric'"
-{%- endif %}
-{%- if remote_collector.elasticsearch_host is defined %}
+  {%- endif %}
+  {%- if remote_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
       server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
       encoder: elasticsearch_encoder
       message_matcher: "Type == 'notification'"
+  {%- endif %}
 {%- endif %}
 aggregator:
   policy:
@@ -338,12 +346,16 @@
           threshold: 100
     - status: critical
       trigger:
-        logical_operator: or
+        logical_operator: and
         rules:
-        - function: percent
+        - function: count
+          arguments: [ okay, warning ]
+          relational_operator: '<='
+          threshold: 1
+        - function: count
           arguments: [ critical, down, unknown ]
-          relational_operator: '>='
-          threshold: 50
+          relational_operator: '>'
+          threshold: 0
     - status: warning
       trigger:
         logical_operator: or
@@ -387,8 +399,8 @@
           arguments: [ down, critical, warning, unknown ]
           relational_operator: '>'
           threshold: 0
-          function: percent      
-    - status: okay  
+          function: percent
+    - status: okay
   input:
     heka_metric:
       engine: tcp
@@ -415,26 +427,30 @@
       preserve_data: false
       message_matcher: "Type == 'heka.sandbox.gse_metric'"
 {%- endif %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
   encoder:
-{%- if aggregator.influxdb_host is defined %}
+  {%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if aggregator.nagios_host is defined %}
+  {%- endif %}
+  {%- if aggregator.nagios_host is defined %}
     nagios:
       engine: sandbox
       module_file: /usr/share/lma_collector/encoders/status_nagios.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
+        host_suffix_dimension_key: environment_label
         default_nagios_host: "{{ aggregator.nagios_default_host_alarm_clusters }}"
         {%- if aggregator.nagios_host_dimension_key is defined %}
         nagios_host_dimension_key: "{{ aggregator.nagios_host_dimension_key }}"
         {%- endif %}
+  {%- endif %}
 {%- endif %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
   output:
-{%- if aggregator.influxdb_host is defined %}
+  {%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -445,8 +461,8 @@
       message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
       encoder: influxdb_encoder
       timeout: {{ aggregator.influxdb_timeout }}
-{%- endif %}
-{%- if aggregator.nagios_host is defined %}
+  {%- endif %}
+  {%- if aggregator.nagios_host is defined %}
     nagios_alarm_cluster:
       engine: http
       address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
@@ -459,10 +475,11 @@
       max_buffer_size: 1048576
       max_file_size: 524288
       full_action: drop
+  {%- endif %}
 {%- endif %}
 ceilometer_collector:
-  decoder:
 {%- if ceilometer_collector.amqp_host is defined %}
+  decoder:
     sample:
       engine: sandbox
       module_file: /usr/share/lma_collector/decoders/metering.lua
@@ -472,8 +489,8 @@
         decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
         metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
 {%- endif %}
-  input:
 {%- if ceilometer_collector.amqp_host is defined %}
+  input:
     openstack_sample_amqp:
       engine: amqp
       user: {{ ceilometer_collector.amqp_user }}
@@ -490,8 +507,8 @@
       exchange_auto_delete: false
       queue_auto_delete: false
 {%- endif %}
-  filter:
 {%- if ceilometer_collector.influxdb_host is defined %}
+  filter:
     ceilometer_influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -505,14 +522,15 @@
         flush_count: 500
         bulk_metric_type_matcher: 'ceilometer_samples'
 {%- endif %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
   encoder:
-{%- if ceilometer_collector.influxdb_host is defined %}
+  {%- if ceilometer_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
-{%- endif %}
-{%- if ceilometer_collector.elasticsearch_host is defined %}
+  {%- endif %}
+  {%- if ceilometer_collector.elasticsearch_host is defined %}
     elasticsearch_resource:
       engine: sandbox
       module_file:  /usr/share/lma_collector/encoders/es_ceilometer_resources.lua
@@ -521,6 +539,7 @@
         index: "ceilometer_resource"
         type_name: "source"
         encoder: "elasticsearch_resources"
+  {%- endif %}
 {%- endif %}
 {%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
   output:
diff --git a/heka/server.sls b/heka/server.sls
index c2de1a0..888f04c 100644
--- a/heka/server.sls
+++ b/heka/server.sls
@@ -1,5 +1,5 @@
 {%- from "heka/map.jinja" import server with context %}
-{%- if server.enabled %}
+{%- if server.enabled is defined and server.enabled %}
 
 heka_packages:
   pkg.latest:
@@ -198,4 +198,4 @@
 {%- endif %}
 {%- endfor %}
 
-{%- endif %}
\ No newline at end of file
+{%- endif %}