Merge pull request #31 from elemoine/stacklight-alarming-cluster

Add alarm clusters support
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index f408c9b..d87ed73 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -7,7 +7,7 @@
                           'no_alerting', 'tag_fields')
 
 
-def message_matcher(alarm, triggers):
+def alarm_message_matcher(alarm, triggers):
     """
     Return an Heka message matcher expression for a given alarm and a
     dict of triggers.
@@ -26,14 +26,34 @@
     return ' || '.join(matchers)
 
 
-def dimensions(alarm):
+def alarm_cluster_message_matcher(alarm_cluster):
+    """
+    Return an Heka message matcher expression for a given alarm cluster.
+
+    For example the function may return this:
+
+        Fields[service] == 'rabbitmq-cluster'
+    """
+    matchers = set()
+    match_items = alarm_cluster.get('match', {}).items()
+    for match_name, match_value in match_items:
+        matcher = "Fields[{}] == '{}'".format(match_name, match_value)
+        matchers.add(matcher)
+    match_items = alarm_cluster.get('match_re', {}).items()
+    for match_name, match_value in match_items:
+        matcher = "Fields[{}] =~ /{}/".format(match_name, match_value)
+        matchers.add(matcher)
+    return ' && '.join(matchers)
+
+
+def dimensions(alarm_or_alarm_cluster):
     """
     Return a dict alarm dimensions. Each dimension is validated, and an
     Exception is raised if a dimension is invalid.
 
     Valid characters are a-z, 0-9, _, - and /.
     """
-    dimensions = alarm.get('dimension', {})
+    dimensions = alarm_or_alarm_cluster.get('dimension', {})
     for name, value in dimensions.items():
         if name in _disallowed_dimensions:
             raise Exception(
diff --git a/heka/_service.sls b/heka/_service.sls
index 22ca02a..c1c85ed 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -79,6 +79,7 @@
     'input': {},
     'trigger': {},
     'alarm': {},
+    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -89,6 +90,7 @@
     'input': {},
     'trigger': {},
     'alarm': {},
+    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -99,6 +101,7 @@
     'input': {},
     'trigger': {},
     'alarm': {},
+    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -109,6 +112,7 @@
     'input': {},
     'trigger': {},
     'alarm': {},
+    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -277,6 +281,56 @@
 
 {%- endfor %}
 
+{%- set policy = service_metadata.get('policy') %}
+{%- if policy %}
+/usr/share/lma_collector/common/gse_policies.lua:
+  file.managed:
+  - source: salt://heka/files/gse_policies.lua
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: /usr/share/lma_collector
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      policy: {{ policy|yaml }}
+{%- endif %}
+
+{%- for alarm_cluster_name, alarm_cluster in service_metadata.get('alarm_cluster', {}).iteritems() %}
+
+/etc/{{ service_name }}/filter_gse_{{ alarm_cluster_name }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/filter/gse_alarm_cluster.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      alarm_cluster_name: {{ alarm_cluster_name }}
+      alarm_cluster: {{ alarm_cluster|yaml }}
+
+/usr/share/lma_collector/common/gse_{{ alarm_cluster_name|replace('-', '_') }}_topology.lua:
+  file.managed:
+  - source: salt://heka/files/gse_topology.lua
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: /usr/share/lma_collector
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      alarm_cluster_name: {{ alarm_cluster_name }}
+      alarm_cluster: {{ alarm_cluster|yaml }}
+
+{%- endfor %}
+
 {%- for filter_name, filter in service_metadata.get('filter', {}).iteritems() %}
 
 /etc/{{ service_name }}/filter_{{ filter_name }}.toml:
diff --git a/heka/files/gse_policies.lua b/heka/files/gse_policies.lua
new file mode 100644
index 0000000..8ec9c06
--- /dev/null
+++ b/heka/files/gse_policies.lua
@@ -0,0 +1,55 @@
+-- Copyright 2015-2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local gse_policy = require 'gse_policy'
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local policies = {
+{%- for _policy_name, _policy in policy|dictsort %}
+    ['{{ _policy_name|replace("'", "\\'") }}'] = {
+    {%- for _policy_rule in _policy %}
+        gse_policy.new({
+            status = '{{ _policy_rule["status"] }}',
+        {%- set _trigger = _policy_rule.get("trigger") %}
+        {%- if _trigger %}
+            trigger = {
+                logical_operator = '{{ _trigger["logical_operator"] }}',
+                rules = {
+            {%- for _rule in _trigger["rules"] %}
+                    { ['function'] = '{{ _rule["function"] }}',
+                {%- set comma = joiner(",") %}
+                    ['arguments'] = {
+                {%- for _argument in _rule["arguments"]|sort -%}
+                {{ comma() }}'{{ _argument }}'
+                {%- endfor -%}
+                    },
+                    ['relational_operator'] = '{{ _rule["relational_operator"] }}',
+                    ['threshold'] = {{ _rule["threshold"] }}, },
+            {%- endfor %}
+                },
+            },
+        {%- endif %}
+        }),
+    {%- endfor %}
+    },
+{%- endfor %}
+}
+
+function find(policy)
+    return policies[policy]
+end
+
+return M
diff --git a/heka/files/gse_topology.lua b/heka/files/gse_topology.lua
new file mode 100644
index 0000000..e161ad6
--- /dev/null
+++ b/heka/files/gse_topology.lua
@@ -0,0 +1,31 @@
+-- Copyright 2015-2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+clusters = {
+    ['{{ alarm_cluster_name|replace("'","\\'") }}'] = {
+{%- set comma = joiner(",") %}
+        ['members'] = {
+{%- for _member in alarm_cluster["members"]|sort -%}
+        {{ comma() }}'{{ _member|replace("'","\\'") }}'
+{%- endfor -%}
+        },
+        ['group_by'] = '{{ alarm_cluster["group_by"]|default("member") }}',
+        ['policy'] = '{{ alarm_cluster["policy"]|replace("'","\\'") }}',
+    },
+}
+
+return M
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
index 16bb950..d1a01a6 100644
--- a/heka/files/lua/common/gse.lua
+++ b/heka/files/lua/common/gse.lua
@@ -125,7 +125,7 @@
 
 -- compute the cluster metric and inject it into the Heka pipeline
 -- the metric's value is computed using the status of its members
-function inject_cluster_metric(msg_type, cluster_name, metric_name, interval, source, to_alerting)
+function inject_cluster_metric(cluster_name, dimensions, to_alerting)
     local payload
     local status, alarms = resolve_status(cluster_name)
 
@@ -145,18 +145,22 @@
     end
 
     local msg = {
-        Type = msg_type,
+        Type = 'gse_metric',
         Payload = payload,
         Fields = {
-            name=metric_name,
-            value=status,
-            cluster_name=cluster_name,
-            tag_fields={'cluster_name'},
-            interval=interval,
-            source=source,
-            no_alerting=no_alerting,
+            name = 'cluster_status',
+            value = status,
+            member = cluster_name,
+            tag_fields = {'member'},
+            no_alerting = no_alerting,
         }
     }
+
+    for name, value in pairs(dimensions) do
+        table.insert(msg.Fields.tag_fields, name)
+        msg.Fields[name] = value
+    end
+
     lma.inject_tags(msg)
     lma.safe_inject_message(msg)
 end
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 6c8415f..5b6402c 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -17,53 +17,39 @@
 local afd = require 'afd'
 local gse = require 'gse'
 local lma = require 'lma_utils'
+local cjson = require 'cjson'; local policies = require('gse_policies')
 
-local output_message_type = read_config('output_message_type') or error('output_message_type must be specified!')
-local cluster_field = read_config('cluster_field')
-local member_field = read_config('member_field') or error('member_field must be specified!')
-local output_metric_name = read_config('output_metric_name') or error('output_metric_name must be specified!')
-local source = read_config('source') or error('source must be specified!')
 local topology_file = read_config('topology_file') or error('topology_file must be specified!')
-local policies_file = read_config('policies_file') or error('policies_file must be specified!')
-local interval = (read_config('interval') or error('interval must be specified!')) + 0
+local interval = (read_config('interval') or 10) + 0
 local interval_in_ns = interval * 1e9
 local max_inject = (read_config('max_inject') or 10) + 0
 local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
+local dimensions_json = read_config('dimensions') or ''
+local activate_alerting = read_config('activate_alerting') ~= false
 
-local is_active = false
 local first_tick
 local last_tick = 0
 local last_index = nil
 
 local topology = require(topology_file)
-local policies = require(policies_file)
 
 for cluster_name, attributes in pairs(topology.clusters) do
     local policy = policies.find(attributes.policy)
     if not policy then
         error('Cannot find ' .. attributes.policy .. ' policy!')
     end
-    gse.add_cluster(cluster_name, attributes.members, attributes.hints, attributes.group_by, policy)
+    gse.add_cluster(cluster_name, attributes.members, attributes.hints or {},
+        attributes.group_by, policy)
+end
+
+local ok, dimensions = pcall(cjson.decode, dimensions_json)
+if not ok then
+    error(string.format('dimensions JSON is invalid (%s)', dimensions_json))
 end
 
 function process_message()
-    local name = read_message('Fields[name]')
-    local hostname = read_message('Fields[hostname]')
-    if name and name == 'pacemaker_local_resource_active' and read_message("Fields[resource]") == 'vip__management' then
-        -- Skip pacemaker_local_resource_active metrics that don't
-        -- concern the local node
-        if read_message('Hostname') == hostname then
-            if read_message('Fields[value]') == 1 then
-                is_active = true
-            else
-                is_active = false
-            end
-        end
-        return 0
-    end
 
-    local member_id = afd.get_entity_name(member_field)
+    local member_id = afd.get_entity_name('member')
     if not member_id then
         return -1, "Cannot find entity's name in the AFD/GSE message"
     end
@@ -78,19 +64,7 @@
         return -1, "Cannot find alarms in the AFD/GSE message"
     end
 
-    local cluster_ids
-    if cluster_field then
-        local cluster_id = afd.get_entity_name(cluster_field)
-        if not cluster_id then
-            return -1, "Cannot find the cluster's name in the AFD/GSE message"
-        elseif not gse.cluster_exists(cluster_id) then
-            -- Just ignore AFD/GSE messages which aren't part of a cluster's definition
-            return 0
-        end
-        cluster_ids = { cluster_id }
-    else
-        cluster_ids = gse.find_cluster_memberships(member_id)
-    end
+    local cluster_ids = gse.find_cluster_memberships(member_id)
 
     -- update all clusters that depend on this entity
     for _, cluster_id in ipairs(cluster_ids) do
@@ -100,10 +74,7 @@
 end
 
 function timer_event(ns)
-    if not is_active then
-        -- not running as the aggregator
-        return
-    elseif not first_tick then
+    if not first_tick then
         first_tick = ns
         return
     elseif ns - first_tick <= warm_up_period then
@@ -118,14 +89,7 @@
     local injected = 0
     for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
         if last_index == nil or i > last_index then
-            gse.inject_cluster_metric(
-                output_message_type,
-                cluster_name,
-                output_metric_name,
-                interval,
-                source,
-                activate_alerting
-            )
+            gse.inject_cluster_metric(cluster_name, dimensions, activate_alerting)
             last_index = i
             injected = injected + 1
 
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index ee4f36b..c1168df 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -2,7 +2,7 @@
 type = "SandboxFilter"
 filename = "/usr/share/lma_collector/filters/afd.lua"
 preserve_data = {{ alarm.preserve_data|default(False)|lower }}
-message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.message_matcher'](alarm, trigger) }})"
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.alarm_message_matcher'](alarm, trigger) }})"
 module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
 ticker_interval = 10
 
diff --git a/heka/files/toml/filter/gse_alarm_cluster.toml b/heka/files/toml/filter/gse_alarm_cluster.toml
new file mode 100644
index 0000000..72b1923
--- /dev/null
+++ b/heka/files/toml/filter/gse_alarm_cluster.toml
@@ -0,0 +1,21 @@
+[gse_{{ alarm_cluster_name }}_filter]
+type = "SandboxFilter"
+filename = "/usr/share/lma_collector/filters/gse_cluster_filter.lua"
+preserve_data = {{ alarm_cluster.preserve_data|default(False)|lower }}
+message_matcher = "(Type == 'heka.sandbox.afd_metric' || Type == 'heka.sandbox.gse_metric') && ({{ salt['heka_alarming.alarm_cluster_message_matcher'](alarm_cluster) }})"
+module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
+ticker_interval = 1
+
+[gse_{{ alarm_cluster_name }}_filter.config]
+topology_file = "gse_{{ alarm_cluster_name|replace('-', '_') }}_topology"
+dimensions = '{{ salt['heka_alarming.dimensions'](alarm_cluster)|json }}'
+activate_alerting = {{ alarm_cluster.alerting|default(True)|lower }}
+{%- if alarm_cluster.interval is defined %}
+interval = {{ alarm_cluster.interval }}
+{%- endif %}
+{%- if alarm_cluster.max_inject is defined %}
+max_inject = {{ alarm_cluster.max_inject }}
+{%- endif %}
+{%- if alarm_cluster.warm_up_period is defined %}
+warm_up_period = {{ alarm_cluster.warm_up_period }}
+{%- endif %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index fbd847a..51d9ff1 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -134,6 +134,116 @@
       port: 4354
       ticker_interval: 30
 aggregator:
+  policy:
+    # A policy defining that the cluster's status depends on the member with
+    # the highest severity, typically used for a cluster of services.
+    highest_severity:
+    - status: down
+      trigger:
+        logical_operator: or
+        rules:
+        - function: count
+          arguments: [ down ]
+          relational_operator: '>'
+          threshold: 0
+    - status: critical
+      trigger:
+        logical_operator: or
+        rules:
+        - function: count
+          arguments: [ critical ]
+          relational_operator: '>'
+          threshold: 0
+    - status: warning
+      trigger:
+        logical_operator: or
+        rules:
+        - function: count
+          arguments: [ warning ]
+          relational_operator: '>'
+          threshold: 0
+    - status: okay
+      trigger:
+        logical_operator: or
+        rules:
+        - function: count
+          arguments: [ okay ]
+          relational_operator: '>'
+          threshold: 0
+    - status: unknown
+    # A policy which is typically used for clusters managed by Pacemaker
+    # with the no-quorum-policy set to 'stop'.
+    majority_of_members:
+    - status: down
+      trigger:
+        logical_operator: or
+        rules:
+        - function: percent
+          arguments: [ down ]
+          relational_operator: '>'
+          threshold: 50
+    - status: critical
+      trigger:
+        logical_operator: and
+        rules:
+        - function: percent
+          arguments: [ down, critical ]
+          relational_operator: '>'
+          threshold: 20
+        - function: percent
+          arguments: [ okay ]
+          relational_operator: '<'
+          threshold: 50
+
+    - status: warning
+      trigger:
+        logical_operator: or
+        rules:
+        - function: percent
+          arguments: [ okay ]
+          relational_operator: '<'
+          threshold: 50
+
+    - status: okay
+    # A policy which is typically used for stateless clusters
+    availability_of_members:
+    - status: down
+      trigger:
+        logical_operator: or
+        rules:
+        - function: count
+          arguments: [ okay ]
+          relational_operator: '=='
+          threshold: 0
+    - status: critical
+      trigger:
+        logical_operator: and
+        rules:
+        - function: count
+          arguments: [ okay ]
+          relational_operator: '=='
+          threshold: 1
+        - function: count
+          arguments: [ critical, down ]
+          relational_operator: '>'
+          threshold: 1
+    - status: warning
+      trigger:
+        logical_operator: or
+        rules:
+        - function: percent
+          arguments: [ okay ]
+          relational_operator: '<'
+          threshold: 100
+    - status: okay
+      trigger:
+        logical_operator: or
+        rules:
+        - function: percent
+          arguments: [ okay ]
+          relational_operator: '=='
+          threshold: 100
+    - status: unknown
   input:
     heka_metric:
       engine: tcp
diff --git a/tests/lua/test_gse.lua b/tests/lua/test_gse.lua
index 98dfb89..be892f5 100644
--- a/tests/lua/test_gse.lua
+++ b/tests/lua/test_gse.lua
@@ -184,54 +184,36 @@
     end
 
     function TestGse:test_inject_cluster_metric_for_nova()
-        gse.inject_cluster_metric(
-            'gse_service_cluster_metric',
-            'nova',
-            'service_cluster_status',
-            10,
-            'gse_service_cluster_plugin'
-        )
+        gse.inject_cluster_metric('nova', {key = "val"}, true)
         local metric = last_injected_msg
-        assertEquals(metric.Type, 'gse_service_cluster_metric')
-        assertEquals(metric.Fields.cluster_name, 'nova')
-        assertEquals(metric.Fields.name, 'service_cluster_status')
+        assertEquals(metric.Type, 'gse_metric')
+        assertEquals(metric.Fields.member, 'nova')
+        assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.OKAY)
-        assertEquals(metric.Fields.interval, 10)
+        assertEquals(metric.Fields.key, 'val')
         assertEquals(metric.Payload, '{"alarms":[]}')
     end
 
     function TestGse:test_inject_cluster_metric_for_glance()
-        gse.inject_cluster_metric(
-            'gse_service_cluster_metric',
-            'glance',
-            'service_cluster_status',
-            10,
-            'gse_service_cluster_plugin'
-        )
+        gse.inject_cluster_metric('glance', {key = "val"}, true)
         local metric = last_injected_msg
-        assertEquals(metric.Type, 'gse_service_cluster_metric')
-        assertEquals(metric.Fields.cluster_name, 'glance')
-        assertEquals(metric.Fields.name, 'service_cluster_status')
+        assertEquals(metric.Type, 'gse_metric')
+        assertEquals(metric.Fields.member, 'glance')
+        assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.DOWN)
-        assertEquals(metric.Fields.interval, 10)
+        assertEquals(metric.Fields.key, 'val')
         assert(metric.Payload:match("glance%-registry endpoints are down"))
         assert(metric.Payload:match("glance%-api endpoint is down on node%-1"))
     end
 
     function TestGse:test_inject_cluster_metric_for_heat()
-        gse.inject_cluster_metric(
-            'gse_service_cluster_metric',
-            'heat',
-            'service_cluster_status',
-            10,
-            'gse_service_cluster_plugin'
-        )
+        gse.inject_cluster_metric('heat', {key = "val"}, true)
         local metric = last_injected_msg
-        assertEquals(metric.Type, 'gse_service_cluster_metric')
-        assertEquals(metric.Fields.cluster_name, 'heat')
-        assertEquals(metric.Fields.name, 'service_cluster_status')
+        assertEquals(metric.Type, 'gse_metric')
+        assertEquals(metric.Fields.member, 'heat')
+        assertEquals(metric.Fields.name, 'cluster_status')
         assertEquals(metric.Fields.value, consts.WARN)
-        assertEquals(metric.Fields.interval, 10)
+        assertEquals(metric.Fields.key, 'val')
         assert(metric.Payload:match("5xx errors detected"))
         assert(metric.Payload:match("1 RabbitMQ node out of 3 is down"))
     end