Merge "Fix decoders and filters for hostname-free metrics"
diff --git a/heka/files/lua/encoders/status_sensu.lua b/heka/files/lua/encoders/status_sensu.lua
new file mode 100644
index 0000000..2901dcf
--- /dev/null
+++ b/heka/files/lua/encoders/status_sensu.lua
@@ -0,0 +1,92 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'table'
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+local lma = require 'lma_utils'
+
+-- Message field carrying the Sensu source of GSE metrics (nil when the
+-- 'sensu_source_dimension_key' option is not configured).
+local source_dimension_key = read_config('sensu_source_dimension_key')
+local source_dimension_field
+if source_dimension_key then
+    source_dimension_field = string.format('Fields[%s]', source_dimension_key)
+end
+
+-- Sensu states indexed by GSE status: OKAY is 0, WARN is 1, the rest is 2.
+local sensu_state_map = {
+    [consts.OKAY]=0, [consts.WARN]=1,
+    [consts.CRIT]=2, [consts.DOWN]=2, [consts.UNKW]=2
+}
+
+-- Encode an AFD or GSE status message as a Sensu check result.
+--
+-- The JSON payload carries the Sensu source, the check name (the 'member'
+-- field), the Sensu state mapped from the status and a human-readable output.
+-- Returns -1 when the message cannot be encoded, otherwise whatever
+-- lma.safe_inject_payload() returns.
+function process_message()
+    local service_name = read_message('Fields[member]')
+    local status = afd.get_status()
+    local alarms = afd.alarms_for_human(afd.extract_alarms())
+    local msgtype = read_message("Type")
+    local state = sensu_state_map[status]
+
+    -- Refuse messages lacking a member, a mappable status, alarms or a type.
+    if not service_name or not state or not alarms or not msgtype then
+        return -1
+    end
+
+    local source
+    if msgtype == "heka.sandbox.gse_metric" then
+        -- GSE metrics take their source from the configured dimension field;
+        -- the same placeholder is used whether the option or the field is
+        -- missing.
+        source = "Unknown source"
+        if source_dimension_field then
+            source = read_message(source_dimension_field) or source
+        end
+    elseif msgtype == "heka.sandbox.afd_metric" then
+        source = read_message('Fields[hostname]') or read_message('Hostname')
+    else
+        -- Should not happen since we track only AFD and GSE plugins.
+        return -1
+    end
+
+    -- The status label is always reported, the alarm details only for states
+    -- other than 0.
+    local details = string.format('%s: ', consts.status_label(status))
+    if state ~= 0 then
+        if #alarms == 0 then
+            details = details .. 'No details\n'
+        else
+            -- A single concat avoids re-copying the string for every alarm.
+            details = details .. table.concat(alarms, '\n') .. '\n'
+        end
+    end
+
+    local payload = lma.safe_json_encode({
+        source = source,
+        name = service_name,
+        status = state,
+        output = details,
+    })
+    if not payload then
+        return -1
+    end
+
+    return lma.safe_inject_payload('json', 'sensu', payload)
+end
diff --git a/heka/files/lua/filters/authentications.lua b/heka/files/lua/filters/authentications.lua
new file mode 100644
index 0000000..a2eba0d
--- /dev/null
+++ b/heka/files/lua/filters/authentications.lua
@@ -0,0 +1,118 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'os'
+local utils = require 'lma_utils'
+
+-- 'hostname' and 'logger' are mandatory: loading fails when one is missing.
+local hostname = read_config('hostname') or error('hostname must be specified')
+local metric_logger = read_config('logger') or error('logger must be specified')
+local metric_source = read_config('source')
+
+-- The filter can receive messages that should be discarded because they are
+-- way too old (Heka cannot guarantee that messages are processed in real-time).
+-- The 'grace_interval' parameter defines which messages are kept and which
+-- are discarded. For instance, a value of '10' means that the filter takes
+-- into account messages that are at most 10 seconds older than the last
+-- timer event. Adding 0 coerces a value given as a string into a number.
+local grace_interval = (read_config('grace_interval') or 0) + 0
+
+-- Skeleton of the metric messages injected by timer_event().
+local msg = {
+    Type = "multivalue_metric", -- will be prefixed by "heka.sandbox."
+    Hostname = hostname,
+    Severity = 6,
+    Logger = metric_logger,
+}
+
+-- Authentications counted so far; this file never resets these.
+local global_counters = {total=0, failed=0, success=0}
+-- Authentications counted since the last timer event.
+local ticker_counters = {total=0, failed=0, success=0}
+
+-- Time of the last timer event, in nanoseconds.
+local last_timer_event = os.time() * 1e9
+
+-- Count one authentication attempt; always returns 0.
+-- Messages older than the last timer event by more than 'grace_interval'
+-- seconds and messages that aren't authentication audits are ignored.
+function process_message ()
+    local msg_sec = utils.convert_to_sec(read_message('Timestamp'))
+    if msg_sec + grace_interval < utils.convert_to_sec(last_timer_event) then
+        -- skip the message if it doesn't fall into the current interval
+        return 0
+    end
+
+    local is_auth = read_message('Type') == 'audit'
+        and read_message('Fields[action]') == 'authenticate'
+    if not is_auth then
+        return 0
+    end
+
+    -- Any outcome other than 'success' counts as a failure.
+    local outcome = read_message('Fields[outcome]') == 'success' and 'success' or 'failed'
+    global_counters.total = global_counters.total + 1
+    ticker_counters.total = ticker_counters.total + 1
+    global_counters[outcome] = global_counters[outcome] + 1
+    ticker_counters[outcome] = ticker_counters[outcome] + 1
+
+    return 0
+end
+
+-- Inject the cumulated counters, the rates and (when the interval saw some
+-- authentications) the percentages, then reset the interval counters.
+function timer_event(ns)
+    msg.Timestamp = ns
+    msg.Fields = {
+        name = 'authentications_total',
+        value_fields = {'all', 'success', 'failed'},
+        source = metric_source,
+        type = utils.metric_type['COUNTER'],
+    }
+    utils.inject_tags(msg)
+
+    -- send the counters
+    msg.Fields.all = global_counters.total
+    msg.Fields.success = global_counters.success
+    msg.Fields.failed = global_counters.failed
+    utils.safe_inject_message(msg)
+
+    -- send the rates (skipped unless time moved forward, to avoid inf/NaN)
+    local delta_sec = (ns - last_timer_event) / 1e9
+    if delta_sec > 0 then
+        msg.Fields.name = 'authentications_rate'
+        msg.Fields.type = utils.metric_type['DERIVE']
+        msg.Fields.all = ticker_counters.total / delta_sec
+        msg.Fields.success = ticker_counters.success / delta_sec
+        msg.Fields.failed = ticker_counters.failed / delta_sec
+        utils.safe_inject_message(msg)
+    end
+
+    -- send the percentages
+    if ticker_counters.total > 0 then
+        msg.Fields.name = 'authentications_percent'
+        msg.Fields.type = utils.metric_type['GAUGE']
+        msg.Fields.value_fields = {'success', 'failed'}
+        msg.Fields.all = nil
+        msg.Fields.success = 100.0 * ticker_counters.success / ticker_counters.total
+        msg.Fields.failed = 100.0 * ticker_counters.failed / ticker_counters.total
+        utils.safe_inject_message(msg)
+    end
+
+    -- start a new interval
+    ticker_counters = {total=0, failed=0, success=0}
+    last_timer_event = ns
+
+    return 0
+end
diff --git a/heka/map.jinja b/heka/map.jinja
index 1677ffc..86313f7 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -82,6 +82,7 @@
'influxdb_time_precision': default_influxdb_time_precision,
'influxdb_timeout': default_influxdb_timeout,
'nagios_port': default_nagios_port,
+ 'sensu_port': 3030,
'nagios_default_host_alarm_clusters': default_nagios_host_alarm_clusters,
'poolsize': 100,
'automatic_starting': default_automatic_starting,
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 3cd50d8..637a870 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -4,7 +4,6 @@
{%- from "heka/map.jinja" import aggregator with context %}
{%- from "heka/map.jinja" import ceilometer_collector with context %}
-
log_collector:
filter:
aggregated_http_metrics:
@@ -226,6 +225,17 @@
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
preserve_data: false
message_matcher: "Type == 'notification' && Fields[event_type] =~ /create.end$/"
+ authentication:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/authentications.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ message_matcher: "Type == 'audit' && Fields[action] == 'authenticate'"
+ ticker_interval: 60
+ config:
+ hostname: '{{ grains.host }}'
+ grace_interval: 30
+ logger: authentication_filter
+ source: remote_collector
{%- endif %}
{%- endif %}
{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
@@ -446,7 +456,7 @@
preserve_data: false
message_matcher: "Type == 'heka.sandbox.gse_metric'"
{%- endif %}
-{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined or aggregator.sensu_host is defined %}
encoder:
{%- if aggregator.influxdb_host is defined %}
influxdb:
@@ -466,8 +476,18 @@
nagios_host_dimension_key: "{{ aggregator.nagios_host_dimension_key }}"
{%- endif %}
{%- endif %}
+ {%- if aggregator.sensu_host is defined %}
+ sensu:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_sensu.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ {%- if aggregator.sensu_source_dimension_key is defined %}
+ sensu_source_dimension_key: "{{ aggregator.sensu_source_dimension_key }}"
+ {%- endif %}
+ {%- endif %}
{%- endif %}
-{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined or aggregator.sensu_host is defined %}
output:
{%- if aggregator.influxdb_host is defined %}
influxdb:
@@ -495,6 +515,15 @@
max_file_size: 524288
full_action: drop
{%- endif %}
+ {%- if aggregator.sensu_host is defined %}
+ sensu_alarm_cluster:
+ engine: udp
+ host: "{{ aggregator.sensu_host }}"
+ port: "{{aggregator.sensu_port }}"
+ message_matcher: "(Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[no_alerting] == NIL"
+ encoder: sensu_encoder
+ use_buffering: false
+ {%- endif %}
{%- endif %}
ceilometer_collector:
{%- if ceilometer_collector.amqp_host is defined %}
diff --git a/metadata/service/aggregator/output/nagios.yml b/metadata/service/aggregator/output/nagios.yml
new file mode 100644
index 0000000..7c30db1
--- /dev/null
+++ b/metadata/service/aggregator/output/nagios.yml
@@ -0,0 +1,5 @@
+parameters:
+ heka:
+ aggregator:
+ nagios_host_dimension_key: nagios_host
+ nagios_host: ${_param:nagios_host}
\ No newline at end of file
diff --git a/metadata/service/aggregator/output/sensu.yml b/metadata/service/aggregator/output/sensu.yml
new file mode 100644
index 0000000..bd5e55a
--- /dev/null
+++ b/metadata/service/aggregator/output/sensu.yml
@@ -0,0 +1,5 @@
+parameters:
+ heka:
+ aggregator:
+ sensu_source_dimension_key: nagios_host
+ sensu_host: 127.0.0.1
\ No newline at end of file
diff --git a/metadata/service/aggregator/single.yml b/metadata/service/aggregator/single.yml
index 159e5bd..bf2364e 100644
--- a/metadata/service/aggregator/single.yml
+++ b/metadata/service/aggregator/single.yml
@@ -5,11 +5,9 @@
parameters:
_param:
aggregator_poolsize: 100
- nagios_host_dimension_key: nagios_host
heka:
aggregator:
automatic_starting: true
enabled: true
influxdb_time_precision: ms
- poolsize: ${_param:aggregator_poolsize}
- nagios_host_dimension_key: ${_param:nagios_host_dimension_key}
+ poolsize: ${_param:aggregator_poolsize}
\ No newline at end of file