Merge pull request #55 from elemoine/stacklight-matchers

Use better message matchers
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 7b063d1..6a767e5 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -45,7 +45,8 @@
     ERROR = 3,
     WARNING = 4,
     NOTICE = 5,
-    INFO= 6,
+    NOTE = 5,
+    INFO = 6,
     DEBUG = 7,
 }
 
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index 0befcf0..5c91ca4 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -427,6 +427,11 @@
                 msg['Fields']['name'] = 'openstack_check_local_api'
                 msg['Fields']['service'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
+            elseif metric_source == 'nginx' then
+                msg['Fields']['name'] = 'nginx' .. sep .. string.gsub(sample['type'], '^nginx_', '')
+                if sample['type_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+                end
             else
                 -- generic metric name translation for 3rd-party sources
                 msg['Fields']['name'] = sample['plugin']
diff --git a/heka/files/lua/decoders/galera.lua b/heka/files/lua/decoders/galera.lua
new file mode 100644
index 0000000..c9f385d
--- /dev/null
+++ b/heka/files/lua/decoders/galera.lua
@@ -0,0 +1,63 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt   = require 'patterns'
+local utils = require "lma_utils"
+
+-- Message skeleton reused for every decoded line; process_message() sets
+-- Timestamp, Pid, Severity, Payload and Fields before injecting it.
+local msg = {
+    Type = 'log',
+    Timestamp = nil, Hostname = nil,
+    Payload = nil, Pid = nil,
+    Severity = nil,
+    Fields = nil,
+}
+
+-- Value reported in Fields.programname; taken from the 'programname' config key.
+local programname = read_config('programname') or 'mysql'
+
+-- Expected line layout: <timestamp> <pid> [<severity>] <message>, e.g.
+-- 2016-11-09 08:42:34 18430 [Note] InnoDB: Using atomics to ref count buffer pool pages
+local gap = l.space^1
+local label = l.R("az", "AZ")^0 / string.upper  -- upper-cased, so "Note" is captured as "NOTE"
+local grammar = l.Ct(
+    l.Cg(patt.Timestamp, "Timestamp") * gap
+    * l.Cg(patt.Pid, "Pid") * gap
+    * l.P("[") * l.Cg(label, "SeverityLabel") * l.P("]") * gap
+    * l.Cg(patt.Message, "Message"))
+
+
+function process_message ()
+    -- Parse the raw payload; lines that do not match the grammar are rejected.
+    local payload = read_message("Payload")
+    local parsed = grammar:match(payload)
+    if not parsed then
+        return -1, string.format("Failed to parse: %s", string.sub(payload, 1, 64))
+    end
+    -- Severity labels missing from the map are reported as DEBUG.
+    local severities = utils.label_to_severity_map
+    local severity = severities[parsed.SeverityLabel] or severities.DEBUG
+    msg.Timestamp, msg.Pid, msg.Payload = parsed.Timestamp, parsed.Pid, parsed.Message
+    msg.Severity = severity
+    msg.Fields = {
+        severity_label = utils.severity_to_label_map[severity],
+        programname = programname,
+    }
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
index e75596e..a6c0160 100644
--- a/heka/files/toml/input/amqp.toml
+++ b/heka/files/toml/input/amqp.toml
@@ -1,28 +1,33 @@
 [{{ input_name }}_input]
 type = "AMQPInput"
-url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
+url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}{% if input.port is defined %}:{{ input.port }}{% endif %}/{{ input.vhost }}"  # the ":port" segment is emitted only when input.port is set
 exchange = "{{ input.exchange }}"
 exchange_type = "{{ input.exchange_type }}"
-{%- if input.prefetch_count is defined -%}
+{%- if input.prefetch_count is defined %}
 prefetch_count = {{ input.prefetch_count }}
 {%- endif %}
-{%- if input.exchange_durability is defined -%}
-exchange_durability = "{{ input.exchange_durability }}"
+{%- if input.exchange_durability is defined %}
+exchange_durability = {{ input.exchange_durability|lower }}
 {%- endif %}
-{%- if input.exchange_auto_delete is defined -%}
-exchange_auto_delete = "{{ input.exchange_auto_delete }}"
+{%- if input.exchange_auto_delete is defined %}
+exchange_auto_delete = {{ input.exchange_auto_delete|lower }}
 {%- endif %}
-{%- if input.queue_auto_delete is defined -%}
-queue_auto_delete = {{ input.queue_auto_delete }}
+{%- if input.queue_auto_delete is defined %}
+queue_auto_delete = {{ input.queue_auto_delete|lower }}
 {%- endif %}
-{%- if input.queue is defined -%}
+{%- if input.queue is defined %}
 queue = "{{ input.queue }}"
 {%- endif %}
-{%- if input.routing_key is defined -%}
+{%- if input.routing_key is defined %}
 routing_key = "{{ input.routing_key }}"
 {%- endif %}
+{%- if input.can_exit is defined %}
+can_exit = {{ input.can_exit|lower }}
+{%- endif %}
 decoder = "{{ input.decoder }}"
+{%- if input.splitter is defined %}
 splitter = "{{ input.splitter }}"
+{%- endif %}
 
 {%- if input.ssl is defined and input.ssl.get('enabled', True) %}
 [{{ input_name }}_input.tls]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index de6e29e..090cb96 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -50,7 +50,7 @@
       engine: elasticsearch
       server: "http://{{ log_collector.elasticsearch_host }}:{{ log_collector.elasticsearch_port }}"
       encoder: elasticsearch_encoder
-      message_matcher: "Type == 'log' || Type == 'notification'"
+      message_matcher: "Type == 'log'"
 {%- endif %}
 metric_collector:
   decoder:
@@ -159,6 +159,14 @@
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       config:
         hostname: '{{ grains.host }}'
+{%- if remote_collector.amqp_host is defined %}
+    notification:  # sandbox decoder, declared only when remote_collector.amqp_host is set
+      engine: sandbox
+      module_file: /usr/share/lma_collector/decoders/notification.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        include_full_notification: false  # NOTE(review): semantics live in notification.lua, not visible here
+{%- endif %}
   input:
     heka_collectd:
       engine: http
@@ -166,6 +174,26 @@
       port: 8326
       decoder: collectd_decoder
       splitter: NullSplitter
+{%- if remote_collector.amqp_host is defined %}
+{%- for notification_level in ('info', 'warn', 'error') %}
+    amqp_notification_{{ notification_level }}:
+      engine: amqp
+      host: "{{ remote_collector.amqp_host }}"
+      port: {{ remote_collector.get('amqp_port', 5672) }}  # falls back to the standard AMQP port
+      user: "{{ remote_collector.amqp_user }}"
+      password: "{{ remote_collector.amqp_password }}"  # quoted so YAML-special characters survive loading
+      vhost: "{{ remote_collector.get('amqp_vhost', '') }}"  # quoted: an empty value would otherwise load as null
+      exchange: "{{ remote_collector.get('amqp_exchange', 'nova') }}"
+      exchange_type: topic
+      exchange_durability: false
+      exchange_auto_delete: false
+      queue_auto_delete: false
+      queue: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+      routing_key: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
+      can_exit: false
+      decoder: notification_decoder
+{%- endfor %}
+{%- endif %}
 {%- if remote_collector.influxdb_host is defined %}
   filter:
     influxdb_accumulator:
@@ -179,13 +207,19 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ remote_collector.influxdb_time_precision }}"
 {%- endif %}
-{%- if remote_collector.influxdb_host is defined %}
+{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined %}
   encoder:
+{%- if remote_collector.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
 {%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+    elasticsearch:
+      engine: elasticsearch
+{%- endif %}
+{%- endif %}
   output:
     remote_collector_dashboard:
       engine: dashboard
@@ -211,6 +245,13 @@
       port: "{{ remote_collector.aggregator_port }}"
       message_matcher: "Type == 'heka.sandbox.afd_metric'"
 {%- endif %}
+{%- if remote_collector.elasticsearch_host is defined %}
+    elasticsearch:  # output declared only when remote_collector.elasticsearch_host is set
+      engine: elasticsearch
+      server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
+      encoder: elasticsearch_encoder  # NOTE(review): presumably the 'elasticsearch' encoder under remote_collector.encoder; confirm naming
+      message_matcher: "Type == 'notification'"  # forwards only messages whose Type is 'notification'
+{%- endif %}
 aggregator:
   policy:
     # A policy defining that the cluster's status depends on the member with