Merge "Add 'region' field to the InfluxDB tags if present"
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index efbd091..8376e84 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -4,7 +4,8 @@
 
 _valid_dimension_re = re.compile(r'^[a-z0-9_/-]+$')
 _disallowed_dimensions = ('name', 'value', 'hostname', 'member',
-                          'no_alerting', 'tag_fields')
+                          'alerting_enabled', 'notification_enabled',
+                          'notification_handler', 'tag_fields')
 
 
 def alarm_message_matcher(alarm, triggers):
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
index 2d47040..0c4dbcd 100644
--- a/heka/files/lua/common/afd.lua
+++ b/heka/files/lua/common/afd.lua
@@ -129,7 +129,9 @@
 end
 
 -- inject an AFD event into the Heka pipeline
-function inject_afd_metric(value, hostname, afd_name, dimensions, to_alerting)
+function inject_afd_metric(value, hostname, afd_name, dimensions,
+                           alerting_enabled, notification_enabled,
+                           notification_handler)
     local payload
 
     if #alarms > 0 then
@@ -143,11 +145,6 @@
         payload = '{"alarms":[]}'
     end
 
-    local no_alerting
-    if to_alerting ~= nil and to_alerting == false then
-        no_alerting = true
-    end
-
     local msg = {
         Type = 'afd_metric',
         Payload = payload,
@@ -156,7 +153,9 @@
             value = value,
             hostname = hostname,
             member = afd_name,
-            no_alerting = no_alerting,
+            alerting_enabled = alerting_enabled,
+            notification_enabled = notification_enabled,
+            notification_handler = notification_handler,
             tag_fields = {'hostname', 'member'}
         }
     }
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
index 3e6948e..c1007a2 100644
--- a/heka/files/lua/common/gse.lua
+++ b/heka/files/lua/common/gse.lua
@@ -133,7 +133,8 @@
 
 -- compute the cluster metric and inject it into the Heka pipeline
 -- the metric's value is computed using the status of its members
-function inject_cluster_metric(cluster_name, dimensions, to_alerting)
+function inject_cluster_metric(cluster_name, dimensions, alerting_enabled,
+                               notification_enabled, notification_handler)
     local payload
     local status, alarms = resolve_status(cluster_name)
 
@@ -147,11 +148,6 @@
         payload = '{"alarms":[]}'
     end
 
-    local no_alerting
-    if to_alerting ~= nil and to_alerting == false then
-        no_alerting = true
-    end
-
     local msg = {
         Type = 'gse_metric',
         Payload = payload,
@@ -159,8 +155,10 @@
             name = 'cluster_status',
             value = status,
             member = cluster_name,
+            alerting_enabled = alerting_enabled,
+            notification_enabled = notification_enabled,
+            notification_handler = notification_handler,
             tag_fields = {'member'},
-            no_alerting = no_alerting,
         }
     }
 
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 9c1ff31..98084af 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -31,6 +31,20 @@
     ps_vm = 'memory_virtual',
 }
 
