Merge pull request #60 from SwannCroiset/fix-grains

Fix the heka grains for the aggregator/remote_collector
diff --git a/heka/files/lua/common/contrail_patterns.lua b/heka/files/lua/common/contrail_patterns.lua
new file mode 100644
index 0000000..3383a3b
--- /dev/null
+++ b/heka/files/lua/common/contrail_patterns.lua
@@ -0,0 +1,77 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local dt     = require "date_time"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt   = require 'patterns'
+local utils  = require 'lma_utils'
+
+local tonumber = tonumber
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local function to_secfrac (num)
+    return tonumber(num) / 1000
+end
+
+local sp = patt.sp
+local spaces = patt.sp^1
+local pgname = patt.programname
+local delimeter = l.P": "
+local message   = l.Cg(patt.Message / utils.chomp, "Message")
+local severity  = l.Cg(pgname, "Severity")
+
+-- Common patterns
+local modulename =  l.Cg(pgname, "Module")
+local ip_address = l.Cg((l.digit + patt.dot)^1, "ip_address")
+local http_status = l.Cg(patt.Number, "status")
+local http_request_time = l.Cg(patt.Number, "request_time")
+local delim = (patt.sp + patt.dash)^1
+local hostname = l.Cg(patt.programname, "Hostname")
+local process_info = l.P"[" * "Thread" * sp * l.Cg(l.digit^1, "ThreadId") *
+                     ", " * "Pid" * sp * l.Cg(l.digit^1, "Pid") * "]"
+
+-- Timestamp patterns
+-- 10/31/2016 03:40:47 AM [contrail-alarm-gen]: blabla
+local log_timestamp = l.Cg(dt.build_strftime_grammar("%m/%d/%Y %r") / dt.time_to_ns, "Timestamp")
+
+-- 172.16.10.101 - - [2016-10-31 12:50:36] "POST /subscribe HTTP/1.1" 200 715 0.058196
+local api_timestamp = l.Cg(patt.Timestamp, "Timestamp")
+
+-- 2016-10-27 Thu 17:50:37:633.908 CEST  ctl01 [Thread 140024858027968, Pid 23338]: DnsAgent [SYS_INFO]: blabla
+local timezone = dt.timezone + l.P"CET" * l.Cg(l.Cc"+", "offset_sign") *
+                 l.Cg(l.Cc"01" / tonumber, "offset_hour") * l.Cg(l.Cc"00"/ tonumber, "offset_min")
+local day_of_week = l.Cg(l.P"Mon" + l.P"Tue" + l.P"Wed" + l.P"Thu" +
+                         l.P"Fri" + l.P"Sat" + l.P"Sun", "day_of_week")
+local secfrac_grammar = l.Cg((l.digit^1 * patt.dot * l.digit^1) / to_secfrac, "sec_frac")
+local ts_grammar = l.Ct(dt.rfc3339_full_date * patt.sp * day_of_week * sp *
+                        dt.rfc3339_partial_time * ":" * secfrac_grammar * sp * timezone)
+local control_timestamp = l.Cg(ts_grammar / dt.time_to_ns, "Timestamp")
+
+-- Complete grammars
+-- 10/31/2016 03:40:47 AM [contrail-alarm-gen]: blabla
+LogGrammar = l.Ct(log_timestamp * patt.sp * "[" * modulename * "]:" * patt.sp * message)
+
+-- 172.16.10.101 - - [2016-10-31 12:50:36] "POST /subscribe HTTP/1.1" 200 715 0.058196
+ApiGrammar = l.Ct(ip_address * delim * "["* api_timestamp * "]" * sp * message)
+RequestGrammar = l.Ct(l.P'"' * patt.http_request * l.P'"' * sp * http_status * sp * patt.Number * sp * http_request_time)
+
+-- 2016-10-27 Thu 17:50:37:633.908 CEST  ctl01 [Thread 140024858027968, Pid 23338]: DnsAgent [SYS_INFO]: blabla
+-- 2016-11-01 Tue 05:24:26:590.824 CET  ctl01 [Thread 140310906029824, Pid 18319]: SANDESH: blabla
+ControlGrammar = l.Ct(control_timestamp * spaces * hostname * sp * process_info * l.P": " *
+					   modulename * (sp * "[" * severity * "]")^-1 * l.P": "^-1 * message)
+
+return M
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 6a767e5..7226255 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -42,11 +42,17 @@
     EMERGENCY = 0,
     ALERT = 1,
     CRITICAL = 2,
