Merge pull request #55 from elemoine/stacklight-matchers
Use better message matchers
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 7b063d1..6a767e5 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -45,7 +45,8 @@
ERROR = 3,
WARNING = 4,
NOTICE = 5,
- INFO= 6,
+ NOTE = 5,
+ INFO = 6,
DEBUG = 7,
}
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 0befcf0..5c91ca4 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -427,6 +427,11 @@
msg['Fields']['name'] = 'openstack_check_local_api'
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
+ elseif metric_source == 'nginx' then
+ msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
+ if sample['type_instance'] ~= "" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+ end
else
-- generic metric name translation for 3rd-party sources
msg['Fields']['name'] = sample['plugin']
diff --git a/heka/files/lua/decoders/galera.lua b/heka/files/lua/decoders/galera.lua
new file mode 100644
index 0000000..c9f385d
--- /dev/null
+++ b/heka/files/lua/decoders/galera.lua
@@ -0,0 +1,63 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils = require "lma_utils"
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+local programname = read_config('programname') or 'mysql'
+
+-- mysql log messages are formatted like this
+--
+-- 2016-11-09 08:42:34 18430 [Note] InnoDB: Using atomics to ref count buffer pool pages
+local sp = l.space
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local pid = l.Cg(patt.Pid, "Pid")
+local severity = l.P"[" * l.Cg(l.R("az", "AZ")^0 / string.upper, "SeverityLabel") * l.P"]"
+local message = l.Cg(patt.Message, "Message")
+
+local grammar = l.Ct(timestamp * sp^1 * pid * sp^1 * severity * sp^1 * message)
+
+
+function process_message ()
+ local log = read_message("Payload")
+ local m = grammar:match(log)
+ if not m then
+ return -1, string.format("Failed to parse: %s", string.sub(log, 1, 64))
+ end
+
+ msg.Timestamp = m.Timestamp
+ msg.Pid = m.Pid
+ msg.Severity = utils.label_to_severity_map[m.SeverityLabel] or utils.label_to_severity_map.DEBUG
+ msg.Payload = m.Message
+
+ msg.Fields = {}
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = programname
+
+ utils.inject_tags(msg)
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
index e75596e..a6c0160 100644
--- a/heka/files/toml/input/amqp.toml
+++ b/heka/files/toml/input/amqp.toml
@@ -1,28 +1,33 @@
[{{ input_name }}_input]
type = "AMQPInput"
-url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
+url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}:{{ input.port }}/{{ input.vhost }}"
exchange = "{{ input.exchange }}"
exchange_type = "{{ input.exchange_type }}"
-{%- if input.prefetch_count is defined -%}
+{%- if input.prefetch_count is defined %}
prefetch_count = {{ input.prefetch_count }}
{%- endif %}
-{%- if input.exchange_durability is defined -%}
-exchange_durability = "{{ input.exchange_durability }}"
+{%- if input.exchange_durability is defined %}
+exchange_durability = {{ input.exchange_durability|lower }}
{%- endif %}
-{%- if input.exchange_auto_delete is defined -%}
-exchange_auto_delete = "{{ input.exchange_auto_delete }}"
+{%- if input.exchange_auto_delete is defined %}
+exchange_auto_delete = {{ input.exchange_auto_delete|lower }}
{%- endif %}
-{%- if input.queue_auto_delete is defined -%}
-queue_auto_delete = {{ input.queue_auto_delete }}
+{%- if input.queue_auto_delete is defined %}
+queue_auto_delete = {{ input.queue_auto_delete|lower }}
{%- endif %}
-{%- if input.queue is defined -%}
+{%- if input.queue is defined %}
queue = "{{ input.queue }}"
{%- endif %}
-{%- if input.routing_key is defined -%}
+{%- if input.routing_key is defined %}
routing_key = "{{ input.routing_key }}"
{%- endif %}
+{%- if input.can_exit is defined %}
+can_exit = {{ input.can_exit|lower }}
+{%- endif %}
decoder = "{{ input.decoder }}"
+{%- if input.splitter is defined %}
splitter = "{{ input.splitter }}"
+{%- endif %}
{%- if input.ssl is defined and input.ssl.get('enabled', True) %}
[{{ input_name }}_input.tls]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index de6e29e..090cb96 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -50,7 +50,7 @@
engine: elasticsearch
server: "http://{{ log_collector.elasticsearch_host }}:{{ log_collector.elasticsearch_port }}"
encoder: elasticsearch_encoder
- message_matcher: "Type == 'log' || Type == 'notification'"
+ message_matcher: "Type == 'log'"
{%- endif %}
metric_collector:
decoder:
@@ -159,6 +159,14 @@
module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
config:
hostname: '{{ grains.host }}'
+{%- if remote_collector.amqp_host is defined %}
+ notification:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/decoders/notification.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ include_full_notification: false
+{%- endif %}
input:
heka_collectd:
engine: http
@@ -166,6 +174,26 @@
port: 8326
decoder: collectd_decoder
splitter: NullSplitter
+{%- if remote_collector.amqp_host is defined %}
+{%- for notification_level in ('info', 'warn', 'error') %}
+ amqp_notification_{{ notification_level }}:
+ engine: amqp
+ host: {{ remote_collector.amqp_host }}
+ port: {{ remote_collector.amqp_port }}
+ user: {{ remote_collector.amqp_user }}
+ password: {{ remote_collector.amqp_password }}
+ vhost: {{ remote_collector.get('amqp_vhost', '') }}
+ exchange: {{ remote_collector.get('amqp_exchange', 'nova') }}
+ exchange_type: topic
+ exchange_durability: false
+ exchange_auto_delete: false
+ queue_auto_delete: false
+ queue: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+ routing_key: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+ can_exit: false
+ decoder: notification_decoder
+{%- endfor %}
+{%- endif %}
{%- if remote_collector.influxdb_host is defined %}
filter:
influxdb_accumulator:
@@ -179,13 +207,19 @@
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ remote_collector.influxdb_time_precision }}"
{%- endif %}
-{%- if remote_collector.influxdb_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
encoder:
+{%- if remote_collector.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
{%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+ elasticsearch:
+ engine: elasticsearch
+{%- endif %}
+{%- endif %}
output:
remote_collector_dashboard:
engine: dashboard
@@ -211,6 +245,13 @@
port: "{{ remote_collector.aggregator_port }}"
message_matcher: "Type == 'heka.sandbox.afd_metric'"
{%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+ elasticsearch:
+ engine: elasticsearch
+ server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
+ encoder: elasticsearch_encoder
+ message_matcher: "Type == 'notification'"
+{%- endif %}
aggregator:
policy:
# A policy defining that the cluster's status depends on the member with