Add watchdog for Heka services
This patch adds a new 'heka.sandbox.watchdog' type of Heka message that
is used by Sensu to check the availability of a service. In the current
implementation the message is issued by a filter for all Heka services
and the check is sent locally to the Sensu client.
Change-Id: I6b3c9d808dcec7d8c15c442390dfbda2032f38ce
diff --git a/heka/files/lua/encoders/status_sensu.lua b/heka/files/lua/encoders/status_sensu.lua
index 2901dcf..62979ea 100644
--- a/heka/files/lua/encoders/status_sensu.lua
+++ b/heka/files/lua/encoders/status_sensu.lua
@@ -23,6 +23,8 @@
source_dimension_field = string.format('Fields[%s]', read_config('sensu_source_dimension_key'))
end
+local sensu_ttl = tonumber((read_config('sensu_ttl'))) or 0 -- 0 (or unset/non-numeric) means no TTL is sent
+
-- mapping GSE statuses to Sensu states
local sensu_state_map = {
[consts.OKAY]=0,
@@ -34,36 +36,46 @@
function process_message()
- local data = {
- source = nil,
- name = nil,
- status = nil,
- output = nil,
- }
-
- local service_name = read_message('Fields[member]')
- local status = afd.get_status()
- local alarms = afd.alarms_for_human(afd.extract_alarms())
- local msgtype = read_message("Type")
-
- if not service_name or not sensu_state_map[status] or not alarms or not msgtype then
- return -1
- end
-
+ local data = {}
local source
- if msgtype == "heka.sandbox.gse_metric" then
- if source_dimension_field then
- source = read_message(source_dimension_field) or "Unknown Source"
- else
- source = "Unknown source"
- end
- elseif msgtype == "heka.sandbox.afd_metric" then
+ local service_name
+ local status = 0
+ local alarms = {}
+ local msgtype = read_message('Type')
+
+ if msgtype == "heka.sandbox.watchdog" then
+ service_name = "watchdog_" .. (read_message('Payload') or 'unknown')
source = read_message('Fields[hostname]') or read_message('Hostname')
else
- -- Should not happen since we track only AFD and GSE plugins.
- return -1
+ service_name = read_message('Fields[member]')
+ if not service_name then
+ return -1, "Service name is missing in Fields[member]"
+ end
+
+ status = afd.get_status()
+ if not sensu_state_map[status] then
+ return -1, "Status <" .. tostring(status) .. "> is not mapping any Sensu state" -- tostring: status may be nil here
+ end
+
+ alarms = afd.alarms_for_human(afd.extract_alarms())
+
+ if msgtype == "heka.sandbox.gse_metric" then
+ if source_dimension_field then
+ source = read_message(source_dimension_field) or "Unknown source " .. source_dimension_field
+ else
+ source = "Unknown source"
+ end
+ elseif msgtype == "heka.sandbox.afd_metric" then
+ source = read_message('Fields[hostname]') or read_message('Hostname')
+ else
+ -- Should not happen since we track only watchdog, AFD and GSE plugins.
+ return -1, "message type <" .. tostring(msgtype) .. "> is not tracked" -- tostring: guard against a missing Type
+ end
end
+ if sensu_ttl > 0 then
+ data['ttl'] = sensu_ttl
+ end
data['source'] = source
data['name'] = service_name
data['status'] = sensu_state_map[status]
@@ -80,12 +92,12 @@
end
end
- data['output'] = details
+ data['output'] = details
local payload = lma.safe_json_encode(data)
if not payload then
- return -1
+ return -1, "Payload failed to be encoded in JSON"
end
return lma.safe_inject_payload('json', 'sensu', payload)
diff --git a/heka/files/lua/filters/watchdog.lua b/heka/files/lua/filters/watchdog.lua
index 0399d5c..dbcade8 100644
--- a/heka/files/lua/filters/watchdog.lua
+++ b/heka/files/lua/filters/watchdog.lua
@@ -1,4 +1,4 @@
--- Copyright 2015 Mirantis, Inc.
+-- Copyright 2017 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
@@ -11,14 +11,15 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-require 'math'
-local payload_name = read_config('payload_name') or error('payload_name is required')
-local payload = read_config('payload')
+local service_name = read_config('service_name') or error('service_name is required')
+local msg = {
+ Type = 'watchdog',
+ Payload = service_name,
+}
--- Very simple filter that emits a fixed message or the current timestamp (in
--- second) every ticker interval. It can be used to check the liveness of the
--- Heka service.
+-- Filter that emits a message at every ticker interval. It is used to check
+-- the liveness of the Heka services.
function timer_event(ns)
- inject_payload("txt", payload_name, payload or math.floor(ns / 1e9))
+ inject_message(msg)
end
diff --git a/heka/map.jinja b/heka/map.jinja
index 86313f7..2983362 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -39,6 +39,7 @@
{% set default_nagios_host_alarm_clusters = '00-clusters' %}
{% set default_automatic_starting = True %}
{% set default_amqp_port = 5672 %}
+{% set default_sensu_port = 3030 %}
{% set log_collector = salt['grains.filter_by']({
'default': {
@@ -47,6 +48,7 @@
'automatic_starting': default_automatic_starting,
'metric_collector_host': '127.0.0.1',
'metric_collector_port': 5567,
+ 'sensu_port': default_sensu_port,
}
}, merge=salt['pillar.get']('heka:log_collector')) %}
@@ -59,6 +61,7 @@
'nagios_port': default_nagios_port,
'poolsize': 100,
'automatic_starting': default_automatic_starting,
+ 'sensu_port': default_sensu_port,
}
}, merge=salt['pillar.get']('heka:metric_collector')) %}
@@ -73,6 +76,7 @@
'aggregator_port': default_aggregator_port,
'poolsize': 100,
'automatic_starting': default_automatic_starting,
+ 'sensu_port': default_sensu_port,
}
}, merge=salt['pillar.get']('heka:remote_collector')) %}
@@ -82,10 +86,10 @@
'influxdb_time_precision': default_influxdb_time_precision,
'influxdb_timeout': default_influxdb_timeout,
'nagios_port': default_nagios_port,
- 'sensu_port': 3030,
'nagios_default_host_alarm_clusters': default_nagios_host_alarm_clusters,
'poolsize': 100,
'automatic_starting': default_automatic_starting,
+ 'sensu_port': default_sensu_port,
}
}, merge=salt['pillar.get']('heka:aggregator')) %}
@@ -101,5 +105,6 @@
'resource_decoding': False,
'poolsize': 100,
'automatic_starting': default_automatic_starting,
+ 'sensu_port': default_sensu_port,
}
}, merge=salt['pillar.get']('heka:ceilometer_collector')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 9e00f60..0e3f9ba 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -1,7 +1,7 @@
{%- from "heka/map.jinja" import log_collector with context %}
{%- from "heka/map.jinja" import metric_collector with context %}
{%- from "heka/map.jinja" import remote_collector with context %}
-{%- from "heka/map.jinja" import aggregator with context %}
+{%- from "heka/map.jinja" import aggregator with context %}
{%- from "heka/map.jinja" import ceilometer_collector with context %}
log_collector:
@@ -49,17 +49,47 @@
hostname: '{{ grains.host }}'
logger: hdd_errors_filter
source: log_collector
-{%- if log_collector.elasticsearch_host is defined %}
+{%- if log_collector.sensu_host is defined %}
+ watchdog:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/watchdog.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ message_matcher: "FALSE"
+ preserve_data: false
+ ticker_interval: 60
+ config:
+ service_name: "log_collector"
+{%- endif %}
+{%- if log_collector.elasticsearch_host is defined or log_collector.sensu_host is defined %}
encoder:
+{%- if log_collector.elasticsearch_host is defined %}
elasticsearch:
engine: elasticsearch
{%- endif %}
+{%- if log_collector.sensu_host is defined %}
+ sensu:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ sensu_ttl: 120
+{%- endif %}
+{%- endif %}
output:
metric_collector:
engine: tcp
host: {{ log_collector.metric_collector_host }}
port: {{ log_collector.metric_collector_port }}
message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
+{%- if log_collector.sensu_host is defined %}
+ sensu_watchdog:
+ engine: udp
+ host: "{{ log_collector.sensu_host }}"
+ port: "{{ log_collector.sensu_port }}"
+ message_matcher: "Type == 'heka.sandbox.watchdog'"
+ encoder: sensu_encoder
+ use_buffering: false
+{%- endif %}
{%- if log_collector.elasticsearch_host is defined %}
elasticsearch:
engine: elasticsearch
@@ -95,8 +125,20 @@
port: 5567
decoder: metric_decoder
splitter: HekaFramingSplitter
-{%- if metric_collector.influxdb_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.sensu_host is defined %}
filter:
+{%- if metric_collector.sensu_host is defined %}
+ watchdog:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/watchdog.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ message_matcher: "FALSE"
+ preserve_data: false
+ ticker_interval: 60
+ config:
+ service_name: "metric_collector"
+{%- endif %}
+{%- if metric_collector.influxdb_host is defined %}
influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -108,8 +150,17 @@
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ metric_collector.influxdb_time_precision }}"
{%- endif %}
-{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined %}
+{%- endif %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined or metric_collector.sensu_host is defined %}
encoder:
+ {%- if metric_collector.sensu_host is defined %}
+ sensu:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ sensu_ttl: 120
+ {%- endif %}
{%- if metric_collector.influxdb_host is defined %}
influxdb:
engine: payload
@@ -125,8 +176,17 @@
host_suffix_dimension_key: environment_label
{%- endif %}
{%- endif %}
-{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined or metric_collector.sensu_host is defined %}
output:
+ {%- if metric_collector.sensu_host is defined %}
+ sensu_watchdog:
+ engine: udp
+ host: "{{ metric_collector.sensu_host }}"
+ port: "{{ metric_collector.sensu_port }}"
+ message_matcher: "Type == 'heka.sandbox.watchdog'"
+ encoder: sensu_encoder
+ use_buffering: false
+ {%- endif %}
{%- if metric_collector.influxdb_host is defined %}
influxdb:
engine: http
@@ -204,8 +264,19 @@
decoder: notification_decoder
{%- endfor %}
{%- endif %}
-{%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined or remote_collector.sensu_host is defined %}
filter:
+ {%- if remote_collector.sensu_host is defined %}
+ watchdog:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/watchdog.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ message_matcher: "FALSE"
+ preserve_data: false
+ ticker_interval: 60
+ config:
+ service_name: "remote_collector"
+ {%- endif %}
{%- if remote_collector.influxdb_host is defined %}
influxdb_accumulator:
engine: sandbox
@@ -238,8 +309,16 @@
source: remote_collector
{%- endif %}
{%- endif %}
-{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
encoder:
+ {%- if remote_collector.sensu_host is defined %}
+ sensu:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ sensu_ttl: 120
+ {%- endif %}
{%- if remote_collector.influxdb_host is defined %}
influxdb:
engine: payload
@@ -251,8 +330,17 @@
engine: elasticsearch
{%- endif %}
{%- endif %}
-{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
output:
+ {%- if remote_collector.sensu_host is defined %}
+ sensu_watchdog:
+ engine: udp
+ host: "{{ remote_collector.sensu_host }}"
+ port: "{{ remote_collector.sensu_port }}"
+ message_matcher: "Type == 'heka.sandbox.watchdog'"
+ encoder: sensu_encoder
+ use_buffering: false
+ {%- endif %}
{%- if remote_collector.influxdb_host is defined %}
influxdb:
engine: http
@@ -437,8 +525,20 @@
port: 5565
decoder: ProtobufDecoder
splitter: HekaFramingSplitter
-{%- if aggregator.influxdb_host is defined %}
+{%- if aggregator.sensu_host is defined or aggregator.influxdb_host is defined %}
filter:
+{%- if aggregator.sensu_host is defined %}
+ watchdog:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/watchdog.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ message_matcher: "FALSE"
+ preserve_data: false
+ ticker_interval: 60
+ config:
+ service_name: "aggregator"
+{%- endif %}
+{%- if aggregator.influxdb_host is defined %}
influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -456,6 +556,7 @@
preserve_data: false
message_matcher: "Type == 'heka.sandbox.gse_metric'"
{%- endif %}
+{%- endif %}
{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined or aggregator.sensu_host is defined %}
encoder:
{%- if aggregator.influxdb_host is defined %}
@@ -482,6 +583,7 @@
module_file: /usr/share/lma_collector/encoders/status_sensu.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
+ sensu_ttl: 120
{%- if aggregator.sensu_source_dimension_key is defined %}
sensu_source_dimension_key: "{{ aggregator.sensu_source_dimension_key }}"
{%- endif %}
@@ -519,10 +621,10 @@
sensu_alarm_cluster:
engine: udp
host: "{{ aggregator.sensu_host }}"
- port: "{{aggregator.sensu_port }}"
- message_matcher: "(Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[no_alerting] == NIL"
+ port: "{{ aggregator.sensu_port|default(3030) }}"
+ message_matcher: "Type == 'heka.sandbox.watchdog' || ((Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[no_alerting] == NIL)"
encoder: sensu_encoder
- use_buffering: false
+ use_buffering: false
{%- endif %}
{%- endif %}
ceilometer_collector:
@@ -555,8 +657,9 @@
exchange_auto_delete: false
queue_auto_delete: false
{%- endif %}
-{%- if ceilometer_collector.influxdb_host is defined %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.sensu_host is defined %}
filter:
+ {%- if ceilometer_collector.influxdb_host is defined %}
ceilometer_influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -569,8 +672,20 @@
payload_name: 'sample_data'
flush_count: 500
bulk_metric_type_matcher: 'ceilometer_samples'
+ {%- endif %}
+ {%- if ceilometer_collector.sensu_host is defined %}
+ watchdog:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/watchdog.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ message_matcher: "FALSE"
+ preserve_data: false
+ ticker_interval: 60
+ config:
+ service_name: "ceilometer_collector"
+ {%- endif %}
{%- endif %}
-{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
encoder:
{%- if ceilometer_collector.influxdb_host is defined %}
influxdb:
@@ -588,8 +703,16 @@
type_name: "source"
encoder: "elasticsearch_resources"
{%- endif %}
+ {%- if ceilometer_collector.sensu_host is defined %}
+ sensu:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ sensu_ttl: 120
+ {%- endif %}
{%- endif %}
-{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
output:
{%- if ceilometer_collector.influxdb_host is defined %}
samples_influxdb:
@@ -611,4 +734,13 @@
message_matcher: "Type == 'ceilometer_resources'"
encoder: elasticsearch_resource_encoder
{%- endif %}
+ {%- if ceilometer_collector.sensu_host is defined %}
+ sensu_watchdog:
+ engine: udp
+ host: "{{ ceilometer_collector.sensu_host }}"
+ port: "{{ ceilometer_collector.sensu_port }}"
+ message_matcher: "Type == 'heka.sandbox.watchdog'"
+ encoder: sensu_encoder
+ use_buffering: false
+ {%- endif %}
{%- endif %}
diff --git a/metadata/service/aggregator/output/sensu.yml b/metadata/service/aggregator/output/sensu.yml
index bd5e55a..394acb6 100644
--- a/metadata/service/aggregator/output/sensu.yml
+++ b/metadata/service/aggregator/output/sensu.yml
@@ -2,4 +2,4 @@
heka:
aggregator:
sensu_source_dimension_key: nagios_host
- sensu_host: 127.0.0.1
\ No newline at end of file
+ sensu_host: 127.0.0.1
diff --git a/metadata/service/ceilometer_collector/output/sensu.yml b/metadata/service/ceilometer_collector/output/sensu.yml
new file mode 100644
index 0000000..488b481
--- /dev/null
+++ b/metadata/service/ceilometer_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+ heka:
+ ceilometer_collector:
+ sensu_host: 127.0.0.1
diff --git a/metadata/service/log_collector/output/sensu.yml b/metadata/service/log_collector/output/sensu.yml
new file mode 100644
index 0000000..e2e5379
--- /dev/null
+++ b/metadata/service/log_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+ heka:
+ log_collector:
+ sensu_host: 127.0.0.1
diff --git a/metadata/service/metric_collector/output/sensu.yml b/metadata/service/metric_collector/output/sensu.yml
new file mode 100644
index 0000000..7e86605
--- /dev/null
+++ b/metadata/service/metric_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+ heka:
+ metric_collector:
+ sensu_host: 127.0.0.1
diff --git a/metadata/service/remote_collector/output/sensu.yml b/metadata/service/remote_collector/output/sensu.yml
new file mode 100644
index 0000000..3906916
--- /dev/null
+++ b/metadata/service/remote_collector/output/sensu.yml
@@ -0,0 +1,4 @@
+parameters:
+ heka:
+ remote_collector:
+ sensu_host: 127.0.0.1