+    CRIT = 2,
+    SYS_CRIT = 2,
     ERROR = 3,
     WARNING = 4,
+    WARN = 4,
+    SYS_WARN = 4,
     NOTICE = 5,
     NOTE = 5,
+    SYS_NOTICE = 5,
     INFO = 6,
+    SYS_INFO = 6,
     DEBUG = 7,
 }
 
diff --git a/heka/files/lua/decoders/contrail_api_stdout_log.lua b/heka/files/lua/decoders/contrail_api_stdout_log.lua
new file mode 100644
index 0000000..9e7f513
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_api_stdout_log.lua
@@ -0,0 +1,71 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "table"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local contrail = require 'contrail_patterns'
+local utils  = require 'lma_utils'
+
+local default_severity = read_config('default_severity') or 'NOTICE'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+
+    local m
+
+    m = contrail.ApiGrammar:match(log)
+    if not m then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+
+    msg.Logger = logger
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Pid = m.Pid
+    msg.Severity = utils.label_to_severity_map[m.Severity or default_severity]
+    msg.Fields = {}
+    msg.Fields.severity_label = m.Severity or default_severity
+    msg.Fields.programname = m.Module
+
+    m = contrail.RequestGrammar:match(msg.Payload)
+    if m then
+        msg.Fields.http_method = m.http_method
+        msg.Fields.http_status = m.http_status
+        msg.Fields.http_url = m.http_url
+        msg.Fields.http_version = m.http_version
+        msg.Fields.http_response_size = m.http_response_size
+        msg.Fields.http_response_time = m.http_response_time
+        m = patt.ip_address:match(msg.Payload)
+	if m then
+           msg.Fields.http_client_ip_address = m.ip_address
+	end
+    end
+
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/contrail_collector_log.lua b/heka/files/lua/decoders/contrail_collector_log.lua
new file mode 100644
index 0000000..c739a7f
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_collector_log.lua
@@ -0,0 +1,49 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local dt     = require "date_time"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt   = require 'patterns'
+local contrail   = require 'contrail_patterns'
+local utils  = require 'lma_utils'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = 5,
+}
+
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+    local m = contrail.ControlGrammar:match(log)
+    if not m then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Fields = {}
+    msg.Pid = m.Pid
+    msg.Fields.programname = m.Module
+    local severity = m.Severity or "SYS_NOTICE"
+    msg.Severity = utils.label_to_severity_map[severity] or 5
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/contrail_log.lua b/heka/files/lua/decoders/contrail_log.lua
new file mode 100644
index 0000000..b2c48ce
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_log.lua
@@ -0,0 +1,44 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l      = require 'lpeg'
+local utils  = require 'lma_utils'
+l.locale(l)
+
+local contrail   = require 'contrail_patterns'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = 6,
+}
+
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+    local m = contrail.LogGrammar:match(log)
+    if not m then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Fields = {}
+    msg.Fields.severity_label = 'INFO'
+    msg.Fields.programname = m.Module
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/contrail_supervisor_log.lua b/heka/files/lua/decoders/contrail_supervisor_log.lua
new file mode 100644
index 0000000..a108fa5
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_supervisor_log.lua
@@ -0,0 +1,48 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt   = require 'patterns'
+local utils  = require 'lma_utils'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = 5,
+}
+
+local timestamp_grammar = l.Cg(patt.Timestamp, "Timestamp")
+local SeverityLabel = l.Cg(patt.SeverityLabel + l.P"CRIT" + l.P"WARN", "SeverityLabel")
+local grammar = l.Ct(timestamp_grammar * patt.sp * SeverityLabel * l.Cg(patt.Message, "Message"))
+
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+    local m = grammar:match(log)
+    if not m then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Fields = {}
+    msg.Fields.severity_label = m.SeverityLabel
+    msg.Fields.programname = m.Module
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end