Add watchdog for Heka services

This patch adds a new 'heka.sandbox.watchdog' type of Heka message that
is used by Sensu to check the availability of a service. In the current
implementation the message is issued by a filter for all Heka services
and the check is sent locally to the Sensu client.

Change-Id: I6b3c9d808dcec7d8c15c442390dfbda2032f38ce
diff --git a/heka/files/lua/encoders/status_sensu.lua b/heka/files/lua/encoders/status_sensu.lua
index 2901dcf..62979ea 100644
--- a/heka/files/lua/encoders/status_sensu.lua
+++ b/heka/files/lua/encoders/status_sensu.lua
@@ -23,6 +23,8 @@
     source_dimension_field = string.format('Fields[%s]', read_config('sensu_source_dimension_key'))
 end
 
+local sensu_ttl = tonumber(read_config('sensu_ttl')) or 0  -- nil-safe: an unset sensu_ttl yields 0, so no 'ttl' key is emitted
+
 -- mapping GSE statuses to Sensu states
 local sensu_state_map = {
     [consts.OKAY]=0,
@@ -34,36 +36,46 @@
 
 function process_message()
 
-    local data = {
-	source = nil,
-	name = nil,
-	status = nil,
-	output = nil,
-    }
-
-    local service_name = read_message('Fields[member]')
-    local status = afd.get_status()
-    local alarms = afd.alarms_for_human(afd.extract_alarms())
-    local msgtype = read_message("Type")
-
-    if not service_name or not sensu_state_map[status] or not alarms or not msgtype then
-	return -1
-    end
-
+    local data = {}
     local source
-    if msgtype == "heka.sandbox.gse_metric" then
-        if source_dimension_field then
-            source = read_message(source_dimension_field) or "Unknown Source" 
-        else
-            source = "Unknown source"
-        end
-    elseif msgtype == "heka.sandbox.afd_metric" then
+    local service_name
+    local status = 0
+    local alarms = {}
+    local msgtype = read_message('Type')
+
+    if msgtype == "heka.sandbox.watchdog" then
+        service_name = "watchdog_" .. (read_message('Payload') or 'unknown')
         source = read_message('Fields[hostname]') or read_message('Hostname')
     else
-        -- Should not happen since we track only AFD and GSE plugins.
-        return -1    
+        service_name = read_message('Fields[member]')
+        if not service_name then
+            return -1, "Service name is missing in Fields[member]"
+        end
+
+        status = afd.get_status()
+        if not sensu_state_map[status] then
+            return -1, "Status <" .. tostring(status) .. "> is not mapping any Sensu state"  -- status may be nil here
+        end
+
+        alarms = afd.alarms_for_human(afd.extract_alarms())
+
+        if msgtype == "heka.sandbox.gse_metric" then
+            if source_dimension_field then
+                source = read_message(source_dimension_field) or "Unknown source " .. source_dimension_field
+            else
+                source = "Unknown source"
+            end
+        elseif msgtype == "heka.sandbox.afd_metric" then
+            source = read_message('Fields[hostname]') or read_message('Hostname')
+        else
+            -- Should not happen since we track only watchdog, AFD and GSE plugins.
+            return -1, "message type <" .. tostring(msgtype) .. "> is not tracked"
+        end
     end
 
+    if sensu_ttl > 0 then
+        data['ttl'] = sensu_ttl
+    end
     data['source'] = source
     data['name'] = service_name
     data['status'] = sensu_state_map[status]
@@ -80,12 +92,12 @@
       end
     end
 
-    data['output'] = details 
+    data['output'] = details
 
     local payload = lma.safe_json_encode(data)
 
     if not payload then
-       return -1
+       return -1, "Payload failed to be encoded in JSON"
     end
 
     return lma.safe_inject_payload('json', 'sensu', payload)
diff --git a/heka/files/lua/filters/watchdog.lua b/heka/files/lua/filters/watchdog.lua
index 0399d5c..dbcade8 100644
--- a/heka/files/lua/filters/watchdog.lua
+++ b/heka/files/lua/filters/watchdog.lua
@@ -1,4 +1,4 @@
--- Copyright 2015 Mirantis, Inc.
+-- Copyright 2017 Mirantis, Inc.
 --
 -- Licensed under the Apache License, Version 2.0 (the "License");
 -- you may not use this file except in compliance with the License.
@@ -11,14 +11,15 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-require 'math'
 
-local payload_name = read_config('payload_name') or error('payload_name is required')
-local payload = read_config('payload')
+local service_name = read_config('service_name') or error('service_name is required')
+local msg = {
+    Type = 'watchdog',
+    Payload = service_name,
+}
 
--- Very simple filter that emits a fixed message or the current timestamp (in
--- second) every ticker interval. It can be used to check the liveness of the
--- Heka service.
+-- Filter that emits a message at every ticker interval. It is used to check
+-- the liveness of the Heka services.
 function timer_event(ns)
-   inject_payload("txt", payload_name, payload or math.floor(ns / 1e9))
+    inject_message(msg)
 end
diff --git a/heka/map.jinja b/heka/map.jinja
index 86313f7..2983362 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -39,6 +39,7 @@
 {% set default_nagios_host_alarm_clusters = '00-clusters' %}
 {% set default_automatic_starting = True %}
 {% set default_amqp_port = 5672 %}
+{% set default_sensu_port = 3030 %}
 
 {% set log_collector = salt['grains.filter_by']({
   'default': {
@@ -47,6 +48,7 @@
     'automatic_starting': default_automatic_starting,
     'metric_collector_host': '127.0.0.1',
     'metric_collector_port': 5567,
+    'sensu_port': default_sensu_port,
   }
 }, merge=salt['pillar.get']('heka:log_collector')) %}
 
@@ -59,6 +61,7 @@
     'nagios_port': default_nagios_port,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_port': default_sensu_port,
   }
 }, merge=salt['pillar.get']('heka:metric_collector')) %}
 
