Add alarming support
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
new file mode 100644
index 0000000..f408c9b
--- /dev/null
+++ b/_modules/heka_alarming.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+
+import re
+
+_valid_dimension_re = re.compile(r'^[a-z0-9_/-]+$')
+_disallowed_dimensions = ('name', 'value', 'hostname', 'member',
+                          'no_alerting', 'tag_fields')
+
+
+def message_matcher(alarm, triggers):
+    """
+    Return an Heka message matcher expression for a given alarm and a
+    dict of triggers.
+
+    For example the function may return this:
+
+        Fields[name] == 'cpu_idle' || Fields[name] = 'cpu_wait'
+    """
+    matchers = set()
+    for trigger_name in alarm.get('triggers', []):
+        trigger = triggers.get(trigger_name)
+        if trigger and trigger.get('enabled', True):
+            for rule in trigger.get('rules', []):
+                matcher = "Fields[name] == '{}'".format(rule['metric'])
+                matchers.add(matcher)
+    return ' || '.join(matchers)
+
+
+def dimensions(alarm):
+    """
+    Return a dict alarm dimensions. Each dimension is validated, and an
+    Exception is raised if a dimension is invalid.
+
+    Valid characters are a-z, 0-9, _, - and /.
+    """
+    dimensions = alarm.get('dimension', {})
+    for name, value in dimensions.items():
+        if name in _disallowed_dimensions:
+            raise Exception(
+                '{} is not allowed as a dimension name'.format(name))
+        if not _valid_dimension_re.match(name):
+            raise Exception(
+                'Dimension name {} includes disallowed chars'.format(name))
+        if not _valid_dimension_re.match(value):
+            raise Exception(
+                'Dimension value {} includes disallowed chars'.format(value))
+    return dimensions
diff --git a/heka/_service.sls b/heka/_service.sls
index 9e649b4..63516c6 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -76,34 +76,42 @@
   'log_collector': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   },
   'metric_collector': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   },
   'remote_collector': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   },
   'aggregator': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   }
 } %}
 
@@ -230,6 +238,44 @@
 
 {%- endfor %}
 