+-- legacy lma_components process
+local lma_components = {
+    collectd = true,
+    remote_collectd = true,
+    metric_collector = true,
+    log_collector = true,
+    aggregator = true,
+    remote_collector = true,
+    elasticsearch = true,
+    influxd = true,
+    kibana = true,
+    ['grafana-server'] = true,
+}
+
 -- The following table keeps a list of metrics from plugin where the
 -- Fields[hostname] shouldn't be set by default.
 local hostname_free = {
@@ -169,9 +183,14 @@
             elseif metric_source == 'processes' then
                 if processes_map[sample['type']] then
                     -- metrics related to a specific process
-                    msg['Fields']['service'] = sample['plugin_instance']
+                    local service = sample['plugin_instance']
+                    msg['Fields']['service'] = service
                     table.insert(msg['Fields']['tag_fields'], 'service')
-                    msg['Fields']['name'] = 'lma_components'
+                    if lma_components[service] then
+                        msg['Fields']['name'] = 'lma_components'
+                    else
+                        msg['Fields']['name'] = 'process'
+                    end
                     if processes_map[sample['type']] ~= '' then
                         msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']]
                     end
diff --git a/heka/files/lua/encoders/status_sensu.lua b/heka/files/lua/encoders/status_sensu.lua
index 62979ea..80389e4 100644
--- a/heka/files/lua/encoders/status_sensu.lua
+++ b/heka/files/lua/encoders/status_sensu.lua
@@ -22,8 +22,12 @@
 if read_config('sensu_source_dimension_key') then
     source_dimension_field = string.format('Fields[%s]', read_config('sensu_source_dimension_key'))
 end
-
-local sensu_ttl = (read_config('sensu_ttl') + 0) or 0
+local notification_handler = read_config('notification_handler')
+local noop_handler = read_config('noop_handler')
+local watchdog_ttl = read_config('watchdog_ttl')
+if watchdog_ttl then
+    watchdog_ttl = 0 + watchdog_ttl
+end
 
 -- mapping GSE statuses to Sensu states
 local sensu_state_map = {
@@ -41,11 +45,13 @@
     local service_name
     local status = 0
     local alarms = {}
+    local handler = nil
     local msgtype = read_message('Type')
 
     if msgtype == "heka.sandbox.watchdog" then
         service_name = "watchdog_" .. (read_message('Payload') or 'unknown')
-        source = read_message('Fields[hostname]') or read_message('Hostname')
+        source = read_message('Hostname')
+        data['ttl'] = watchdog_ttl
     else
         service_name = read_message('Fields[member]')
         if not service_name then
@@ -59,6 +65,14 @@
 
         alarms = afd.alarms_for_human(afd.extract_alarms())
 
+        if read_message('Fields[alerting_enabled]') then
+            if read_message('Fields[notification_enabled]') then
+                handler = read_message('Fields[notification_handler]') or notification_handler
+            else
+                handler = noop_handler
+            end
+        end
+
         if msgtype == "heka.sandbox.gse_metric" then
             if source_dimension_field then
                 source = read_message(source_dimension_field) or "Unknown source " .. source_dimension_field
@@ -73,12 +87,10 @@
         end
     end
 
-    if sensu_ttl > 0 then
-        data['ttl'] = sensu_ttl
-    end
     data['source'] = source
     data['name'] = service_name
     data['status'] = sensu_state_map[status]
+    data['handler'] = handler
 
     local details = string.format('%s: ', consts.status_label(status))
 
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
index ff622ed..4b8de07 100644
--- a/heka/files/lua/filters/afd.lua
+++ b/heka/files/lua/filters/afd.lua
@@ -24,6 +24,8 @@
 local hostname = read_config('hostname') or error('hostname must be specified')
 local dimensions_json = read_config('dimensions') or ''
-local activate_alerting = read_config('activate_alerting') or true
+local activate_alerting = read_config('activate_alerting') ~= false -- true unless explicitly false
+local enable_notification = read_config('enable_notification') or false
+local notification_handler = read_config('notification_handler')
 
 local all_alarms = require(afd_file)
 local A = require 'afd_alarms'
@@ -76,7 +78,7 @@
             end
 
             afd.inject_afd_metric(state, hostname, afd_name, dimensions,
-                activate_alerting)
+                activate_alerting, enable_notification, notification_handler)
         end
     else
         A.set_start_time(ns)
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 2bf6926..8b46fcf 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -26,6 +26,8 @@
 local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
 local dimensions_json = read_config('dimensions') or ''
-local activate_alerting = read_config('activate_alerting') or true
+local activate_alerting = read_config('activate_alerting') ~= false -- true unless explicitly false
+local enable_notification = read_config('enable_notification') or false
+local notification_handler = read_config('notification_handler')
 
 local first_tick
 local last_tick = 0
@@ -98,7 +100,8 @@
     local injected = 0
     for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
         if last_index == nil or i > last_index then
-            gse.inject_cluster_metric(cluster_name, dimensions, activate_alerting)
+            gse.inject_cluster_metric(cluster_name, dimensions,
+                activate_alerting, enable_notification, notification_handler)
             last_index = i
             injected = injected + 1
 
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index 9da7248..7244022 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -14,3 +14,6 @@
 {%- set alerting = alarm.get('alerting', 'enabled') %}
 activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
 enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
+{%- if alarm.handler is defined %}
+notification_handler = "{{ alarm.handler }}"
+{%- endif %}
diff --git a/heka/files/toml/filter/gse_alarm_cluster.toml b/heka/files/toml/filter/gse_alarm_cluster.toml
index 1c9fe77..2eee5bf 100644
--- a/heka/files/toml/filter/gse_alarm_cluster.toml
+++ b/heka/files/toml/filter/gse_alarm_cluster.toml
@@ -21,3 +21,6 @@
 {%- set alerting = alarm_cluster.get('alerting', 'enabled_with_notification') %}
 activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
 enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
+{%- if alarm_cluster.handler is defined %}
+notification_handler = "{{ alarm_cluster.handler }}"
+{%- endif %}
diff --git a/heka/map.jinja b/heka/map.jinja
index 2aaefc9..14c5e4e 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -40,7 +40,10 @@
 {% set default_nagios_host_alarm_clusters = '00-clusters' %}
 {% set default_automatic_starting = True %}
 {% set default_amqp_port = 5672 %}
+{% set default_sensu_noop_handler = 'default' %}
+{% set default_sensu_notification_handler = 'default' %}
 {% set default_sensu_port = 3030 %}
+{% set default_sensu_watchdog_ttl = 120 %}
 
 {% set log_collector = salt['grains.filter_by']({
   'default': {
@@ -49,7 +52,10 @@
     'automatic_starting': default_automatic_starting,
     'metric_collector_host': '127.0.0.1',
     'metric_collector_port': 5567,
+    'sensu_noop_handler': default_sensu_noop_handler,
+    'sensu_notification_handler': default_sensu_notification_handler,
     'sensu_port': default_sensu_port,
+    'sensu_watchdog_ttl': default_sensu_watchdog_ttl,
   }
 }, merge=salt['pillar.get']('heka:log_collector')) %}
 
@@ -63,7 +69,10 @@
     'nagios_port': default_nagios_port,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_noop_handler': default_sensu_noop_handler,
+    'sensu_notification_handler': default_sensu_notification_handler,
     'sensu_port': default_sensu_port,
+    'sensu_watchdog_ttl': default_sensu_watchdog_ttl,
   }
 }, merge=salt['pillar.get']('heka:metric_collector')) %}
 
