Add alarming support
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
new file mode 100644
index 0000000..f408c9b
--- /dev/null
+++ b/_modules/heka_alarming.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+
+import re
+
+_valid_dimension_re = re.compile(r'^[a-z0-9_/-]+$')
+_disallowed_dimensions = ('name', 'value', 'hostname', 'member',
+ 'no_alerting', 'tag_fields')
+
+
+def message_matcher(alarm, triggers):
+ """
+ Return an Heka message matcher expression for a given alarm and a
+ dict of triggers.
+
+ For example the function may return this:
+
+ Fields[name] == 'cpu_idle' || Fields[name] = 'cpu_wait'
+ """
+ matchers = set()
+ for trigger_name in alarm.get('triggers', []):
+ trigger = triggers.get(trigger_name)
+ if trigger and trigger.get('enabled', True):
+ for rule in trigger.get('rules', []):
+ matcher = "Fields[name] == '{}'".format(rule['metric'])
+ matchers.add(matcher)
+ return ' || '.join(matchers)
+
+
+def dimensions(alarm):
+ """
+ Return a dict alarm dimensions. Each dimension is validated, and an
+ Exception is raised if a dimension is invalid.
+
+ Valid characters are a-z, 0-9, _, - and /.
+ """
+ dimensions = alarm.get('dimension', {})
+ for name, value in dimensions.items():
+ if name in _disallowed_dimensions:
+ raise Exception(
+ '{} is not allowed as a dimension name'.format(name))
+ if not _valid_dimension_re.match(name):
+ raise Exception(
+ 'Dimension name {} includes disallowed chars'.format(name))
+ if not _valid_dimension_re.match(value):
+ raise Exception(
+ 'Dimension value {} includes disallowed chars'.format(value))
+ return dimensions
diff --git a/heka/_service.sls b/heka/_service.sls
index 9e649b4..63516c6 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -76,34 +76,42 @@
'log_collector': {
'decoder': {},
'input': {},
+ 'trigger': {},
+ 'alarm': {},
'filter': {},
'splitter': {},
'encoder': {},
- 'output': {}
+ 'output': {},
},
'metric_collector': {
'decoder': {},
'input': {},
+ 'trigger': {},
+ 'alarm': {},
'filter': {},
'splitter': {},
'encoder': {},
- 'output': {}
+ 'output': {},
},
'remote_collector': {
'decoder': {},
'input': {},
+ 'trigger': {},
+ 'alarm': {},
'filter': {},
'splitter': {},
'encoder': {},
- 'output': {}
+ 'output': {},
},
'aggregator': {
'decoder': {},
'input': {},
+ 'trigger': {},
+ 'alarm': {},
'filter': {},
'splitter': {},
'encoder': {},
- 'output': {}
+ 'output': {},
}
} %}
@@ -230,6 +238,44 @@
{%- endfor %}
+{%- for alarm_name, alarm in service_metadata.get('alarm', {}).iteritems() %}
+
+/etc/{{ service_name }}/filter_afd_{{ alarm_name }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/filter/afd_alarm.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ alarm_name: {{ alarm_name }}
+ alarm: {{ alarm|yaml }}
+ trigger: {{ service_metadata.get('trigger', {})|yaml }}
+
+/usr/share/lma_collector/common/lma_{{ alarm_name|replace('-', '_') }}.lua:
+ file.managed:
+ - source: salt://heka/files/lma_alarm.lua
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ alarm_name: {{ alarm_name }}
+ alarm: {{ alarm|yaml }}
+ trigger: {{ service_metadata.get('trigger', {})|yaml }}
+
+{%- endfor %}
+
{%- for filter_name, filter in service_metadata.get('filter', {}).iteritems() %}
/etc/{{ service_name }}/filter_{{ filter_name }}.toml:
diff --git a/heka/files/lma_alarm.lua b/heka/files/lma_alarm.lua
new file mode 100644
index 0000000..fb5c38e
--- /dev/null
+++ b/heka/files/lma_alarm.lua
@@ -0,0 +1,47 @@
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local alarms = {
+{%- for _trigger_name in alarm.triggers %}
+{%- set _trigger = trigger.get(_trigger_name) %}
+{%- if _trigger and _trigger.get('enabled', True) %}
+ {
+ ['name'] = '{{ _trigger_name}}',
+ ['description'] = '{{ _trigger.get("description", "").replace("'", "\\'") }}',
+ ['severity'] = '{{ _trigger.severity }}',
+ {%- if _trigger.no_data_policy is defined %}
+ ['no_data_policy'] = '{{ _trigger.no_data_policy }}',
+ {%- endif %}
+ ['trigger'] = {
+ ['logical_operator'] = '{{ _trigger.get("logical_operator", "or") }}',
+ ['rules'] = {
+ {%- for _rule in _trigger.get('rules', []) %}
+ {
+ ['metric'] = '{{ _rule.metric }}',
+ ['fields'] = {
+ {%- for _field_name, _field_value in _rule.get('field', {}).iteritems() %}
+ ['{{ _field_name }}'] = '{{ _field_value }}',
+ {%- endfor %}
+ },
+ ['relational_operator'] = '{{ _rule.relational_operator }}',
+ ['threshold'] = '{{ _rule.threshold }}',
+ ['window'] = '{{ _rule.window }}',
+ ['periods'] = '{{ _rule.get('periods', 0) }}',
+ ['function'] = '{{ _rule.function }}',
+ {%- if _rule.group_by is defined %}
+ ['group_by'] = {
+ {%- for _group_by in rule.group_by %}
+ {{ _group_by }},
+ {%- endfor %}
+ },
+ {%- endif %}
+ },
+ {%- endfor %}
+ },
+ },
+ },
+{%- endif %}
+{%- endfor %}
+}
+
+return alarms
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
index 9e864df..2d47040 100644
--- a/heka/files/lua/common/afd.lua
+++ b/heka/files/lua/common/afd.lua
@@ -129,8 +129,7 @@
end
-- inject an AFD event into the Heka pipeline
-function inject_afd_metric(msg_type, msg_tag_name, msg_tag_value, metric_name,
- value, hostname, interval, source, to_alerting)
+function inject_afd_metric(value, hostname, afd_name, dimensions, to_alerting)
local payload
if #alarms > 0 then
@@ -150,32 +149,27 @@
end
local msg = {
- Type = msg_type,
+ Type = 'afd_metric',
Payload = payload,
Fields = {
- name=metric_name,
- value=value,
- hostname=hostname,
- interval=interval,
- source=source,
- tag_fields={msg_tag_name, 'source', 'hostname'},
+ name = 'status',
+ value = value,
+ hostname = hostname,
+ member = afd_name,
no_alerting = no_alerting,
+ tag_fields = {'hostname', 'member'}
}
}
- msg.Fields[msg_tag_name] = msg_tag_value,
+
+ for name, value in pairs(dimensions) do
+ table.insert(msg.Fields.tag_fields, name)
+ msg.Fields[name] = value
+ end
+
lma.inject_tags(msg)
lma.safe_inject_message(msg)
end
--- inject an AFD service event into the Heka pipeline
-function inject_afd_service_metric(service, value, hostname, interval, source)
- inject_afd_metric('afd_service_metric',
- 'service',
- service,
- 'service_status',
- value, hostname, interval, source)
-end
-
MATCH = 1
NO_MATCH = 2
NO_DATA = 3
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
index bf10322..ff622ed 100644
--- a/heka/files/lua/filters/afd.lua
+++ b/heka/files/lua/filters/afd.lua
@@ -12,43 +12,28 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
+local table = require 'table'
local string = require 'string'
+local cjson = require 'cjson'
local utils = require 'lma_utils'
local afd = require 'afd'
--- node or service
-local afd_type = read_config('afd_type') or error('afd_type must be specified!')
-local activate_alerting = read_config('activate_alerting') or true
-local msg_type
-local msg_field_name
-local afd_entity
-
-if afd_type == 'node' then
- msg_type = 'afd_node_metric'
- msg_field_name = 'node_status'
- afd_entity = 'node_role'
-elseif afd_type == 'service' then
- msg_type = 'afd_service_metric'
- msg_field_name = 'service_status'
- afd_entity = 'service'
-else
- error('invalid afd_type value')
-end
-
--- ie: controller for node AFD / rabbitmq for service AFD
-local afd_entity_value = read_config('afd_cluster_name') or error('afd_cluster_name must be specified!')
-
--- ie: cpu for node AFD / queue for service AFD
-local msg_field_source = read_config('afd_logical_name') or error('afd_logical_name must be specified!')
-
-local hostname = read_config('hostname') or error('hostname must be specified')
-
local afd_file = read_config('afd_file') or error('afd_file must be specified')
+local afd_name = read_config('afd_name') or error('afd_name must be specified')
+local hostname = read_config('hostname') or error('hostname must be specified')
+local dimensions_json = read_config('dimensions') or ''
+local activate_alerting = read_config('activate_alerting') or true
+
local all_alarms = require(afd_file)
local A = require 'afd_alarms'
A.load_alarms(all_alarms)
+local ok, dimensions = pcall(cjson.decode, dimensions_json)
+if not ok then
+ error(string.format('dimensions JSON is invalid (%s)', dimensions_json))
+end
+
function process_message()
local metric_name = read_message('Fields[name]')
@@ -90,8 +75,8 @@
alarm.alert.message)
end
- afd.inject_afd_metric(msg_type, afd_entity, afd_entity_value, msg_field_name,
- state, hostname, interval, msg_field_source, activate_alerting)
+ afd.inject_afd_metric(state, hostname, afd_name, dimensions,
+ activate_alerting)
end
else
A.set_start_time(ns)
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
new file mode 100644
index 0000000..ee4f36b
--- /dev/null
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -0,0 +1,14 @@
+[afd_{{ alarm_name }}_filter]
+type = "SandboxFilter"
+filename = "/usr/share/lma_collector/filters/afd.lua"
+preserve_data = {{ alarm.preserve_data|default(False)|lower }}
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.message_matcher'](alarm, trigger) }})"
+module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
+ticker_interval = 10
+
+[afd_{{ alarm_name }}_filter.config]
+afd_file = "lma_{{ alarm_name|replace('-', '_') }}"
+afd_name = "{{ alarm_name }}"
+hostname = "{{ grains.host }}"
+dimensions = '{{ salt['heka_alarming.dimensions'](alarm)|json }}'
+activate_alerting = {{ alarm.alerting|default(True)|lower }}