Merge pull request #31 from elemoine/stacklight-alarming-cluster
Add alarm clusters support
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index f408c9b..d87ed73 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -7,7 +7,7 @@
'no_alerting', 'tag_fields')
-def message_matcher(alarm, triggers):
+def alarm_message_matcher(alarm, triggers):
"""
Return an Heka message matcher expression for a given alarm and a
dict of triggers.
@@ -26,14 +26,34 @@
return ' || '.join(matchers)
-def dimensions(alarm):
+def alarm_cluster_message_matcher(alarm_cluster):
+    """
+    Return a Heka message matcher expression for a given alarm cluster.
+
+    Each entry of the cluster's ``match`` dict becomes an equality test
+    and each entry of its ``match_re`` dict becomes a regular expression
+    test on the message field of the same name. All the tests are joined
+    with ``&&``, in sorted order, so that the same alarm cluster always
+    produces the same expression.
+
+    An empty string is returned when the cluster defines neither
+    ``match`` nor ``match_re``.
+
+    For example the function may return this:
+
+        Fields[service] == 'rabbitmq-cluster'
+    """
+    matchers = set()
+    for name, value in alarm_cluster.get('match', {}).items():
+        matchers.add("Fields[{}] == '{}'".format(name, value))
+    for name, value in alarm_cluster.get('match_re', {}).items():
+        matchers.add("Fields[{}] =~ /{}/".format(name, value))
+    # A set has no defined iteration order: sort the matchers so that the
+    # rendered configuration does not change from one run to the next.
+    return ' && '.join(sorted(matchers))
+
+
+def dimensions(alarm_or_alarm_cluster):
"""
Return a dict alarm dimensions. Each dimension is validated, and an
Exception is raised if a dimension is invalid.
Valid characters are a-z, 0-9, _, - and /.
"""
- dimensions = alarm.get('dimension', {})
+ dimensions = alarm_or_alarm_cluster.get('dimension', {})
for name, value in dimensions.items():
if name in _disallowed_dimensions:
raise Exception(
diff --git a/heka/_service.sls b/heka/_service.sls
index 22ca02a..c1c85ed 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -79,6 +79,7 @@
'input': {},
'trigger': {},
'alarm': {},
+ 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -89,6 +90,7 @@
'input': {},
'trigger': {},
'alarm': {},
+ 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -99,6 +101,7 @@
'input': {},
'trigger': {},
'alarm': {},
+ 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -109,6 +112,7 @@
'input': {},
'trigger': {},
'alarm': {},
+ 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -277,6 +281,56 @@
{%- endfor %}
+{#-
+  Render the GSE policies Lua module, but only when the service metadata
+  defines a 'policy' entry. The policies are handed to the template as the
+  'policy' variable, and the Heka service watches the resulting file.
+#}
+{%- set policy = service_metadata.get('policy') %}
+{%- if policy %}
+/usr/share/lma_collector/common/gse_policies.lua:
+ file.managed:
+ - source: salt://heka/files/gse_policies.lua
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: /usr/share/lma_collector
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ policy: {{ policy|yaml }}
+{%- endif %}
+
+{#-
+  For every entry of the service's 'alarm_cluster' metadata, manage two
+  files: the Heka filter configuration (filter_gse_<name>.toml) and the
+  Lua topology module (gse_<name>_topology.lua, with '-' replaced by '_'
+  in the name). Both templates receive 'alarm_cluster_name' and
+  'alarm_cluster', and both are watched by the Heka service.
+#}
+{%- for alarm_cluster_name, alarm_cluster in service_metadata.get('alarm_cluster', {}).iteritems() %}
+
+/etc/{{ service_name }}/filter_gse_{{ alarm_cluster_name }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/filter/gse_alarm_cluster.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ alarm_cluster_name: {{ alarm_cluster_name }}
+ alarm_cluster: {{ alarm_cluster|yaml }}
+
+/usr/share/lma_collector/common/gse_{{ alarm_cluster_name|replace('-', '_') }}_topology.lua:
+ file.managed:
+ - source: salt://heka/files/gse_topology.lua
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: /usr/share/lma_collector
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ alarm_cluster_name: {{ alarm_cluster_name }}
+ alarm_cluster: {{ alarm_cluster|yaml }}
+
+{%- endfor %}
+
{%- for filter_name, filter in service_metadata.get('filter', {}).iteritems() %}
/etc/{{ service_name }}/filter_{{ filter_name }}.toml:
diff --git a/heka/files/gse_policies.lua b/heka/files/gse_policies.lua
new file mode 100644
index 0000000..8ec9c06
--- /dev/null
+++ b/heka/files/gse_policies.lua
@@ -0,0 +1,55 @@
+-- Copyright 2015-2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local gse_policy = require 'gse_policy'
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Policies indexed by name. Each policy is the ordered list of gse_policy
+-- objects built from the entries of the 'policy' template variable; an
+-- entry without a trigger only carries its status.
+local policies = {
+{%- for _policy_name, _policy in policy|dictsort %}
+    ['{{ _policy_name|replace("'", "\\'") }}'] = {
+    {%- for _policy_rule in _policy %}
+        gse_policy.new({
+            status = '{{ _policy_rule["status"] }}',
+        {%- set _trigger = _policy_rule.get("trigger") %}
+        {%- if _trigger %}
+            trigger = {
+                logical_operator = '{{ _trigger["logical_operator"] }}',
+                rules = {
+                {#- Each rule gets its own table: writing the keys of several
+                    rules straight into 'rules' would repeat the same keys and
+                    only the last rule would be kept. #}
+                {%- for _rule in _trigger["rules"] %}
+                    {
+                        ['function'] = '{{ _rule["function"] }}',
+                        {%- set comma = joiner(",") %}
+                        ['arguments'] = {
+                        {%- for _argument in _rule["arguments"]|sort -%}
+                            {{ comma() }}'{{ _argument }}'
+                        {%- endfor -%}
+                        },
+                        ['relational_operator'] = '{{ _rule["relational_operator"] }}',
+                        ['threshold'] = {{ _rule["threshold"] }},
+                    },
+                {%- endfor %}
+                },
+            },
+        {%- endif %}
+        }),
+    {%- endfor %}
+    },
+{%- endfor %}
+}
+
+-- Return the entry of the policies table stored under the given policy
+-- name, or nil when no policy has that name.
+function find(policy)
+ return policies[policy]
+end
+
+return M
diff --git a/heka/files/gse_topology.lua b/heka/files/gse_topology.lua
new file mode 100644
index 0000000..e161ad6
--- /dev/null
+++ b/heka/files/gse_topology.lua
@@ -0,0 +1,31 @@
+-- Copyright 2015-2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Topology of one alarm cluster, exported as the 'clusters' field of this
+-- module. It is rendered from the 'alarm_cluster_name' and 'alarm_cluster'
+-- template variables: the sorted list of members, the grouping key (which
+-- falls back to 'member' when the cluster does not set 'group_by') and the
+-- name of the policy to apply.
+clusters = {
+ ['{{ alarm_cluster_name|replace("'","\\'") }}'] = {
+{%- set comma = joiner(",") %}
+ ['members'] = {
+{%- for _member in alarm_cluster["members"]|sort -%}
+ {{ comma() }}'{{ _member|replace("'","\\'") }}'
+{%- endfor -%}
+ },
+ ['group_by'] = '{{ alarm_cluster["group_by"]|default("member") }}',
+ ['policy'] = '{{ alarm_cluster["policy"]|replace("'","\\'") }}',
+ },
+}
+
+return M
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
index 16bb950..d1a01a6 100644
--- a/heka/files/lua/common/gse.lua
+++ b/heka/files/lua/common/gse.lua
@@ -125,7 +125,7 @@
-- compute the cluster metric and inject it into the Heka pipeline
-- the metric's value is computed using the status of its members
-function inject_cluster_metric(msg_type, cluster_name, metric_name, interval, source, to_alerting)
+function inject_cluster_metric(cluster_name, dimensions, to_alerting)
local payload
local status, alarms = resolve_status(cluster_name)
@@ -145,18 +145,22 @@
end
local msg = {
- Type = msg_type,
+ Type = 'gse_metric',
Payload = payload,
Fields = {
- name=metric_name,
- value=status,
- cluster_name=cluster_name,
- tag_fields={'cluster_name'},
- interval=interval,
- source=source,
- no_alerting=no_alerting,
+ name = 'cluster_status',
+ value = status,
+ member = cluster_name,
+ tag_fields = {'member'},
+ no_alerting = no_alerting,
}
}
+
+ for name, value in pairs(dimensions) do
+ table.insert(msg.Fields.tag_fields, name)
+ msg.Fields[name] = value
+ end
+
lma.inject_tags(msg)
lma.safe_inject_message(msg)
end
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 6c8415f..5b6402c 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -17,53 +17,39 @@
local afd = require 'afd'
local gse = require 'gse'
local lma = require 'lma_utils'
+local policies = require('gse_policies')
-local output_message_type = read_config('output_message_type') or error('output_message_type must be specified!')
-local cluster_field = read_config('cluster_field')
-local member_field = read_config('member_field') or error('member_field must be specified!')
-local output_metric_name = read_config('output_metric_name') or error('output_metric_name must be specified!')
-local source = read_config('source') or error('source must be specified!')
local topology_file = read_config('topology_file') or error('topology_file must be specified!')
-local policies_file = read_config('policies_file') or error('policies_file must be specified!')
-local interval = (read_config('interval') or error('interval must be specified!')) + 0
+local interval = (read_config('interval') or 10) + 0
local interval_in_ns = interval * 1e9
local max_inject = (read_config('max_inject') or 10) + 0
local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
+local dimensions_json = read_config('dimensions') or ''
local activate_alerting = read_config('activate_alerting') or true
-local is_active = false
local first_tick
local last_tick = 0
local last_index = nil
local topology = require(topology_file)
-local policies = require(policies_file)
for cluster_name, attributes in pairs(topology.clusters) do
local policy = policies.find(attributes.policy)
if not policy then
error('Cannot find ' .. attributes.policy .. ' policy!')
end
- gse.add_cluster(cluster_name, attributes.members, attributes.hints, attributes.group_by, policy)
+ gse.add_cluster(cluster_name, attributes.members, attributes.hints or {},
+ attributes.group_by, policy)
+end
+
+local ok, dimensions = pcall(cjson.decode, dimensions_json)
+if not ok then
+ error(string.format('dimensions JSON is invalid (%s)', dimensions_json))
end
function process_message()
- local name = read_message('Fields[name]')
- local hostname = read_message('Fields[hostname]')
- if name and name == 'pacemaker_local_resource_active' and read_message("Fields[resource]") == 'vip__management' then
- -- Skip pacemaker_local_resource_active metrics that don't
- -- concern the local node
- if read_message('Hostname') == hostname then
- if read_message('Fields[value]') == 1 then
- is_active = true
- else
- is_active = false
- end
- end
- return 0
- end
- local member_id = afd.get_entity_name(member_field)
+ local member_id = afd.get_entity_name('member')
if not member_id then
return -1, "Cannot find entity's name in the AFD/GSE message"
end
@@ -78,19 +64,7 @@
return -1, "Cannot find alarms in the AFD/GSE message"
end
- local cluster_ids
- if cluster_field then
- local cluster_id = afd.get_entity_name(cluster_field)
- if not cluster_id then
- return -1, "Cannot find the cluster's name in the AFD/GSE message"
- elseif not gse.cluster_exists(cluster_id) then
- -- Just ignore AFD/GSE messages which aren't part of a cluster's definition
- return 0
- end
- cluster_ids = { cluster_id }
- else
- cluster_ids = gse.find_cluster_memberships(member_id)
- end
+ local cluster_ids = gse.find_cluster_memberships(member_id)
-- update all clusters that depend on this entity
for _, cluster_id in ipairs(cluster_ids) do
@@ -100,10 +74,7 @@
end
function timer_event(ns)
- if not is_active then
- -- not running as the aggregator
- return
- elseif not first_tick then
+ if not first_tick then
first_tick = ns
return
elseif ns - first_tick <= warm_up_period then
@@ -118,14 +89,7 @@
local injected = 0
for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
if last_index == nil or i > last_index then
- gse.inject_cluster_metric(
- output_message_type,
- cluster_name,
- output_metric_name,
- interval,
- source,
- activate_alerting
- )
+ gse.inject_cluster_metric(cluster_name, dimensions, activate_alerting)
last_index = i
injected = injected + 1
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index ee4f36b..c1168df 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -2,7 +2,7 @@
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd.lua"
preserve_data = {{ alarm.preserve_data|default(False)|lower }}
-message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.message_matcher'](alarm, trigger) }})"
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.alarm_message_matcher'](alarm, trigger) }})"
module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
ticker_interval = 10
diff --git a/heka/files/toml/filter/gse_alarm_cluster.toml b/heka/files/toml/filter/gse_alarm_cluster.toml
new file mode 100644
index 0000000..72b1923
--- /dev/null
+++ b/heka/files/toml/filter/gse_alarm_cluster.toml
@@ -0,0 +1,21 @@
+# Heka sandbox filter running gse_cluster_filter.lua for one alarm cluster.
+# It receives the AFD and GSE metrics selected by the cluster's match and
+# match_re settings.
+[gse_{{ alarm_cluster_name }}_filter]
+type = "SandboxFilter"
+filename = "/usr/share/lma_collector/filters/gse_cluster_filter.lua"
+preserve_data = {{ alarm_cluster.preserve_data|default(False)|lower }}
+message_matcher = "(Type == 'heka.sandbox.afd_metric' || Type == 'heka.sandbox.gse_metric') && ({{ salt['heka_alarming.alarm_cluster_message_matcher'](alarm_cluster) }})"
+module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
+ticker_interval = 1
+
+# Settings read by the Lua filter. topology_file is the name of the Lua
+# module generated for this cluster; dimensions is a JSON-encoded string.
+[gse_{{ alarm_cluster_name }}_filter.config]
+topology_file = "gse_{{ alarm_cluster_name|replace('-', '_') }}_topology"
+dimensions = '{{ salt['heka_alarming.dimensions'](alarm_cluster)|json }}'
+# NOTE(review): gse_cluster_filter.lua reads this option with
+# "read_config('activate_alerting') or true", so a value of false appears
+# to be ignored there -- confirm.
+activate_alerting = {{ alarm_cluster.alerting|default(True)|lower }}
+# Optional tuning. When omitted, the Lua filter falls back to its own
+# defaults (interval 10, max_inject 10, warm_up_period 0).
+{%- if alarm_cluster.interval is defined %}
+interval = {{ alarm_cluster.interval }}
+{%- endif %}
+{%- if alarm_cluster.max_inject is defined %}
+max_inject = {{ alarm_cluster.max_inject }}
+{%- endif %}
+{%- if alarm_cluster.warm_up_period is defined %}
+warm_up_period = {{ alarm_cluster.warm_up_period }}
+{%- endif %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index fbd847a..51d9ff1 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -134,6 +134,116 @@
port: 4354
ticker_interval: 30
aggregator:
+ policy:
+ # A policy defining that the cluster's status depends on the member with
+ # the highest severity, typically used for a cluster of services.
+ highest_severity:
+ - status: down
+ trigger:
+ logical_operator: or
+ rules:
+ - function: count
+ arguments: [ down ]
+ relational_operator: '>'
+ threshold: 0
+ - status: critical
+ trigger:
+ logical_operator: or
+ rules:
+ - function: count
+ arguments: [ critical ]
+ relational_operator: '>'
+ threshold: 0
+ - status: warning
+ trigger:
+ logical_operator: or
+ rules:
+ - function: count
+ arguments: [ warning ]
+ relational_operator: '>'
+ threshold: 0
+ - status: okay
+ trigger:
+ logical_operator: or
+ rules:
+ - function: count
+ arguments: [ okay ]
+ relational_operator: '>'
+ threshold: 0
+ - status: unknown
+ # A policy which is typically used for clusters managed by Pacemaker
+ # with the no-quorum-policy set to 'stop'.
+ majority_of_members:
+ - status: down
+ trigger:
+ logical_operator: or
+ rules:
+ - function: percent
+ arguments: [ down ]
+ relational_operator: '>'
+ threshold: 50
+ - status: critical
+ trigger:
+ logical_operator: and
+ rules:
+ - function: percent
+ arguments: [ down, critical ]
+ relational_operator: '>'
+ threshold: 20
+ - function: percent
+ arguments: [ okay ]
+ relational_operator: '<'
+ threshold: 50
+ - status: warning
+ trigger:
+ logical_operator: or
+ rules:
+ - function: percent
+ arguments: [ okay ]
+ relational_operator: '<'
+ threshold: 50
+ - status: okay
+ # A policy which is typically used for stateless clusters
+ availability_of_members:
+ - status: down
+ trigger:
+ logical_operator: or
+ rules:
+ - function: count
+ arguments: [ okay ]
+ relational_operator: '=='
+ threshold: 0
+ - status: critical
+ trigger:
+ logical_operator: and
+ rules:
+ - function: count
+ arguments: [ okay ]
+ relational_operator: '=='
+ threshold: 1
+ - function: count
+ arguments: [ critical, down ]
+ relational_operator: '>'
+ threshold: 1
+ - status: warning
+ trigger:
+ logical_operator: or
+ rules:
+ - function: percent
+ arguments: [ okay ]
+ relational_operator: '<'
+ threshold: 100
+ - status: okay
+ trigger:
+ logical_operator: or
+ rules:
+ - function: percent
+ arguments: [ okay ]
+ relational_operator: '=='
+ threshold: 100
+ - status: unknown
input:
heka_metric:
engine: tcp
diff --git a/tests/lua/test_gse.lua b/tests/lua/test_gse.lua
index 98dfb89..be892f5 100644
--- a/tests/lua/test_gse.lua
+++ b/tests/lua/test_gse.lua
@@ -184,54 +184,36 @@
end
function TestGse:test_inject_cluster_metric_for_nova()
- gse.inject_cluster_metric(
- 'gse_service_cluster_metric',
- 'nova',
- 'service_cluster_status',
- 10,
- 'gse_service_cluster_plugin'
- )
+ gse.inject_cluster_metric('nova', {key = "val"}, true)
local metric = last_injected_msg
- assertEquals(metric.Type, 'gse_service_cluster_metric')
- assertEquals(metric.Fields.cluster_name, 'nova')
- assertEquals(metric.Fields.name, 'service_cluster_status')
+ assertEquals(metric.Type, 'gse_metric')
+ assertEquals(metric.Fields.member, 'nova')
+ assertEquals(metric.Fields.name, 'cluster_status')
assertEquals(metric.Fields.value, consts.OKAY)
- assertEquals(metric.Fields.interval, 10)
+ assertEquals(metric.Fields.key, 'val')
assertEquals(metric.Payload, '{"alarms":[]}')
end
function TestGse:test_inject_cluster_metric_for_glance()
- gse.inject_cluster_metric(
- 'gse_service_cluster_metric',
- 'glance',
- 'service_cluster_status',
- 10,
- 'gse_service_cluster_plugin'
- )
+ gse.inject_cluster_metric('glance', {key = "val"}, true)
local metric = last_injected_msg
- assertEquals(metric.Type, 'gse_service_cluster_metric')
- assertEquals(metric.Fields.cluster_name, 'glance')
- assertEquals(metric.Fields.name, 'service_cluster_status')
+ assertEquals(metric.Type, 'gse_metric')
+ assertEquals(metric.Fields.member, 'glance')
+ assertEquals(metric.Fields.name, 'cluster_status')
assertEquals(metric.Fields.value, consts.DOWN)
- assertEquals(metric.Fields.interval, 10)
+ assertEquals(metric.Fields.key, 'val')
assert(metric.Payload:match("glance%-registry endpoints are down"))
assert(metric.Payload:match("glance%-api endpoint is down on node%-1"))
end
function TestGse:test_inject_cluster_metric_for_heat()
- gse.inject_cluster_metric(
- 'gse_service_cluster_metric',
- 'heat',
- 'service_cluster_status',
- 10,
- 'gse_service_cluster_plugin'
- )
+ gse.inject_cluster_metric('heat', {key = "val"}, true)
local metric = last_injected_msg
- assertEquals(metric.Type, 'gse_service_cluster_metric')
- assertEquals(metric.Fields.cluster_name, 'heat')
- assertEquals(metric.Fields.name, 'service_cluster_status')
+ assertEquals(metric.Type, 'gse_metric')
+ assertEquals(metric.Fields.member, 'heat')
+ assertEquals(metric.Fields.name, 'cluster_status')
assertEquals(metric.Fields.value, consts.WARN)
- assertEquals(metric.Fields.interval, 10)
+ assertEquals(metric.Fields.key, 'val')
assert(metric.Payload:match("5xx errors detected"))
assert(metric.Payload:match("1 RabbitMQ node out of 3 is down"))
end