Merge pull request #45 from SwannCroiset/alerting-property

Redefine alerting property
diff --git a/heka/_service.sls b/heka/_service.sls
index c1c85ed..8a09ea7 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -78,8 +78,6 @@
     'decoder': {},
     'input': {},
     'trigger': {},
-    'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -90,7 +88,6 @@
     'input': {},
     'trigger': {},
     'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -101,7 +98,6 @@
     'input': {},
     'trigger': {},
     'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -111,7 +107,6 @@
     'decoder': {},
     'input': {},
     'trigger': {},
-    'alarm': {},
     'alarm_cluster': {},
     'filter': {},
     'splitter': {},
diff --git a/heka/files/gse_policies.lua b/heka/files/gse_policies.lua
index 8ec9c06..8de8175 100644
--- a/heka/files/gse_policies.lua
+++ b/heka/files/gse_policies.lua
@@ -29,15 +29,17 @@
                 logical_operator = '{{ _trigger["logical_operator"] }}',
                 rules = {
             {%- for _rule in _trigger["rules"] %}
-                    ['function'] = '{{ _rule["function"] }}',
+                    {
+                        ['function'] = '{{ _rule["function"] }}',
                 {%- set comma = joiner(",") %}
-                    ['arguments'] = {
+                        ['arguments'] = {
                 {%- for _argument in _rule["arguments"]|sort -%}
-                {{ comma() }}'{{ _argument }}'
+                    {{ comma() }}'{{ _argument }}'
                 {%- endfor -%}
+                        },
+                        ['relational_operator'] = '{{ _rule["relational_operator"] }}',
+                        ['threshold'] = {{ _rule["threshold"] }},
                     },
-                    ['relational_operator'] = '{{ _rule["relational_operator"] }}',
-                    ['threshold'] = {{ _rule["threshold"] }},
             {%- endfor %}
                 },
             },
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index f94a0e8..0befcf0 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -216,6 +216,14 @@
                 else
                     msg['Fields']['name'] = metric_name
                 end
+            elseif metric_source == 'ntpd' then
+                if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
+                    msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
+                else
+                    msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. 'peer'
+                    msg['Fields']['server'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'server')
+                end
             elseif metric_source == 'check_openstack_api' then
                 -- For OpenStack API metrics, plugin_instance = <service name>
                 msg['Fields']['name'] = 'openstack_check_api'
