Manage notification handler for Sensu

This patch adds two things:
  - the field 'handler' to an alarm. Now an alarm can be described as
        alarm_name:
            policy: name_of_policy
            alerting: enabled_with_notifications
            handler: mail
            ...
  - a new parameter 'notification_handler' to pass the handler that
    can now be added to an alarm.

Change-Id: I1e0767b3c1aa664110c97ae5f8fafc47c9682ab9
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index 83a6445..8376e84 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -4,7 +4,8 @@
 
 _valid_dimension_re = re.compile(r'^[a-z0-9_/-]+$')
 _disallowed_dimensions = ('name', 'value', 'hostname', 'member',
-                          'alerting_enabled', 'tag_fields')
+                          'alerting_enabled', 'notification_enabled',
+                          'notification_handler', 'tag_fields')
 
 
 def alarm_message_matcher(alarm, triggers):
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
index 520c007..0c4dbcd 100644
--- a/heka/files/lua/common/afd.lua
+++ b/heka/files/lua/common/afd.lua
@@ -129,7 +129,9 @@
 end
 
 -- inject an AFD event into the Heka pipeline
-function inject_afd_metric(value, hostname, afd_name, dimensions, alerting_enabled)
+function inject_afd_metric(value, hostname, afd_name, dimensions,
+                           alerting_enabled, notification_enabled,
+                           notification_handler)
     local payload
 
     if #alarms > 0 then
@@ -152,6 +154,8 @@
             hostname = hostname,
             member = afd_name,
             alerting_enabled = alerting_enabled,
+            notification_enabled = notification_enabled,
+            notification_handler = notification_handler,
             tag_fields = {'hostname', 'member'}
         }
     }
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
index 0f71eab..c1007a2 100644
--- a/heka/files/lua/common/gse.lua
+++ b/heka/files/lua/common/gse.lua
@@ -133,7 +133,8 @@
 
 -- compute the cluster metric and inject it into the Heka pipeline
 -- the metric's value is computed using the status of its members
-function inject_cluster_metric(cluster_name, dimensions, alerting_enabled)
+function inject_cluster_metric(cluster_name, dimensions, alerting_enabled,
+                               notification_enabled, notification_handler)
     local payload
     local status, alarms = resolve_status(cluster_name)
 
@@ -155,6 +156,8 @@
             value = status,
             member = cluster_name,
             alerting_enabled = alerting_enabled,
+            notification_enabled = notification_enabled,
+            notification_handler = notification_handler,
             tag_fields = {'member'},
         }
     }
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
index ff622ed..4b8de07 100644
--- a/heka/files/lua/filters/afd.lua
+++ b/heka/files/lua/filters/afd.lua
@@ -24,6 +24,8 @@
 local hostname = read_config('hostname') or error('hostname must be specified')
 local dimensions_json = read_config('dimensions') or ''
 local activate_alerting = read_config('activate_alerting') or true
+local enable_notification = read_config('enable_notification') or false
+local notification_handler = read_config('notification_handler')
 
 local all_alarms = require(afd_file)
 local A = require 'afd_alarms'
@@ -76,7 +78,7 @@
             end
 
             afd.inject_afd_metric(state, hostname, afd_name, dimensions,
-                activate_alerting)
+                activate_alerting, enable_notification, notification_handler)
         end
     else
         A.set_start_time(ns)
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 2bf6926..8b46fcf 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -26,6 +26,8 @@
 local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
 local dimensions_json = read_config('dimensions') or ''
 local activate_alerting = read_config('activate_alerting') or true
+local enable_notification = read_config('enable_notification') or false
+local notification_handler = read_config('notification_handler')
 
 local first_tick
 local last_tick = 0
@@ -98,7 +100,8 @@
     local injected = 0
     for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
         if last_index == nil or i > last_index then
-            gse.inject_cluster_metric(cluster_name, dimensions, activate_alerting)
+            gse.inject_cluster_metric(cluster_name, dimensions,
+                activate_alerting, enable_notification, notification_handler)
             last_index = i
             injected = injected + 1
 
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index 9da7248..7244022 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -14,3 +14,6 @@
 {%- set alerting = alarm.get('alerting', 'enabled') %}
 activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
 enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
+{%- if alarm.handler is defined %}
+notification_handler = "{{ alarm.handler }}"
+{%- endif %}
diff --git a/heka/files/toml/filter/gse_alarm_cluster.toml b/heka/files/toml/filter/gse_alarm_cluster.toml
index 1c9fe77..2eee5bf 100644
--- a/heka/files/toml/filter/gse_alarm_cluster.toml
+++ b/heka/files/toml/filter/gse_alarm_cluster.toml
@@ -21,3 +21,6 @@
 {%- set alerting = alarm_cluster.get('alerting', 'enabled_with_notification') %}
 activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
 enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
