Merge pull request #45 from SwannCroiset/alerting-property
Redefine alerting property
diff --git a/heka/_service.sls b/heka/_service.sls
index c1c85ed..8a09ea7 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -78,8 +78,6 @@
'decoder': {},
'input': {},
'trigger': {},
- 'alarm': {},
- 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -90,7 +88,6 @@
'input': {},
'trigger': {},
'alarm': {},
- 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -101,7 +98,6 @@
'input': {},
'trigger': {},
'alarm': {},
- 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -111,7 +107,6 @@
'decoder': {},
'input': {},
'trigger': {},
- 'alarm': {},
'alarm_cluster': {},
'filter': {},
'splitter': {},
diff --git a/heka/files/gse_policies.lua b/heka/files/gse_policies.lua
index 8ec9c06..8de8175 100644
--- a/heka/files/gse_policies.lua
+++ b/heka/files/gse_policies.lua
@@ -29,15 +29,17 @@
logical_operator = '{{ _trigger["logical_operator"] }}',
rules = {
{%- for _rule in _trigger["rules"] %}
- ['function'] = '{{ _rule["function"] }}',
+ {
+ ['function'] = '{{ _rule["function"] }}',
{%- set comma = joiner(",") %}
- ['arguments'] = {
+ ['arguments'] = {
{%- for _argument in _rule["arguments"]|sort -%}
- {{ comma() }}'{{ _argument }}'
+ {{ comma() }}'{{ _argument }}'
{%- endfor -%}
+ },
+ ['relational_operator'] = '{{ _rule["relational_operator"] }}',
+ ['threshold'] = {{ _rule["threshold"] }},
},
- ['relational_operator'] = '{{ _rule["relational_operator"] }}',
- ['threshold'] = {{ _rule["threshold"] }},
{%- endfor %}
},
},
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index f94a0e8..0befcf0 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -216,6 +216,14 @@
else
msg['Fields']['name'] = metric_name
end
+ elseif metric_source == 'ntpd' then
+ if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
+ msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
+ else
+ msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. 'peer'
+ msg['Fields']['server'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'server')
+ end
elseif metric_source == 'check_openstack_api' then
-- For OpenStack API metrics, plugin_instance = <service name>
msg['Fields']['name'] = 'openstack_check_api'
@@ -420,7 +428,31 @@
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
else
- msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+ -- generic metric name translation for 3rd-party sources
+ msg['Fields']['name'] = sample['plugin']
+ if sample['plugin_instance'] ~= "" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
+ end
+ if sample['type'] ~= 'gauge' and sample['type'] ~= 'derive' and
+ sample['type'] ~= 'counter' and sample['type'] ~= 'absolute' then
+ -- only for custom DS types
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+ end
+ if sample['type_instance'] ~= "" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+ end
+ if sample['dsnames'][i] ~= "value" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+ end
+ msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
+
+ -- add meta fields as tag_fields
+ for k, v in pairs(sample['meta'] or {}) do
+ if tostring(k) ~= '0' then
+ msg['Fields'][k] = v
+ table.insert(msg['Fields']['tag_fields'], k)
+ end
+ end
end
if not skip_it then
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
index ad4c540..0507a10 100644
--- a/heka/files/lua/encoders/status_nagios.lua
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -20,7 +20,6 @@
local interp = require "msg_interpolate"
local host = read_config('nagios_host')
-local service_template = read_config('service_template') or error('service_template is required!')
-- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
-- See bug #1517917 for details.
-- With the 'cmd.cgi' re-implementation for the command PROCESS_SERVICE_CHECK_RESULT,
@@ -29,7 +28,6 @@
local data = {
cmd_typ = '30',
cmd_mod = '2',
- host = host,
service = nil,
plugin_state = nil,
plugin_output = nil,
@@ -55,7 +53,7 @@
end
function process_message()
- local service_name = interp.interpolate_from_msg(service_template)
+ local service_name = read_message('Fields[member]')
local status = afd.get_status()
local alarms = afd.alarms_for_human(afd.extract_alarms())
@@ -63,6 +61,11 @@
return -1
end
+ if host then
+ data['host'] = host
+ else
+ data['host'] = read_message('Fields[hostname]') or read_message('Hostname')
+ end
data['service'] = service_name
data['plugin_state'] = nagios_state_map[status]
diff --git a/heka/files/toml/encoder/sandbox.toml b/heka/files/toml/encoder/sandbox.toml
new file mode 100644
index 0000000..67c95a1
--- /dev/null
+++ b/heka/files/toml/encoder/sandbox.toml
@@ -0,0 +1,13 @@
+[{{ encoder_name }}_encoder]
+type = "SandboxEncoder"
+filename = "{{ encoder.module_file }}"
+{%- if encoder.module_dir is defined %}
+module_directory = "{{ encoder.module_dir }}"
+{%- endif %}
+
+{%- if encoder.config is mapping %}
+[{{ encoder_name }}_encoder.config]
+{%- for config_param, config_value in encoder.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% elif config_value in [True, False] %}{{ config_value|lower }}{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/output/http.toml b/heka/files/toml/output/http.toml
index 1ec3672..fce5d97 100644
--- a/heka/files/toml/output/http.toml
+++ b/heka/files/toml/output/http.toml
@@ -7,14 +7,16 @@
username = "{{ output.username }}"
password = "{{ output.password }}"
{%- endif %}
-http_timeout = {{ output.timeout }}
-method = "POST"
+http_timeout = {{ output.http_timeout|default(output.timeout|default(2000)) }}{# honour the legacy 'timeout' key that existing http outputs still set #}
+method = "{{ output.method|default("POST") }}"
+{% if output.get('use_buffering', True) %}
use_buffering = true
[{{ output_name }}_output.buffering]
-max_buffer_size = 1610612736
-max_file_size = 134217728
-full_action = "drop"
+max_buffer_size = {{ output.max_buffer_size|default(268435456) }}
+max_file_size = {{ output.max_file_size|default(67108864) }}
+full_action = "{{ output.full_action|default("drop") }}"
+{% endif %}
[{{ output_name }}_output.headers]
Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/map.jinja b/heka/map.jinja
index c7de276..00291d0 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -30,28 +30,45 @@
{%- endload %}
{%- set server = salt['grains.filter_by'](server_defaults, merge=salt['pillar.get']('heka:server')) %}
-{%- load_yaml as elasticsearch_defaults %}
-default:
- elasticsearch_port: 9200
-{%- endload %}
-{% set log_collector = salt['grains.filter_by'](elasticsearch_defaults, merge=salt['pillar.get']('heka:log_collector')) %}
+{% set default_elasticsearch_port = 9200 %}
+{% set default_influxdb_port = 8086 %}
+{% set default_influxdb_time_precision = 'ms' %}
+{% set default_influxdb_timeout = 5000 %}
+{% set default_aggregator_port = 5565 %}
+{% set default_nagios_port = 8001 %}
+{% set default_nagios_host_alarm_clusters = '00-clusters' %}
-{%- load_yaml as influxdb_defaults %}
-default:
- influxdb_port: 8086
- influxdb_time_precision: ms
- influxdb_timeout: 5000
-{%- endload %}
+{% set log_collector = salt['grains.filter_by']({
+ 'default': {
+ 'elasticsearch_port': default_elasticsearch_port,
+ }
+}, merge=salt['pillar.get']('heka:log_collector')) %}
-{%- load_yaml as metric_collector_defaults %}
-default:
- aggregator_port: 5565
-{%- endload %}
-{% set metric_collector = salt['grains.filter_by'](
- metric_collector_defaults,
- merge=salt['grains.filter_by'](
- influxdb_defaults,
- merge=salt['pillar.get']('heka:metric_collector'))) %}
+{% set metric_collector = salt['grains.filter_by']({
+ 'default': {
+ 'influxdb_port': default_influxdb_port,
+ 'influxdb_time_precision': default_influxdb_time_precision,
+ 'influxdb_timeout': default_influxdb_timeout,
+ 'aggregator_port': default_aggregator_port,
+ 'nagios_port': default_nagios_port,
+ }
+}, merge=salt['pillar.get']('heka:metric_collector')) %}
-{% set remote_collector = salt['grains.filter_by'](influxdb_defaults, merge=salt['pillar.get']('heka:remote_collector')) %}
-{% set aggregator = salt['grains.filter_by'](influxdb_defaults, merge=salt['pillar.get']('heka:aggregator')) %}
+{% set remote_collector = salt['grains.filter_by']({
+ 'default': {
+ 'influxdb_port': default_influxdb_port,
+ 'influxdb_time_precision': default_influxdb_time_precision,
+ 'influxdb_timeout': default_influxdb_timeout,
+ 'aggregator_port': default_aggregator_port,
+ }
+}, merge=salt['pillar.get']('heka:remote_collector')) %}
+
+{% set aggregator = salt['grains.filter_by']({
+ 'default': {
+ 'influxdb_port': default_influxdb_port,
+ 'influxdb_time_precision': default_influxdb_time_precision,
+ 'influxdb_timeout': default_influxdb_timeout,
+ 'nagios_port': default_nagios_port,
+ 'nagios_host_alarm_clusters': default_nagios_host_alarm_clusters,
+ }
+}, merge=salt['pillar.get']('heka:aggregator')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 08b99ea..a217510 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -106,6 +106,12 @@
append_newlines: false
prefix_ts: false
{%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+ nagios:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+{%- endif %}
output:
metric_dashboard:
engine: dashboard
@@ -131,6 +137,20 @@
port: "{{ metric_collector.aggregator_port }}"
message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
{%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+ nagios_alarm:
+ engine: http
+ address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
+ message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric' && Fields[no_alerting] == NIL"
+ encoder: nagios_encoder
+ {%- if metric_collector.nagios_username is defined and metric_collector.nagios_password is defined %}
+ username: {{ metric_collector.get('nagios_username') }}
+ password: {{ metric_collector.get('nagios_password') }}
+ {%- endif %}
+ max_buffer_size: 1048576
+ max_file_size: 524288
+ full_action: drop
+{%- endif %}
remote_collector:
decoder:
collectd:
@@ -153,7 +173,7 @@
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
preserve_data: false
- message_matcher: "Type =~ /metric$/"
+ message_matcher: "Type == 'heka.sandbox.afd_metric'"
ticker_interval: 1
config:
tag_fields: "deployment_id environment_label tenant_id user_id"
@@ -184,6 +204,13 @@
encoder: influxdb_encoder
timeout: {{ remote_collector.influxdb_timeout }}
{%- endif %}
+{%- if remote_collector.aggregator_host is defined %}
+ aggregator:
+ engine: tcp
+ host: "{{ remote_collector.aggregator_host }}"
+ port: "{{ remote_collector.aggregator_port }}"
+ message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
+{%- endif %}
aggregator:
policy:
# A policy defining that the cluster's status depends on the member with
@@ -315,15 +342,23 @@
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ aggregator.influxdb_time_precision }}"
{%- endif %}
-{%- if aggregator.influxdb_host is defined %}
encoder:
+{%- if aggregator.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
{%- endif %}
-{%- if aggregator.influxdb_host is defined %}
+{%- if aggregator.nagios_host is defined %}
+ nagios:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ nagios_host: "{{ aggregator.nagios_host_alarm_clusters }}"
+{%- endif %}
output:
+{%- if aggregator.influxdb_host is defined %}
influxdb:
engine: http
address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -335,3 +370,17 @@
encoder: influxdb_encoder
timeout: {{ aggregator.influxdb_timeout }}
{%- endif %}
+{%- if aggregator.nagios_host is defined %}
+ nagios_alarm_cluster:
+ engine: http
+ address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
+ message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.gse_metric' && Fields[no_alerting] == NIL"
+ encoder: nagios_encoder
+ {%- if aggregator.nagios_username is defined and aggregator.nagios_password is defined %}
+ username: {{ aggregator.get('nagios_username') }}
+ password: {{ aggregator.get('nagios_password') }}
+ {%- endif %}
+ max_buffer_size: 1048576
+ max_file_size: 524288
+ full_action: drop
+{%- endif %}