@@ -420,7 +428,31 @@
                 msg['Fields']['service'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
             else
-                msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+                -- generic metric name translation for 3rd-party sources
+                msg['Fields']['name'] = sample['plugin']
+                if sample['plugin_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
+                end
+                if sample['type'] ~= 'gauge' and sample['type'] ~= 'derive' and
+                   sample['type'] ~= 'counter' and sample['type'] ~= 'absolute' then
+                    -- append the type to the name only for custom DS types
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+                end
+                if sample['type_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+                end
+                if sample['dsnames'][i] ~= "value" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+                end
+                msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
+
+                -- add meta fields as tag_fields
+                for k, v in pairs(sample['meta'] or {}) do
+                    if tostring(k) ~= '0' then
+                        msg['Fields'][k] = v
+                        table.insert(msg['Fields']['tag_fields'], k)
+                   end
+                end
             end
 
             if not skip_it then
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
index ad4c540..0507a10 100644
--- a/heka/files/lua/encoders/status_nagios.lua
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -20,7 +20,6 @@
 local interp = require "msg_interpolate"
 
 local host = read_config('nagios_host')
-local service_template = read_config('service_template') or error('service_template is required!')
 -- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
 -- See bug #1517917 for details.
 -- With the 'cmd.cgi' re-implementation for the command PROCESS_SERVICE_CHECK_RESULT,
@@ -29,7 +28,6 @@
 local data = {
    cmd_typ = '30',
    cmd_mod = '2',
-   host    = host,
    service = nil,
    plugin_state = nil,
    plugin_output = nil,
@@ -55,7 +53,7 @@
 end
 
 function process_message()
-    local service_name = interp.interpolate_from_msg(service_template)
+    local service_name = read_message('Fields[member]')
     local status = afd.get_status()
     local alarms = afd.alarms_for_human(afd.extract_alarms())
 
@@ -63,6 +61,11 @@
         return -1
     end
 
+    if host then
+        data['host'] = host
+    else
+        data['host'] = read_message('Fields[hostname]') or read_message('Hostname')
+    end
     data['service'] = service_name
     data['plugin_state'] = nagios_state_map[status]
 
diff --git a/heka/files/toml/encoder/sandbox.toml b/heka/files/toml/encoder/sandbox.toml
new file mode 100644
index 0000000..67c95a1
--- /dev/null
+++ b/heka/files/toml/encoder/sandbox.toml
@@ -0,0 +1,13 @@
+[{{ encoder_name }}_encoder]
+type = "SandboxEncoder"
+filename = "{{ encoder.module_file }}"
+{%- if encoder.module_dir is defined %}
+module_directory = "{{ encoder.module_dir }}"
+{%- endif %}
+
+{%- if encoder.config is mapping %}{# emit the optional [<name>_encoder.config] table only when a mapping is supplied #}
+[{{ encoder_name }}_encoder.config]
+{%- for config_param, config_value in encoder.config.items() %}{# items() exists on Python 2 and 3 dicts; iteritems() is Python 2 only #}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% elif config_value in [True, False] %}{{ config_value|lower }}{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/output/http.toml b/heka/files/toml/output/http.toml
index 1ec3672..fce5d97 100644
--- a/heka/files/toml/output/http.toml
+++ b/heka/files/toml/output/http.toml
@@ -7,14 +7,16 @@
 username = "{{ output.username }}"
 password = "{{ output.password }}"
 {%- endif %}
-http_timeout = {{ output.timeout }}
-method = "POST"
+http_timeout = {{ output.http_timeout|default(output.timeout|default(2000)) }}{# fall back to the legacy 'timeout' key still set by the influxdb outputs in heka.yml #}
+method = "{{ output.method|default("POST") }}"
+{% if output.get('use_buffering', True) %}
 use_buffering = true
 
 [{{ output_name }}_output.buffering]
-max_buffer_size = 1610612736
-max_file_size = 134217728
-full_action = "drop"
+max_buffer_size = {{ output.max_buffer_size|default(268435456) }}
+max_file_size = {{ output.max_file_size|default(67108864) }}
+full_action = "{{ output.full_action|default("drop") }}"
+{% endif %}
 
 [{{ output_name }}_output.headers]
 Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/map.jinja b/heka/map.jinja
index c7de276..00291d0 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -30,28 +30,45 @@
 {%- endload %}
 {%- set server = salt['grains.filter_by'](server_defaults, merge=salt['pillar.get']('heka:server')) %}
 
-{%- load_yaml as elasticsearch_defaults %}
-default:
-  elasticsearch_port: 9200
-{%- endload %}
-{% set log_collector = salt['grains.filter_by'](elasticsearch_defaults, merge=salt['pillar.get']('heka:log_collector')) %}
+{% set default_elasticsearch_port = 9200 %}
+{% set default_influxdb_port = 8086 %}
+{% set default_influxdb_time_precision = 'ms' %}
+{% set default_influxdb_timeout = 5000 %}
+{% set default_aggregator_port = 5565 %}
+{% set default_nagios_port = 8001 %}
+{% set default_nagios_host_alarm_clusters = '00-clusters' %}
 
-{%- load_yaml as influxdb_defaults %}
-default:
-  influxdb_port: 8086
-  influxdb_time_precision: ms
-  influxdb_timeout: 5000
-{%- endload %}
+{% set log_collector = salt['grains.filter_by']({
+  'default': {
+    'elasticsearch_port': default_elasticsearch_port,
+  }
+}, merge=salt['pillar.get']('heka:log_collector')) %}
 
-{%- load_yaml as metric_collector_defaults %}
-default:
-  aggregator_port: 5565
-{%- endload %}
-{% set metric_collector = salt['grains.filter_by'](
-    metric_collector_defaults,
-    merge=salt['grains.filter_by'](
-        influxdb_defaults,
-        merge=salt['pillar.get']('heka:metric_collector'))) %}
+{% set metric_collector = salt['grains.filter_by']({
+  'default': {
+    'influxdb_port': default_influxdb_port,
+    'influxdb_time_precision': default_influxdb_time_precision,
+    'influxdb_timeout': default_influxdb_timeout,
+    'aggregator_port': default_aggregator_port,
+    'nagios_port': default_nagios_port,
+  }
+}, merge=salt['pillar.get']('heka:metric_collector')) %}
 
-{% set remote_collector = salt['grains.filter_by'](influxdb_defaults, merge=salt['pillar.get']('heka:remote_collector')) %}
-{% set aggregator = salt['grains.filter_by'](influxdb_defaults, merge=salt['pillar.get']('heka:aggregator')) %}
+{% set remote_collector = salt['grains.filter_by']({
+  'default': {
+    'influxdb_port': default_influxdb_port,
+    'influxdb_time_precision': default_influxdb_time_precision,
+    'influxdb_timeout': default_influxdb_timeout,
+    'aggregator_port': default_aggregator_port,
+  }
+}, merge=salt['pillar.get']('heka:remote_collector')) %}
+
+{% set aggregator = salt['grains.filter_by']({
+  'default': {
+    'influxdb_port': default_influxdb_port,
+    'influxdb_time_precision': default_influxdb_time_precision,
+    'influxdb_timeout': default_influxdb_timeout,
+    'nagios_port': default_nagios_port,
+    'nagios_host_alarm_clusters': default_nagios_host_alarm_clusters,
+  }
+}, merge=salt['pillar.get']('heka:aggregator')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 08b99ea..a217510 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -106,6 +106,12 @@
       append_newlines: false
       prefix_ts: false
 {%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+    nagios:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+{%- endif %}
   output:
     metric_dashboard:
       engine: dashboard
@@ -131,6 +137,20 @@
       port: "{{ metric_collector.aggregator_port }}"
       message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
 {%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+    nagios_alarm:
+      engine: http
+      address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
+      message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric' && Fields[no_alerting] == NIL"
+      encoder: nagios_encoder
+      {%- if metric_collector.nagios_username is defined and metric_collector.nagios_password is defined %}
+      username: {{ metric_collector.get('nagios_username') }}
+      password: {{ metric_collector.get('nagios_password') }}
+      {%- endif %}
+      max_buffer_size: 1048576
+      max_file_size: 524288
+      full_action: drop
+{%- endif %}
 remote_collector:
   decoder:
     collectd:
@@ -153,7 +173,7 @@
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: false
-      message_matcher: "Type =~ /metric$/"
+      message_matcher: "Type == 'heka.sandbox.afd_metric'"
       ticker_interval: 1
       config:
         tag_fields: "deployment_id environment_label tenant_id user_id"
@@ -184,6 +204,13 @@
       encoder: influxdb_encoder
       timeout: {{ remote_collector.influxdb_timeout }}
 {%- endif %}
+{%- if remote_collector.aggregator_host is defined %}
+    aggregator:
+      engine: tcp
+      host: "{{ remote_collector.aggregator_host }}"
+      port: "{{ remote_collector.aggregator_port }}"
+      message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
+{%- endif %}
 aggregator:
   policy:
     # A policy defining that the cluster's status depends on the member with
@@ -315,15 +342,23 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ aggregator.influxdb_time_precision }}"
 {%- endif %}
-{%- if aggregator.influxdb_host is defined %}
   encoder:
+{%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
 {%- endif %}
-{%- if aggregator.influxdb_host is defined %}
+{%- if aggregator.nagios_host is defined %}
+    nagios:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        nagios_host: "{{ aggregator.nagios_host_alarm_clusters }}"
+{%- endif %}
   output:
+{%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -335,3 +370,17 @@
       encoder: influxdb_encoder
       timeout: {{ aggregator.influxdb_timeout }}
 {%- endif %}
+{%- if aggregator.nagios_host is defined %}
+    nagios_alarm_cluster:
+      engine: http
+      address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
+      message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.gse_metric' && Fields[no_alerting] == NIL"
+      encoder: nagios_encoder
+      {%- if aggregator.nagios_username is defined and aggregator.nagios_password is defined %}
+      username: {{ aggregator.get('nagios_username') }}
+      password: {{ aggregator.get('nagios_password') }}
+      {%- endif %}
+      max_buffer_size: 1048576
+      max_file_size: 524288
+      full_action: drop
+{%- endif %}