+{%- if alarm_cluster.handler is defined %}
+notification_handler = "{{ alarm_cluster.handler }}"
+{%- endif %}
diff --git a/tests/lua/test_afd.lua b/tests/lua/test_afd.lua
index a54bdfd..478a36c 100644
--- a/tests/lua/test_afd.lua
+++ b/tests/lua/test_afd.lua
@@ -47,19 +47,20 @@
     end
 
     function TestAfd:test_inject_afd_metric_without_alarms()
-        afd.inject_afd_metric(consts.OKAY, 'node-1', 'foo', {}, false)
+        afd.inject_afd_metric(consts.OKAY, 'node-1', 'foo', {}, false, false, nil)
 
         local alarms = afd.get_alarms()
         assertEquals(#alarms, 0)
         assertEquals(last_injected_msg.Type, 'afd_metric')
         assertEquals(last_injected_msg.Fields.value, consts.OKAY)
         assertEquals(last_injected_msg.Fields.hostname, 'node-1')
+        assertEquals(last_injected_msg.Fields.notification_handler, 'mail')
         assertEquals(last_injected_msg.Payload, '{"alarms":[]}')
     end
 
     function TestAfd:test_inject_afd_metric_with_alarms()
         afd.add_to_alarms(consts.CRIT, 'last', 'metric_1', {}, {}, '==', 0, 0, nil, nil, "important message")
-        afd.inject_afd_metric(consts.CRIT, 'node-1', 'foo', {}, false)
+        afd.inject_afd_metric(consts.CRIT, 'node-1', 'foo', {}, true, true, 'mail')
 
         local alarms = afd.get_alarms()
         assertEquals(#alarms, 0)
@@ -67,6 +68,7 @@
         assertEquals(last_injected_msg.Fields.value, consts.CRIT)
         assertEquals(last_injected_msg.Fields.hostname, 'node-1')
         assertEquals(last_injected_msg.Fields.environment_id, extra.environment_id)
+        assertEquals(last_injected_msg.Fields.notification_handler, 'mail')
         assert(last_injected_msg.Payload:match('"message":"important message"'))
         assert(last_injected_msg.Payload:match('"severity":"CRITICAL"'))
     end
diff --git a/tests/lua/test_gse.lua b/tests/lua/test_gse.lua
index be892f5..b68986d 100644
--- a/tests/lua/test_gse.lua
+++ b/tests/lua/test_gse.lua
@@ -184,36 +184,39 @@
     end
 
     function TestGse:test_inject_cluster_metric_for_nova()
-        gse.inject_cluster_metric('nova', {key = "val"}, true)
+        gse.inject_cluster_metric('nova', {key = "val"}, true, true, 'mail')
         local metric = last_injected_msg
         assertEquals(metric.Type, 'gse_metric')
         assertEquals(metric.Fields.member, 'nova')
         assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.OKAY)
         assertEquals(metric.Fields.key, 'val')
+        assertEquals(metric.Fields.notification_handler, 'mail')
         assertEquals(metric.Payload, '{"alarms":[]}')
     end
 
     function TestGse:test_inject_cluster_metric_for_glance()
-        gse.inject_cluster_metric('glance', {key = "val"}, true)
+        gse.inject_cluster_metric('glance', {key = "val"}, false, false, 'mail')
         local metric = last_injected_msg
         assertEquals(metric.Type, 'gse_metric')
         assertEquals(metric.Fields.member, 'glance')
         assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.DOWN)
         assertEquals(metric.Fields.key, 'val')
+        assertEquals(metric.Fields.notification_handler, 'mail')
         assert(metric.Payload:match("glance%-registry endpoints are down"))
         assert(metric.Payload:match("glance%-api endpoint is down on node%-1"))
     end
 
     function TestGse:test_inject_cluster_metric_for_heat()
-        gse.inject_cluster_metric('heat', {key = "val"}, true)
+        gse.inject_cluster_metric('heat', {key = "val"}, true, true, 'mail')
         local metric = last_injected_msg
         assertEquals(metric.Type, 'gse_metric')
         assertEquals(metric.Fields.member, 'heat')
         assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.WARN)
         assertEquals(metric.Fields.key, 'val')
+        assertEquals(metric.Fields.notification_handler, 'mail')
         assert(metric.Payload:match("5xx errors detected"))
         assert(metric.Payload:match("1 RabbitMQ node out of 3 is down"))
     end