+{%- for alarm_name, alarm in service_metadata.get('alarm', {}).iteritems() %}
+
+/etc/{{ service_name }}/filter_afd_{{ alarm_name }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/filter/afd_alarm.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      alarm_name: {{ alarm_name }}
+      alarm: {{ alarm|yaml }}
+      trigger: {{ service_metadata.get('trigger', {})|yaml }}
+
+/usr/share/lma_collector/common/lma_{{ alarm_name|replace('-', '_') }}.lua:
+  file.managed:
+  - source: salt://heka/files/lma_alarm.lua
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      alarm_name: {{ alarm_name }}
+      alarm: {{ alarm|yaml }}
+      trigger: {{ service_metadata.get('trigger', {})|yaml }}
+
+{%- endfor %}
+
 {%- for filter_name, filter in service_metadata.get('filter', {}).iteritems() %}
 
 /etc/{{ service_name }}/filter_{{ filter_name }}.toml:
diff --git a/heka/files/lma_alarm.lua b/heka/files/lma_alarm.lua
new file mode 100644
index 0000000..fb5c38e
--- /dev/null
+++ b/heka/files/lma_alarm.lua
@@ -0,0 +1,47 @@
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local alarms = {
+{%- for _trigger_name in alarm.triggers %}
+{%- set _trigger = trigger.get(_trigger_name) %}
+{%- if _trigger and _trigger.get('enabled', True) %}
+  {
+    ['name'] = '{{ _trigger_name}}',
+    ['description'] = '{{ _trigger.get("description", "").replace("'", "\\'") }}',
+    ['severity'] = '{{ _trigger.severity }}',
+    {%- if _trigger.no_data_policy is defined %}
+    ['no_data_policy'] = '{{ _trigger.no_data_policy }}',
+    {%- endif %}
+    ['trigger'] = {
+      ['logical_operator'] = '{{ _trigger.get("logical_operator", "or") }}',
+      ['rules'] = {
+        {%- for _rule in _trigger.get('rules', []) %}
+        {
+          ['metric'] = '{{ _rule.metric }}',
+          ['fields'] = {
+            {%- for _field_name, _field_value in _rule.get('field', {}).iteritems() %}
+            ['{{ _field_name }}'] = '{{ _field_value }}',
+            {%- endfor %}
+          },
+          ['relational_operator'] = '{{ _rule.relational_operator }}',
+          ['threshold'] = '{{ _rule.threshold }}',
+          ['window'] = '{{ _rule.window }}',
+          ['periods'] = '{{ _rule.get('periods', 0) }}',
+          ['function'] = '{{ _rule.function }}',
+          {%- if _rule.group_by is defined %}
+          ['group_by'] = {
+            {%- for _group_by in rule.group_by %}
+            {{ _group_by }},
+            {%- endfor %}
+          },
+          {%- endif %}
+        },
+        {%- endfor %}
+      },
+    },
+  },
+{%- endif %}
+{%- endfor %}
+}
+
+return alarms
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
index 9e864df..2d47040 100644
--- a/heka/files/lua/common/afd.lua
+++ b/heka/files/lua/common/afd.lua
@@ -129,8 +129,7 @@
 end
 
 -- inject an AFD event into the Heka pipeline
-function inject_afd_metric(msg_type, msg_tag_name, msg_tag_value, metric_name,
-                           value, hostname, interval, source, to_alerting)
+function inject_afd_metric(value, hostname, afd_name, dimensions, to_alerting)
     local payload
 
     if #alarms > 0 then
@@ -150,32 +149,27 @@
     end
 
     local msg = {
-        Type = msg_type,
+        Type = 'afd_metric',
         Payload = payload,
         Fields = {
-            name=metric_name,
-            value=value,
-            hostname=hostname,
-            interval=interval,
-            source=source,
-            tag_fields={msg_tag_name, 'source', 'hostname'},
+            name = 'status',
+            value = value,
+            hostname = hostname,
+            member = afd_name,
             no_alerting = no_alerting,
+            tag_fields = {'hostname', 'member'}
         }
     }
-    msg.Fields[msg_tag_name] = msg_tag_value,
+
+    for name, value in pairs(dimensions) do
+        table.insert(msg.Fields.tag_fields, name)
+        msg.Fields[name] = value
+    end
+
     lma.inject_tags(msg)
     lma.safe_inject_message(msg)
 end
 
--- inject an AFD service event into the Heka pipeline
-function inject_afd_service_metric(service, value, hostname, interval, source)
-    inject_afd_metric('afd_service_metric',
-                      'service',
-                      service,
-                      'service_status',
-                      value, hostname, interval, source)
-end
-
 MATCH = 1
 NO_MATCH = 2
 NO_DATA = 3
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
index bf10322..ff622ed 100644
--- a/heka/files/lua/filters/afd.lua
+++ b/heka/files/lua/filters/afd.lua
@@ -12,43 +12,28 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
+local table = require 'table'
 local string = require 'string'
+local cjson = require 'cjson'
 
 local utils = require 'lma_utils'
 local afd = require 'afd'
 
--- node or service
-local afd_type = read_config('afd_type') or error('afd_type must be specified!')
-local activate_alerting = read_config('activate_alerting') or true
-local msg_type
-local msg_field_name
-local afd_entity
-
-if afd_type == 'node' then
-    msg_type = 'afd_node_metric'
-    msg_field_name = 'node_status'
-    afd_entity = 'node_role'
-elseif afd_type == 'service' then
-    msg_type = 'afd_service_metric'
-    msg_field_name = 'service_status'
-    afd_entity = 'service'
-else
-    error('invalid afd_type value')
-end
-
--- ie: controller for node AFD / rabbitmq for service AFD
-local afd_entity_value = read_config('afd_cluster_name') or error('afd_cluster_name must be specified!')
-
--- ie: cpu for node AFD / queue for service AFD
-local msg_field_source = read_config('afd_logical_name') or error('afd_logical_name must be specified!')
-
-local hostname = read_config('hostname') or error('hostname must be specified')
-
 local afd_file = read_config('afd_file') or error('afd_file must be specified')
+local afd_name = read_config('afd_name') or error('afd_name must be specified')
+local hostname = read_config('hostname') or error('hostname must be specified')
+local dimensions_json = read_config('dimensions') or ''
+local activate_alerting = read_config('activate_alerting') or true
+
 local all_alarms = require(afd_file)
 local A = require 'afd_alarms'
 A.load_alarms(all_alarms)
 
+local ok, dimensions = pcall(cjson.decode, dimensions_json)
+if not ok then
+    error(string.format('dimensions JSON is invalid (%s)', dimensions_json))
+end
+
 function process_message()
 
     local metric_name = read_message('Fields[name]')
@@ -90,8 +75,8 @@
                     alarm.alert.message)
             end
 
-            afd.inject_afd_metric(msg_type, afd_entity, afd_entity_value, msg_field_name,
-                state, hostname, interval, msg_field_source, activate_alerting)
+            afd.inject_afd_metric(state, hostname, afd_name, dimensions,
+                activate_alerting)
         end
     else
         A.set_start_time(ns)
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
new file mode 100644
index 0000000..ee4f36b
--- /dev/null
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -0,0 +1,14 @@
+[afd_{{ alarm_name }}_filter]
+type = "SandboxFilter"
+filename = "/usr/share/lma_collector/filters/afd.lua"
+preserve_data = {{ alarm.preserve_data|default(False)|lower }}
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.message_matcher'](alarm, trigger) }})"
+module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
+ticker_interval = 10
+
+[afd_{{ alarm_name }}_filter.config]
+afd_file = "lma_{{ alarm_name|replace('-', '_') }}"
+afd_name = "{{ alarm_name }}"
+hostname = "{{ grains.host }}"
+dimensions = '{{ salt['heka_alarming.dimensions'](alarm)|json }}'
+activate_alerting = {{ alarm.alerting|default(True)|lower }}