Merge pull request #30 from simonpasquier/add-tz-support

Add generic timezone support to decoders
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
new file mode 100644
index 0000000..f408c9b
--- /dev/null
+++ b/_modules/heka_alarming.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+
+import re
+
+_valid_dimension_re = re.compile(r'^[a-z0-9_/-]+$')
+_disallowed_dimensions = ('name', 'value', 'hostname', 'member',
+                          'no_alerting', 'tag_fields')
+
+
+def message_matcher(alarm, triggers):
+    """
+    Return a Heka message matcher expression for a given alarm and a
+    dict of triggers.
+
+    For example, the function may return this:
+
+        Fields[name] == 'cpu_idle' || Fields[name] == 'cpu_wait'
+    """
+    matchers = set()
+    for trigger_name in alarm.get('triggers', []):
+        trigger = triggers.get(trigger_name)
+        if trigger and trigger.get('enabled', True):
+            for rule in trigger.get('rules', []):
+                matcher = "Fields[name] == '{}'".format(rule['metric'])
+                matchers.add(matcher)
+    return ' || '.join(matchers)
+
+
+def dimensions(alarm):
+    """
+    Return a dict of alarm dimensions. Each dimension is validated, and an
+    Exception is raised if a dimension is invalid.
+
+    Valid characters are a-z, 0-9, _, - and /.
+    """
+    dimensions = alarm.get('dimension', {})
+    for name, value in dimensions.items():
+        if name in _disallowed_dimensions:
+            raise Exception(
+                '{} is not allowed as a dimension name'.format(name))
+        if not _valid_dimension_re.match(name):
+            raise Exception(
+                'Dimension name {} includes disallowed chars'.format(name))
+        if not _valid_dimension_re.match(value):
+            raise Exception(
+                'Dimension value {} includes disallowed chars'.format(value))
+    return dimensions
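
For illustration, a quick sketch of how these two execution module functions behave, assuming a hypothetical alarm named 'cpu' built on a trigger named 'cpu-critical' (all names and values below are made up):

    # Hypothetical pillar data, for illustration only.
    triggers = {
        'cpu-critical': {
            'severity': 'critical',
            'rules': [
                {'metric': 'cpu_idle', 'relational_operator': '<',
                 'threshold': 5, 'window': 120, 'function': 'avg'},
                {'metric': 'cpu_wait', 'relational_operator': '>',
                 'threshold': 35, 'window': 120, 'function': 'avg'},
            ],
        },
    }
    alarm = {'triggers': ['cpu-critical'], 'dimension': {'service': 'system'}}

    # The clauses may come out in either order, since they are collected in a set.
    message_matcher(alarm, triggers)
    # => "Fields[name] == 'cpu_idle' || Fields[name] == 'cpu_wait'"

    dimensions(alarm)
    # => {'service': 'system'}
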
diff --git a/heka/_service.sls b/heka/_service.sls
index 9e649b4..e873f2e 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -1,3 +1,4 @@
+{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
 
 {%- set server = salt['pillar.get']('heka:'+service_name) %}
 
@@ -76,34 +77,42 @@
   'log_collector': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   },
   'metric_collector': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   },
   'remote_collector': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   },
   'aggregator': {
     'decoder': {},
     'input': {},
+    'trigger': {},
+    'alarm': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
-    'output': {}
+    'output': {},
   }
 } %}
 
@@ -116,8 +125,6 @@
 {%- for service_name, service in pillar.iteritems() %}
 {%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
 
-{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
-
 {%- set grains_fragment_file = service_name+'/meta/heka.yml' %}
 {%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
 {%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
@@ -128,15 +135,19 @@
 {%- endif %}
 
 
-{# Loading the other services' support metadata from salt-mine #}
-
 {%- if service_name in ['remote_collector', 'aggregator'] %}
 
+{# Load the support metadata from heka/meta/heka.yml #}
+
+{%- set grains_fragment_file = 'heka/meta/heka.yml' %}
+{%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
+
+{# Load the other services' support metadata from salt-mine #}
+
 {%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').iteritems() %}
 {%- if node_grains.heka is defined %}
-
-{%- do service_grains.update(node_grains.heka) %}
-
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=node_grains.heka) %}
 {%- endif %}
 {%- endfor %}
 
@@ -230,6 +241,44 @@
 
 {%- endfor %}
 
+{%- for alarm_name, alarm in service_metadata.get('alarm', {}).iteritems() %}
+
+/etc/{{ service_name }}/filter_afd_{{ alarm_name }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/filter/afd_alarm.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      alarm_name: {{ alarm_name }}
+      alarm: {{ alarm|yaml }}
+      trigger: {{ service_metadata.get('trigger', {})|yaml }}
+
+/usr/share/lma_collector/common/lma_{{ alarm_name|replace('-', '_') }}.lua:
+  file.managed:
+  - source: salt://heka/files/lma_alarm.lua
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      alarm_name: {{ alarm_name }}
+      alarm: {{ alarm|yaml }}
+      trigger: {{ service_metadata.get('trigger', {})|yaml }}
+
+{%- endfor %}
+
 {%- for filter_name, filter in service_metadata.get('filter', {}).iteritems() %}
 
 /etc/{{ service_name }}/filter_{{ filter_name }}.toml:
diff --git a/heka/files/lma_alarm.lua b/heka/files/lma_alarm.lua
new file mode 100644
index 0000000..fb5c38e
--- /dev/null
+++ b/heka/files/lma_alarm.lua
@@ -0,0 +1,47 @@
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local alarms = {
+{%- for _trigger_name in alarm.triggers %}
+{%- set _trigger = trigger.get(_trigger_name) %}
+{%- if _trigger and _trigger.get('enabled', True) %}
+  {
+    ['name'] = '{{ _trigger_name }}',
+    ['description'] = '{{ _trigger.get("description", "").replace("'", "\\'") }}',
+    ['severity'] = '{{ _trigger.severity }}',
+    {%- if _trigger.no_data_policy is defined %}
+    ['no_data_policy'] = '{{ _trigger.no_data_policy }}',
+    {%- endif %}
+    ['trigger'] = {
+      ['logical_operator'] = '{{ _trigger.get("logical_operator", "or") }}',
+      ['rules'] = {
+        {%- for _rule in _trigger.get('rules', []) %}
+        {
+          ['metric'] = '{{ _rule.metric }}',
+          ['fields'] = {
+            {%- for _field_name, _field_value in _rule.get('field', {}).iteritems() %}
+            ['{{ _field_name }}'] = '{{ _field_value }}',
+            {%- endfor %}
+          },
+          ['relational_operator'] = '{{ _rule.relational_operator }}',
+          ['threshold'] = '{{ _rule.threshold }}',
+          ['window'] = '{{ _rule.window }}',
+          ['periods'] = '{{ _rule.get('periods', 0) }}',
+          ['function'] = '{{ _rule.function }}',
+          {%- if _rule.group_by is defined %}
+          ['group_by'] = {
+            {%- for _group_by in _rule.group_by %}
+            '{{ _group_by }}',
+            {%- endfor %}
+          },
+          {%- endif %}
+        },
+        {%- endfor %}
+      },
+    },
+  },
+{%- endif %}
+{%- endfor %}
+}
+
+return alarms
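
Rendered for the hypothetical 'cpu-critical' trigger used above, this template would produce a Lua module roughly like the following (a sketch of the expected output, trimmed to a single rule; whitespace will differ):

    local M = {}
    setfenv(1, M)

    local alarms = {
      {
        ['name'] = 'cpu-critical',
        ['description'] = '',
        ['severity'] = 'critical',
        ['trigger'] = {
          ['logical_operator'] = 'or',
          ['rules'] = {
            {
              ['metric'] = 'cpu_idle',
              ['fields'] = {},
              ['relational_operator'] = '<',
              ['threshold'] = '5',
              ['window'] = '120',
              ['periods'] = '0',
              ['function'] = 'avg',
            },
          },
        },
      },
    }

    return alarms
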
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
index 9e864df..2d47040 100644
--- a/heka/files/lua/common/afd.lua
+++ b/heka/files/lua/common/afd.lua
@@ -129,8 +129,7 @@
 end
 
 -- inject an AFD event into the Heka pipeline
-function inject_afd_metric(msg_type, msg_tag_name, msg_tag_value, metric_name,
-                           value, hostname, interval, source, to_alerting)
+function inject_afd_metric(value, hostname, afd_name, dimensions, to_alerting)
     local payload
 
     if #alarms > 0 then
@@ -150,32 +149,27 @@
     end
 
     local msg = {
-        Type = msg_type,
+        Type = 'afd_metric',
         Payload = payload,
         Fields = {
-            name=metric_name,
-            value=value,
-            hostname=hostname,
-            interval=interval,
-            source=source,
-            tag_fields={msg_tag_name, 'source', 'hostname'},
+            name = 'status',
+            value = value,
+            hostname = hostname,
+            member = afd_name,
             no_alerting = no_alerting,
+            tag_fields = {'hostname', 'member'}
         }
     }
-    msg.Fields[msg_tag_name] = msg_tag_value,
+
+    for name, value in pairs(dimensions) do
+        table.insert(msg.Fields.tag_fields, name)
+        msg.Fields[name] = value
+    end
+
     lma.inject_tags(msg)
     lma.safe_inject_message(msg)
 end
 
--- inject an AFD service event into the Heka pipeline
-function inject_afd_service_metric(service, value, hostname, interval, source)
-    inject_afd_metric('afd_service_metric',
-                      'service',
-                      service,
-                      'service_status',
-                      value, hostname, interval, source)
-end
-
 MATCH = 1
 NO_MATCH = 2
 NO_DATA = 3
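
With the new signature, callers pass the status value, the host name, the AFD name and a table of dimensions instead of the old type/tag/source arguments. A hypothetical call from a filter (names are illustrative):

    -- 'state' is the status value computed by the alarm evaluation; the extra
    -- dimensions become message fields and are appended to tag_fields.
    afd.inject_afd_metric(state, hostname, 'cpu', {service = 'system'}, true)
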
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
index bf10322..ff622ed 100644
--- a/heka/files/lua/filters/afd.lua
+++ b/heka/files/lua/filters/afd.lua
@@ -12,43 +12,28 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 
+local table = require 'table'
 local string = require 'string'
+local cjson = require 'cjson'
 
 local utils = require 'lma_utils'
 local afd = require 'afd'
 
--- node or service
-local afd_type = read_config('afd_type') or error('afd_type must be specified!')
-local activate_alerting = read_config('activate_alerting') or true
-local msg_type
-local msg_field_name
-local afd_entity
-
-if afd_type == 'node' then
-    msg_type = 'afd_node_metric'
-    msg_field_name = 'node_status'
-    afd_entity = 'node_role'
-elseif afd_type == 'service' then
-    msg_type = 'afd_service_metric'
-    msg_field_name = 'service_status'
-    afd_entity = 'service'
-else
-    error('invalid afd_type value')
-end
-
--- ie: controller for node AFD / rabbitmq for service AFD
-local afd_entity_value = read_config('afd_cluster_name') or error('afd_cluster_name must be specified!')
-
--- ie: cpu for node AFD / queue for service AFD
-local msg_field_source = read_config('afd_logical_name') or error('afd_logical_name must be specified!')
-
-local hostname = read_config('hostname') or error('hostname must be specified')
-
 local afd_file = read_config('afd_file') or error('afd_file must be specified')
+local afd_name = read_config('afd_name') or error('afd_name must be specified')
+local hostname = read_config('hostname') or error('hostname must be specified')
+local dimensions_json = read_config('dimensions') or ''
+local activate_alerting = read_config('activate_alerting') or true
+
 local all_alarms = require(afd_file)
 local A = require 'afd_alarms'
 A.load_alarms(all_alarms)
 
+local ok, dimensions = pcall(cjson.decode, dimensions_json)
+if not ok then
+    error(string.format('dimensions JSON is invalid (%s)', dimensions_json))
+end
+
 function process_message()
 
     local metric_name = read_message('Fields[name]')
@@ -90,8 +75,8 @@
                     alarm.alert.message)
             end
 
-            afd.inject_afd_metric(msg_type, afd_entity, afd_entity_value, msg_field_name,
-                state, hostname, interval, msg_field_source, activate_alerting)
+            afd.inject_afd_metric(state, hostname, afd_name, dimensions,
+                activate_alerting)
         end
     else
         A.set_start_time(ns)
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
new file mode 100644
index 0000000..ee4f36b
--- /dev/null
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -0,0 +1,14 @@
+[afd_{{ alarm_name }}_filter]
+type = "SandboxFilter"
+filename = "/usr/share/lma_collector/filters/afd.lua"
+preserve_data = {{ alarm.preserve_data|default(False)|lower }}
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.message_matcher'](alarm, trigger) }})"
+module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
+ticker_interval = 10
+
+[afd_{{ alarm_name }}_filter.config]
+afd_file = "lma_{{ alarm_name|replace('-', '_') }}"
+afd_name = "{{ alarm_name }}"
+hostname = "{{ grains.host }}"
+dimensions = '{{ salt['heka_alarming.dimensions'](alarm)|json }}'
+activate_alerting = {{ alarm.alerting|default(True)|lower }}
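
For the hypothetical 'cpu' alarm used above, the rendered filter configuration would look roughly like this (host name and dimensions are illustrative, and the clause order inside the matcher may vary):

    [afd_cpu_filter]
    type = "SandboxFilter"
    filename = "/usr/share/lma_collector/filters/afd.lua"
    preserve_data = false
    message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && (Fields[name] == 'cpu_idle' || Fields[name] == 'cpu_wait')"
    module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
    ticker_interval = 10

    [afd_cpu_filter.config]
    afd_file = "lma_cpu"
    afd_name = "cpu"
    hostname = "node-1"
    dimensions = '{"service": "system"}'
    activate_alerting = true
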
diff --git a/heka/files/toml/output/aggregator.toml b/heka/files/toml/output/aggregator.toml
new file mode 100644
index 0000000..1eedea7
--- /dev/null
+++ b/heka/files/toml/output/aggregator.toml
@@ -0,0 +1,7 @@
+{%- extends "heka/files/toml/output/tcp.toml" %}
+{%- block address -%}
+address = "{{ output.host }}:5565"
+{%- endblock %}
+{%- block message_matcher -%}
+message_matcher = "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
+{%- endblock %}
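
Assuming the output is named 'aggregator' and output.host resolves to 10.0.0.10 in the pillar, the child template renders the parent with the overridden blocks, along these lines (the buffering section inherited from tcp.toml is omitted):

    [aggregator_output]
    type="TcpOutput"
    address = "10.0.0.10:5565"
    encoder = "ProtobufEncoder"
    message_matcher = "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
    use_buffering = true
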
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
index dee7fa0..c4115df 100644
--- a/heka/files/toml/output/tcp.toml
+++ b/heka/files/toml/output/tcp.toml
@@ -1,8 +1,12 @@
 [{{ output_name }}_output]
 type="TcpOutput"
+{% block address %}
 address = "{{ output.host }}:{{ output.port }}"
+{% endblock %}
 encoder = "ProtobufEncoder"
+{% block message_matcher %}
 message_matcher = "{{ output.message_matcher }}"
+{% endblock %}
 use_buffering = true
 
 [{{ output_name }}_output.buffering]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 7fee988..a6e8568 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -108,9 +108,26 @@
       port: 4354
       ticker_interval: 30 
 aggregator:
-  decoder: {}
-  input: {}
-  filter: {}
-  output: {}
-  splitter: {}
-  encoder: {}
+  input:
+    heka_metric:
+      engine: tcp
+      address: 0.0.0.0
+      port: 5565
+      decoder: ProtobufDecoder
+      splitter: HekaFramingSplitter
+  filter:
+    influxdb_accumulator:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "Type == 'heka.sandbox.gse_metric'"
+      ticker_interval: 1
+      config:
+        tag_fields: "deployment_id environment_label tenant_id user_id"
+        time_precision: "{{ server.influxdb_time_precision }}"
+  encoder:
+    influxdb:
+      engine: payload
+      append_newlines: false
+      prefix_ts: false
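
For reference, the alarm and trigger metadata consumed by the new loop in heka/_service.sls could be declared in a service's meta/heka.yml along these lines (a hypothetical example matching the schema expected by heka_alarming.py and lma_alarm.lua, not part of this change):

    metric_collector:
      trigger:
        cpu-critical:
          description: 'CPU usage is too high'
          severity: critical
          logical_operator: or
          rules:
          - metric: cpu_idle
            relational_operator: '<'
            threshold: 5
            window: 120
            periods: 0
            function: avg
      alarm:
        cpu:
          triggers:
          - cpu-critical
          dimension:
            service: system
          alerting: true
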