@@ -79,7 +88,10 @@
     'aggregator_port': default_aggregator_port,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_noop_handler': default_sensu_noop_handler,
+    'sensu_notification_handler': default_sensu_notification_handler,
     'sensu_port': default_sensu_port,
+    'sensu_watchdog_ttl': default_sensu_watchdog_ttl,
   }
 }, merge=salt['pillar.get']('heka:remote_collector')) %}
 
@@ -93,7 +105,10 @@
     'nagios_default_host_alarm_clusters': default_nagios_host_alarm_clusters,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_noop_handler': default_sensu_noop_handler,
+    'sensu_notification_handler': default_sensu_notification_handler,
     'sensu_port': default_sensu_port,
+    'sensu_watchdog_ttl': default_sensu_watchdog_ttl,
   }
 }, merge=salt['pillar.get']('heka:aggregator')) %}
 
@@ -109,6 +124,9 @@
     'resource_decoding': False,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_noop_handler': default_sensu_noop_handler,
+    'sensu_notification_handler': default_sensu_notification_handler,
     'sensu_port': default_sensu_port,
+    'sensu_watchdog_ttl': default_sensu_watchdog_ttl,
   }
 }, merge=salt['pillar.get']('heka:ceilometer_collector')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index be05ce0..8555f47 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -69,7 +69,9 @@
       module_file: /usr/share/lma_collector/encoders/status_sensu.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        sensu_ttl: 120
+        noop_handler: "{{ log_collector.sensu_noop_handler }}"
+        notification_handler: "{{ log_collector.sensu_notification_handler }}"
+        watchdog_ttl: {{ log_collector.sensu_watchdog_ttl }}
 {%- endif %}
 {%- endif %}
   output:
@@ -156,7 +158,9 @@
       module_file: /usr/share/lma_collector/encoders/status_sensu.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        sensu_ttl: 120