@@ -73,6 +76,7 @@
     'aggregator_port': default_aggregator_port,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_port': default_sensu_port,
   }
 }, merge=salt['pillar.get']('heka:remote_collector')) %}
 
@@ -82,10 +86,10 @@
     'influxdb_time_precision': default_influxdb_time_precision,
     'influxdb_timeout': default_influxdb_timeout,
     'nagios_port': default_nagios_port,
-    'sensu_port': 3030, 
     'nagios_default_host_alarm_clusters': default_nagios_host_alarm_clusters,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_port': default_sensu_port,
   }
 }, merge=salt['pillar.get']('heka:aggregator')) %}
 
@@ -101,5 +105,6 @@
     'resource_decoding': False,
     'poolsize': 100,
     'automatic_starting': default_automatic_starting,
+    'sensu_port': default_sensu_port,
   }
 }, merge=salt['pillar.get']('heka:ceilometer_collector')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 9e00f60..0e3f9ba 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -1,7 +1,7 @@
 {%- from "heka/map.jinja" import log_collector with context %}
 {%- from "heka/map.jinja" import metric_collector with context %}
 {%- from "heka/map.jinja" import remote_collector with context %}
-{%- from "heka/map.jinja" import aggregator  with context %}
+{%- from "heka/map.jinja" import aggregator with context %}
 {%- from "heka/map.jinja" import ceilometer_collector with context %}
 
 log_collector:
@@ -49,17 +49,47 @@
         hostname: '{{ grains.host }}'
         logger: hdd_errors_filter
         source: log_collector
-{%- if log_collector.elasticsearch_host is defined %}
+{%- if log_collector.sensu_host is defined %}
+    watchdog:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/watchdog.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      message_matcher: "FALSE"
+      preserve_data: false
+      ticker_interval: 60
+      config:
+        service_name: "log_collector"
+{%- endif %}
+{%- if log_collector.elasticsearch_host is defined or log_collector.sensu_host is defined %}
   encoder:
+{%- if log_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
 {%- endif %}
+{%- if log_collector.sensu_host is defined %}
+    sensu:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        sensu_ttl: 120
+{%- endif %}
+{%- endif %}
   output:
     metric_collector:
       engine: tcp
       host: {{ log_collector.metric_collector_host }}
       port: {{ log_collector.metric_collector_port }}
       message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
+{%- if log_collector.sensu_host is defined %}
+    sensu_watchdog:
+      engine: udp
+      host: "{{ log_collector.sensu_host }}"
+      port: "{{ log_collector.sensu_port }}"
+      message_matcher: "Type == 'heka.sandbox.watchdog'"
+      encoder: sensu_encoder
+      use_buffering: false
+{%- endif %}
 {%- if log_collector.elasticsearch_host is defined %}
     elasticsearch:
       engine: elasticsearch
@@ -95,8 +125,20 @@
       port: 5567
       decoder: metric_decoder
       splitter: HekaFramingSplitter
-{%- if metric_collector.influxdb_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.sensu_host is defined %}
   filter:
+{%- if metric_collector.sensu_host is defined %}
+    watchdog:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/watchdog.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      message_matcher: "FALSE"
+      preserve_data: false
+      ticker_interval: 60
+      config:
+        service_name: "metric_collector"
+{%- endif %}
+{%- if metric_collector.influxdb_host is defined %}
     influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -108,8 +150,17 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ metric_collector.influxdb_time_precision }}"
 {%- endif %}
-{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined %}
+{%- endif %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined or metric_collector.sensu_host is defined %}
   encoder:
+  {%- if metric_collector.sensu_host is defined %}
+    sensu:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        sensu_ttl: 120
+  {%- endif %}
   {%- if metric_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
@@ -125,8 +176,17 @@
         host_suffix_dimension_key: environment_label
    {%- endif %}
 {%- endif %}
-{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined or metric_collector.sensu_host is defined %}
   output:
+  {%- if metric_collector.sensu_host is defined %}
+    sensu_watchdog:
+      engine: udp
+      host: "{{ metric_collector.sensu_host }}"
+      port: "{{ metric_collector.sensu_port }}"
+      message_matcher: "Type == 'heka.sandbox.watchdog'"
+      encoder: sensu_encoder
+      use_buffering: false
+  {%- endif %}
   {%- if metric_collector.influxdb_host is defined %}
     influxdb:
       engine: http
@@ -204,8 +264,19 @@
       decoder: notification_decoder
 {%- endfor %}
 {%- endif %}
-{%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined or remote_collector.sensu_host is defined %}
   filter:
+  {%- if remote_collector.sensu_host is defined %}
+    watchdog:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/watchdog.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      message_matcher: "FALSE"
+      preserve_data: false
+      ticker_interval: 60
+      config:
+        service_name: "remote_collector"
+  {%- endif %}
   {%- if remote_collector.influxdb_host is defined %}
     influxdb_accumulator:
       engine: sandbox
@@ -238,8 +309,16 @@
         source: remote_collector
   {%- endif %}
 {%- endif %}
-{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
   encoder:
+  {%- if remote_collector.sensu_host is defined %}
+    sensu:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        sensu_ttl: 120
+  {%- endif %}
   {%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
@@ -251,8 +330,17 @@
       engine: elasticsearch
   {%- endif %}
 {%- endif %}
-{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
   output:
+  {%- if remote_collector.sensu_host is defined %}
+    sensu_watchdog:
+      engine: udp
+      host: "{{ remote_collector.sensu_host }}"
+      port: "{{ remote_collector.sensu_port }}"
+      message_matcher: "Type == 'heka.sandbox.watchdog'"
+      encoder: sensu_encoder
+      use_buffering: false
+  {%- endif %}
   {%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: http
@@ -437,8 +525,20 @@
       port: 5565
       decoder: ProtobufDecoder
       splitter: HekaFramingSplitter
-{%- if aggregator.influxdb_host is defined %}
+{%- if aggregator.sensu_host is defined or aggregator.influxdb_host is defined %}
   filter:
+{%- if aggregator.sensu_host is defined %}
+    watchdog:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/watchdog.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      message_matcher: "FALSE"
+      preserve_data: false
+      ticker_interval: 60
+      config:
+        service_name: "aggregator"
+{%- endif %}
+{%- if aggregator.influxdb_host is defined %}
     influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -456,6 +556,7 @@
       preserve_data: false
       message_matcher: "Type == 'heka.sandbox.gse_metric'"
 {%- endif %}
+{%- endif %}
 {%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined or aggregator.sensu_host is defined %}
   encoder:
   {%- if aggregator.influxdb_host is defined %}
@@ -482,6 +583,7 @@
       module_file: /usr/share/lma_collector/encoders/status_sensu.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
+        sensu_ttl: 120
         {%- if aggregator.sensu_source_dimension_key is defined %}
         sensu_source_dimension_key: "{{ aggregator.sensu_source_dimension_key }}"
         {%- endif %}
@@ -519,10 +621,10 @@
     sensu_alarm_cluster:
       engine: udp
       host: "{{ aggregator.sensu_host }}"
-      port: "{{aggregator.sensu_port }}"
-      message_matcher: "(Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[no_alerting] == NIL"
+      port: "{{ aggregator.sensu_port|default(3030) }}"
+      message_matcher: "Type == 'heka.sandbox.watchdog' || ((Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[no_alerting] == NIL)"
       encoder: sensu_encoder
-      use_buffering: false 
+      use_buffering: false
   {%- endif %}
 {%- endif %}
 ceilometer_collector:
@@ -555,8 +657,9 @@
       exchange_auto_delete: false
       queue_auto_delete: false
 {%- endif %}
-{%- if ceilometer_collector.influxdb_host is defined %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.sensu_host is defined %}
   filter:
+  {%- if ceilometer_collector.influxdb_host is defined %}
     ceilometer_influxdb_accumulator:
       engine: sandbox
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -569,8 +672,20 @@
         payload_name: 'sample_data'
         flush_count: 500
         bulk_metric_type_matcher: 'ceilometer_samples'
+  {%- endif %}
+  {%- if ceilometer_collector.sensu_host is defined %}
+    watchdog:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/watchdog.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      message_matcher: "FALSE"
+      preserve_data: false
+      ticker_interval: 60
+      config:
+        service_name: "ceilometer_collector"
+  {%- endif %}
 {%- endif %}
-{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
   encoder:
   {%- if ceilometer_collector.influxdb_host is defined %}
     influxdb:
@@ -588,8 +703,16 @@
         type_name: "source"
         encoder: "elasticsearch_resources"
   {%- endif %}
+  {%- if ceilometer_collector.sensu_host is defined %}
+    sensu:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        sensu_ttl: 120
+  {%- endif %}
 {%- endif %}
-{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
   output:
   {%- if ceilometer_collector.influxdb_host is defined %}
     samples_influxdb:
@@ -611,4 +734,13 @@
       message_matcher: "Type == 'ceilometer_resources'"
       encoder: elasticsearch_resource_encoder
   {%- endif %}
+  {%- if ceilometer_collector.sensu_host is defined %}
+    sensu_watchdog:
+      engine: udp
+      host: "{{ ceilometer_collector.sensu_host }}"
+      port: "{{ ceilometer_collector.sensu_port }}"
+      message_matcher: "Type == 'heka.sandbox.watchdog'"
+      encoder: sensu_encoder
+      use_buffering: false
+  {%- endif %}
 {%- endif %}
diff --git a/metadata/service/aggregator/output/sensu.yml b/metadata/service/aggregator/output/sensu.yml
index bd5e55a..394acb6 100644
--- a/metadata/service/aggregator/output/sensu.yml
+++ b/metadata/service/aggregator/output/sensu.yml
@@ -2,4 +2,4 @@
   heka:
     aggregator:
       sensu_source_dimension_key: nagios_host
-      sensu_host: 127.0.0.1
\ No newline at end of file
+      sensu_host: 127.0.0.1
diff --git a/metadata/service/ceilometer_collector/output/sensu.yml b/metadata/service/ceilometer_collector/output/sensu.yml
new file mode 100644
index 0000000..488b481
--- /dev/null
+++ b/metadata/service/ceilometer_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+  heka:
+    ceilometer_collector:
+      sensu_host: 127.0.0.1
diff --git a/metadata/service/log_collector/output/sensu.yml b/metadata/service/log_collector/output/sensu.yml
new file mode 100644
index 0000000..e2e5379
--- /dev/null
+++ b/metadata/service/log_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+  heka:
+    log_collector:
+      sensu_host: 127.0.0.1
diff --git a/metadata/service/metric_collector/output/sensu.yml b/metadata/service/metric_collector/output/sensu.yml
new file mode 100644
index 0000000..7e86605
--- /dev/null
+++ b/metadata/service/metric_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+  heka:
+    metric_collector:
+      sensu_host: 127.0.0.1
diff --git a/metadata/service/remote_collector/output/sensu.yml b/metadata/service/remote_collector/output/sensu.yml
new file mode 100644
index 0000000..3906916
--- /dev/null
+++ b/metadata/service/remote_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+  heka:
+    remote_collector:
+      sensu_host: 127.0.0.1