Merge pull request #109 from elemoine/elasticsearch-drop
Use full action "drop" for logs
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index 8f86efd..efbd091 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -83,6 +83,8 @@
"""
filtered_grains = {}
for service_name, service_data in grains.items():
+ if not service_data:
+ continue
alarm = service_data.get('alarm')
if alarm:
filtered_grains[service_name] = {'alarm': alarm}
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
index 9957f70..bf3b9aa 100644
--- a/heka/files/lua/encoders/status_nagios.lua
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -19,9 +19,17 @@
local lma = require 'lma_utils'
local interp = require "msg_interpolate"
+local host_suffix_dimension_field
+if read_config('host_suffix_dimension_key') then
+ host_suffix_dimension_field = string.format('Fields[%s]', read_config('host_suffix_dimension_key'))
+end
+
-- These 2 configurations are used only to encode GSE messages
local default_host = read_config('default_nagios_host')
-local host_dimension_key = read_config('nagios_host_dimension_key')
+local host_dimension_field
+if read_config('nagios_host_dimension_key') then
+ host_dimension_field = string.format('Fields[%s]', read_config('nagios_host_dimension_key'))
+end
-- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
-- See bug #1517917 for details.
@@ -64,12 +72,21 @@
return -1
end
- if host_dimension_key then
- data['host'] = read_message(string.format('Fields[%s]', host_dimension_key)) or default_host
+ local host
+ if host_dimension_field then
+ host = read_message(host_dimension_field) or default_host
else
- data['host'] = read_message('Fields[hostname]') or read_message('Hostname')
+ host = read_message('Fields[hostname]') or read_message('Hostname')
end
+ if host_suffix_dimension_field then
+ local suffix = read_message(host_suffix_dimension_field)
+ if suffix then
+ host = host .. '.' .. suffix
+ end
+ end
+ data['host'] = host
+
data['service'] = service_name
data['plugin_state'] = nagios_state_map[status]
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index 5fefb2d..9da7248 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -2,7 +2,7 @@
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd.lua"
preserve_data = {{ alarm.preserve_data|default(False)|lower }}
-message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric') && ({{ salt['heka_alarming.alarm_message_matcher'](alarm, trigger) }})"
+message_matcher = "Type =~ /metric$/ && ({{ salt['heka_alarming.alarm_message_matcher'](alarm, trigger) }})"
module_directory = "/usr/share/lma_collector/common;/usr/share/heka/lua_modules"
ticker_interval = 10
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 4a92245..293f0ea 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -77,8 +77,8 @@
port: 5567
decoder: metric_decoder
splitter: HekaFramingSplitter
- filter:
{%- if metric_collector.influxdb_host is defined %}
+ filter:
influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -90,21 +90,26 @@
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ metric_collector.influxdb_time_precision }}"
{%- endif %}
-{%- if metric_collector.influxdb_host is defined %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined %}
encoder:
+ {%- if metric_collector.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
-{%- endif %}
-{%- if metric_collector.nagios_host is defined %}
+ {%- endif %}
+ {%- if metric_collector.nagios_host is defined %}
nagios:
engine: sandbox
module_file: /usr/share/lma_collector/encoders/status_nagios.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ host_suffix_dimension_key: environment_label
+ {%- endif %}
{%- endif %}
+{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined %}
output:
-{%- if metric_collector.influxdb_host is defined %}
+ {%- if metric_collector.influxdb_host is defined %}
influxdb:
engine: http
address: "http://{{ metric_collector.influxdb_host }}:{{ metric_collector.influxdb_port }}/write?db={{ metric_collector.influxdb_database }}&precision={{ metric_collector.influxdb_time_precision }}"
@@ -115,15 +120,15 @@
message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
encoder: influxdb_encoder
timeout: {{ metric_collector.influxdb_timeout }}
-{%- endif %}
-{%- if metric_collector.aggregator_host is defined %}
+ {%- endif %}
+ {%- if metric_collector.aggregator_host is defined %}
aggregator:
engine: tcp
host: "{{ metric_collector.aggregator_host }}"
port: "{{ metric_collector.aggregator_port }}"
message_matcher: "Type == 'heka.sandbox.afd_metric'"
-{%- endif %}
-{%- if metric_collector.nagios_host is defined %}
+ {%- endif %}
+ {%- if metric_collector.nagios_host is defined %}
nagios_alarm:
engine: http
address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
@@ -136,6 +141,7 @@
max_buffer_size: 1048576
max_file_size: 524288
full_action: drop
+ {%- endif %}
{%- endif %}
remote_collector:
decoder:
@@ -182,7 +188,7 @@
{%- endif %}
{%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined %}
filter:
-{%- if remote_collector.influxdb_host is defined %}
+ {%- if remote_collector.influxdb_host is defined %}
influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -193,31 +199,32 @@
config:
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ remote_collector.influxdb_time_precision }}"
-{%- endif %}
-{%- if remote_collector.amqp_host is defined %}
+ {%- endif %}
+ {%- if remote_collector.amqp_host is defined %}
resource_creation_time:
engine: sandbox
module_file: /usr/share/lma_collector/filters/resource_creation_time.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
preserve_data: false
message_matcher: "Type == 'notification' && Fields[event_type] =~ /create.end$/"
-{%- endif %}
+ {%- endif %}
{%- endif %}
{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
encoder:
-{%- if remote_collector.influxdb_host is defined %}
+ {%- if remote_collector.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
-{%- endif %}
-{%- if remote_collector.elasticsearch_host is defined %}
+ {%- endif %}
+ {%- if remote_collector.elasticsearch_host is defined %}
elasticsearch:
engine: elasticsearch
+ {%- endif %}
{%- endif %}
-{%- endif %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined %}
output:
-{%- if remote_collector.influxdb_host is defined %}
+ {%- if remote_collector.influxdb_host is defined %}
influxdb:
engine: http
address: "http://{{ remote_collector.influxdb_host }}:{{ remote_collector.influxdb_port }}/write?db={{ remote_collector.influxdb_database }}&precision={{ remote_collector.influxdb_time_precision }}"
@@ -228,20 +235,21 @@
message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
encoder: influxdb_encoder
timeout: {{ remote_collector.influxdb_timeout }}
-{%- endif %}
-{%- if remote_collector.aggregator_host is defined %}
+ {%- endif %}
+ {%- if remote_collector.aggregator_host is defined %}
aggregator:
engine: tcp
host: "{{ remote_collector.aggregator_host }}"
port: "{{ remote_collector.aggregator_port }}"
message_matcher: "Type == 'heka.sandbox.afd_metric'"
-{%- endif %}
-{%- if remote_collector.elasticsearch_host is defined %}
+ {%- endif %}
+ {%- if remote_collector.elasticsearch_host is defined %}
elasticsearch:
engine: elasticsearch
server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
encoder: elasticsearch_encoder
message_matcher: "Type == 'notification'"
+ {%- endif %}
{%- endif %}
aggregator:
policy:
@@ -338,12 +346,16 @@
threshold: 100
- status: critical
trigger:
- logical_operator: or
+ logical_operator: and
rules:
- - function: percent
+ - function: count
+ arguments: [ okay, warning ]
+ relational_operator: '<='
+ threshold: 1
+ - function: count
arguments: [ critical, down, unknown ]
- relational_operator: '>='
- threshold: 50
+ relational_operator: '>'
+ threshold: 0
- status: warning
trigger:
logical_operator: or
@@ -387,8 +399,8 @@
arguments: [ down, critical, warning, unknown ]
relational_operator: '>'
threshold: 0
- function: percent
- - status: okay
+ function: percent
+ - status: okay
input:
heka_metric:
engine: tcp
@@ -415,26 +427,30 @@
preserve_data: false
message_matcher: "Type == 'heka.sandbox.gse_metric'"
{%- endif %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
encoder:
-{%- if aggregator.influxdb_host is defined %}
+ {%- if aggregator.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
-{%- endif %}
-{%- if aggregator.nagios_host is defined %}
+ {%- endif %}
+ {%- if aggregator.nagios_host is defined %}
nagios:
engine: sandbox
module_file: /usr/share/lma_collector/encoders/status_nagios.lua
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
+ host_suffix_dimension_key: environment_label
default_nagios_host: "{{ aggregator.nagios_default_host_alarm_clusters }}"
{%- if aggregator.nagios_host_dimension_key is defined %}
nagios_host_dimension_key: "{{ aggregator.nagios_host_dimension_key }}"
{%- endif %}
+ {%- endif %}
{%- endif %}
+{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined %}
output:
-{%- if aggregator.influxdb_host is defined %}
+ {%- if aggregator.influxdb_host is defined %}
influxdb:
engine: http
address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -445,8 +461,8 @@
message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
encoder: influxdb_encoder
timeout: {{ aggregator.influxdb_timeout }}
-{%- endif %}
-{%- if aggregator.nagios_host is defined %}
+ {%- endif %}
+ {%- if aggregator.nagios_host is defined %}
nagios_alarm_cluster:
engine: http
address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
@@ -459,10 +475,11 @@
max_buffer_size: 1048576
max_file_size: 524288
full_action: drop
+ {%- endif %}
{%- endif %}
ceilometer_collector:
- decoder:
{%- if ceilometer_collector.amqp_host is defined %}
+ decoder:
sample:
engine: sandbox
module_file: /usr/share/lma_collector/decoders/metering.lua
@@ -472,8 +489,8 @@
decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
{%- endif %}
- input:
{%- if ceilometer_collector.amqp_host is defined %}
+ input:
openstack_sample_amqp:
engine: amqp
user: {{ ceilometer_collector.amqp_user }}
@@ -490,8 +507,8 @@
exchange_auto_delete: false
queue_auto_delete: false
{%- endif %}
- filter:
{%- if ceilometer_collector.influxdb_host is defined %}
+ filter:
ceilometer_influxdb_accumulator:
engine: sandbox
module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
@@ -505,14 +522,15 @@
flush_count: 500
bulk_metric_type_matcher: 'ceilometer_samples'
{%- endif %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
encoder:
-{%- if ceilometer_collector.influxdb_host is defined %}
+ {%- if ceilometer_collector.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
-{%- endif %}
-{%- if ceilometer_collector.elasticsearch_host is defined %}
+ {%- endif %}
+ {%- if ceilometer_collector.elasticsearch_host is defined %}
elasticsearch_resource:
engine: sandbox
module_file: /usr/share/lma_collector/encoders/es_ceilometer_resources.lua
@@ -521,6 +539,7 @@
index: "ceilometer_resource"
type_name: "source"
encoder: "elasticsearch_resources"
+ {%- endif %}
{%- endif %}
{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
output:
diff --git a/heka/server.sls b/heka/server.sls
index c2de1a0..888f04c 100644
--- a/heka/server.sls
+++ b/heka/server.sls
@@ -1,5 +1,5 @@
{%- from "heka/map.jinja" import server with context %}
-{%- if server.enabled %}
+{%- if server.enabled is defined and server.enabled %}
heka_packages:
pkg.latest:
@@ -198,4 +198,4 @@
{%- endif %}
{%- endfor %}
-{%- endif %}
\ No newline at end of file
+{%- endif %}