+        noop_handler: "{{ metric_collector.sensu_noop_handler }}"
+        notification_handler: "{{ metric_collector.sensu_notification_handler }}"
+        watchdog_ttl: {{ metric_collector.sensu_watchdog_ttl }}
   {%- endif %}
   {%- if metric_collector.influxdb_host is defined %}
     influxdb:
@@ -207,7 +211,7 @@
     nagios_alarm:
       engine: http
       address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
-      message_matcher: "Type == 'heka.sandbox.afd_metric' && Fields[no_alerting] == NIL"
+      message_matcher: "Type == 'heka.sandbox.afd_metric' && Fields[alerting_enabled] == TRUE"
       encoder: nagios_encoder
       {%- if metric_collector.nagios_username is defined and metric_collector.nagios_password is defined %}
       username: {{ metric_collector.get('nagios_username') }}
@@ -312,7 +316,9 @@
       module_file: /usr/share/lma_collector/encoders/status_sensu.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        sensu_ttl: 120
+        noop_handler: "{{ remote_collector.sensu_noop_handler }}"
+        notification_handler: "{{ remote_collector.sensu_notification_handler }}"
+        watchdog_ttl: {{ remote_collector.sensu_watchdog_ttl }}
   {%- endif %}
   {%- if remote_collector.influxdb_host is defined %}
     influxdb:
@@ -578,7 +584,9 @@
       module_file: /usr/share/lma_collector/encoders/status_sensu.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        sensu_ttl: 120
+        noop_handler: "{{ aggregator.sensu_noop_handler }}"
+        notification_handler: "{{ aggregator.sensu_notification_handler }}"
+        watchdog_ttl: {{ aggregator.sensu_watchdog_ttl }}
         {%- if aggregator.sensu_source_dimension_key is defined %}
         sensu_source_dimension_key: "{{ aggregator.sensu_source_dimension_key }}"
         {%- endif %}
@@ -602,7 +610,7 @@
     nagios_alarm_cluster:
       engine: http
       address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
-      message_matcher: "Type == 'heka.sandbox.gse_metric' && Fields[no_alerting] == NIL"
+      message_matcher: "Type == 'heka.sandbox.gse_metric' && Fields[alerting_enabled] == TRUE"
       encoder: nagios_encoder
       {%- if aggregator.nagios_username is defined and aggregator.nagios_password is defined %}
       username: {{ aggregator.get('nagios_username') }}
@@ -617,7 +625,7 @@
       engine: udp
       host: "{{ aggregator.sensu_host }}"
       port: "{{ aggregator.sensu_port|default(3030) }}"
-      message_matcher: "Type == 'heka.sandbox.watchdog' || ((Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[no_alerting] == NIL)"
+      message_matcher: "Type == 'heka.sandbox.watchdog' || ((Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[alerting_enabled] == TRUE)"
       encoder: sensu_encoder
       use_buffering: false
   {%- endif %}
@@ -704,7 +712,9 @@
       module_file: /usr/share/lma_collector/encoders/status_sensu.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
-        sensu_ttl: 120
+        noop_handler: "{{ ceilometer_collector.sensu_noop_handler }}"
+        notification_handler: "{{ ceilometer_collector.sensu_notification_handler }}"
+        watchdog_ttl: {{ ceilometer_collector.sensu_watchdog_ttl }}
   {%- endif %}
 {%- endif %}
 {%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
diff --git a/heka/meta/sensu.yml b/heka/meta/sensu.yml
index 1e946af..4741ad0 100644
--- a/heka/meta/sensu.yml
+++ b/heka/meta/sensu.yml
@@ -1,3 +1,7 @@
+{%- if pillar.heka.get('log_collector', {}).get('enabled', False) or
+       pillar.heka.get('metric_collector', {}).get('enabled', False) or
+       pillar.heka.get('remote_collector', {}).get('enabled', False) or
+       pillar.heka.get('aggregator', {}).get('enabled', False) %}
 check:
 {%- if pillar.heka.get('log_collector', {}).get('enabled', False) %}
   local_heka_log_collector_proc:
@@ -31,3 +35,4 @@
     subscribers:
     - local-heka-aggregator
 {%- endif %}
+{%- endif %}
diff --git a/tests/lua/test_afd.lua b/tests/lua/test_afd.lua
index a54bdfd..478a36c 100644
--- a/tests/lua/test_afd.lua
+++ b/tests/lua/test_afd.lua
@@ -47,19 +47,20 @@
     end
 
     function TestAfd:test_inject_afd_metric_without_alarms()
-        afd.inject_afd_metric(consts.OKAY, 'node-1', 'foo', {}, false)
+        afd.inject_afd_metric(consts.OKAY, 'node-1', 'foo', {}, false, false, nil)
 
         local alarms = afd.get_alarms()
         assertEquals(#alarms, 0)
         assertEquals(last_injected_msg.Type, 'afd_metric')
         assertEquals(last_injected_msg.Fields.value, consts.OKAY)
         assertEquals(last_injected_msg.Fields.hostname, 'node-1')
+        assertEquals(last_injected_msg.Fields.notification_handler, nil) -- nil handler was passed above
         assertEquals(last_injected_msg.Payload, '{"alarms":[]}')
     end
 
     function TestAfd:test_inject_afd_metric_with_alarms()
         afd.add_to_alarms(consts.CRIT, 'last', 'metric_1', {}, {}, '==', 0, 0, nil, nil, "important message")
-        afd.inject_afd_metric(consts.CRIT, 'node-1', 'foo', {}, false)
+        afd.inject_afd_metric(consts.CRIT, 'node-1', 'foo', {}, true, true, 'mail')
 
         local alarms = afd.get_alarms()
         assertEquals(#alarms, 0)
@@ -67,6 +68,7 @@
         assertEquals(last_injected_msg.Fields.value, consts.CRIT)
         assertEquals(last_injected_msg.Fields.hostname, 'node-1')
         assertEquals(last_injected_msg.Fields.environment_id, extra.environment_id)
+        assertEquals(last_injected_msg.Fields.notification_handler, 'mail')
         assert(last_injected_msg.Payload:match('"message":"important message"'))
         assert(last_injected_msg.Payload:match('"severity":"CRITICAL"'))
     end
diff --git a/tests/lua/test_gse.lua b/tests/lua/test_gse.lua
index be892f5..b68986d 100644
--- a/tests/lua/test_gse.lua
+++ b/tests/lua/test_gse.lua
@@ -184,36 +184,39 @@
     end
 
     function TestGse:test_inject_cluster_metric_for_nova()
-        gse.inject_cluster_metric('nova', {key = "val"}, true)
+        gse.inject_cluster_metric('nova', {key = "val"}, true, true, 'mail')
         local metric = last_injected_msg
         assertEquals(metric.Type, 'gse_metric')
         assertEquals(metric.Fields.member, 'nova')
         assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.OKAY)
         assertEquals(metric.Fields.key, 'val')
+        assertEquals(metric.Fields.notification_handler, 'mail')
         assertEquals(metric.Payload, '{"alarms":[]}')
     end
 
     function TestGse:test_inject_cluster_metric_for_glance()
-        gse.inject_cluster_metric('glance', {key = "val"}, true)
+        gse.inject_cluster_metric('glance', {key = "val"}, false, false, 'mail')
         local metric = last_injected_msg
         assertEquals(metric.Type, 'gse_metric')
         assertEquals(metric.Fields.member, 'glance')
         assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.DOWN)
         assertEquals(metric.Fields.key, 'val')
+        assertEquals(metric.Fields.notification_handler, 'mail')
         assert(metric.Payload:match("glance%-registry endpoints are down"))
         assert(metric.Payload:match("glance%-api endpoint is down on node%-1"))
     end
 
     function TestGse:test_inject_cluster_metric_for_heat()
-        gse.inject_cluster_metric('heat', {key = "val"}, true)
+        gse.inject_cluster_metric('heat', {key = "val"}, true, true, 'mail')
         local metric = last_injected_msg
         assertEquals(metric.Type, 'gse_metric')
         assertEquals(metric.Fields.member, 'heat')
         assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.WARN)
         assertEquals(metric.Fields.key, 'val')
+        assertEquals(metric.Fields.notification_handler, 'mail')
         assert(metric.Payload:match("5xx errors detected"))
         assert(metric.Payload:match("1 RabbitMQ node out of 3 is down"))
     end