Merge pull request #60 from SwannCroiset/fix-grains
Fix the heka grains for the aggregator/remote_collector
diff --git a/heka/files/lua/common/contrail_patterns.lua b/heka/files/lua/common/contrail_patterns.lua
new file mode 100644
index 0000000..3383a3b
--- /dev/null
+++ b/heka/files/lua/common/contrail_patterns.lua
@@ -0,0 +1,77 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local dt = require "date_time"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local tonumber = tonumber -- kept as a local: globals are out of reach once setfenv() below has run
+
+local M = {} -- module table; it also receives the global assignments made further down
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Divide a millisecond value such as "633.908" by 1000 to obtain seconds
+-- (0.633908). The argument is run through tonumber() before the division.
+local function to_secfrac (ms) return tonumber(ms) / 1000 end
+
+-- Building blocks shared by the grammars defined further down
+local sp, pgname = patt.sp, patt.programname
+local spaces = sp^1
+local delimiter = l.P": " -- not referenced by the grammars in this file
+local message = l.Cg(patt.Message / utils.chomp, "Message")
+local severity = l.Cg(pgname, "Severity")
+
+-- Common patterns; the second argument of l.Cg is the name of the capture
+local modulename = l.Cg(pgname, "Module")
+local hostname = l.Cg(pgname, "Hostname")
+local ip_address = l.Cg((l.digit + patt.dot)^1, "ip_address")
+local http_status = l.Cg(patt.Number, "status")
+local http_request_time = l.Cg(patt.Number, "request_time")
+local delim = (sp + patt.dash)^1
+-- Matches "[Thread <digits>, Pid <digits>]"
+local process_info = l.P"[" * "Thread" * sp * l.Cg(l.digit^1, "ThreadId") * ", " * "Pid" * sp * l.Cg(l.digit^1, "Pid") * "]"
+
+-- Timestamp patterns; each of the three exported ones yields a "Timestamp" capture
+-- 10/31/2016 03:40:47 AM [contrail-alarm-gen]: blabla
+local log_timestamp = l.Cg(dt.build_strftime_grammar("%m/%d/%Y %r") / dt.time_to_ns, "Timestamp")
+
+-- 172.16.10.101 - - [2016-10-31 12:50:36] "POST /subscribe HTTP/1.1" 200 715 0.058196
+local api_timestamp = l.Cg(patt.Timestamp, "Timestamp")
+
+-- 2016-10-27 Thu 17:50:37:633.908 CEST ctl01 [Thread 140024858027968, Pid 23338]: DnsAgent [SYS_INFO]: blabla
+local cet = l.P"CET" * l.Cg(l.Cc"+", "offset_sign") * l.Cg(l.Cc"01" / tonumber, "offset_hour") *
+    l.Cg(l.Cc"00" / tonumber, "offset_min") -- constant +01:00 offset
+local tz = dt.timezone + cet -- ordered choice: CET is tried only when dt.timezone fails
+local weekday = l.Cg(l.P"Mon" + "Tue" + "Wed" + "Thu" + "Fri" + "Sat" + "Sun", "day_of_week")
+local msec = l.Cg((l.digit^1 * patt.dot * l.digit^1) / to_secfrac, "sec_frac") -- e.g. "633.908"
+local full_ts = l.Ct(dt.rfc3339_full_date * sp * weekday * sp * dt.rfc3339_partial_time *
+    ":" * msec * sp * tz)
+local control_timestamp = l.Cg(full_ts / dt.time_to_ns, "Timestamp")
+
+-- Complete grammars; each one is an l.Ct() pattern, so a match returns a table of named captures
+-- 10/31/2016 03:40:47 AM [contrail-alarm-gen]: blabla
+LogGrammar = l.Ct(log_timestamp * sp * "[" * modulename * "]:" * sp * message)
+
+-- 172.16.10.101 - - [2016-10-31 12:50:36] "POST /subscribe HTTP/1.1" 200 715 0.058196
+ApiGrammar = l.Ct(ip_address * delim * "[" * api_timestamp * "]" * sp * message)
+-- RequestGrammar parses the quoted request and what follows it: status, http_response_size, request_time
+RequestGrammar = l.Ct(l.P'"' * patt.http_request * l.P'"' * sp * http_status * sp * l.Cg(patt.Number, "http_response_size") * sp * http_request_time)
+
+-- 2016-10-27 Thu 17:50:37:633.908 CEST ctl01 [Thread 140024858027968, Pid 23338]: DnsAgent [SYS_INFO]: blabla
+-- 2016-11-01 Tue 05:24:26:590.824 CET ctl01 [Thread 140310906029824, Pid 18319]: SANDESH: blabla
+ControlGrammar = l.Ct(control_timestamp * spaces * hostname * sp * process_info * ": " * modulename * (sp * "[" * severity * "]")^-1 * l.P": "^-1 * message)
+
+return M
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 6a767e5..7226255 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -42,11 +42,17 @@
EMERGENCY = 0,
ALERT = 1,
CRITICAL = 2,
+ CRIT = 2,
+ SYS_CRIT = 2,
ERROR = 3,
WARNING = 4,
+ WARN = 4,
+ SYS_WARN = 4,
NOTICE = 5,
NOTE = 5,
+ SYS_NOTICE = 5,
INFO = 6,
+ SYS_INFO = 6,
DEBUG = 7,
}
diff --git a/heka/files/lua/decoders/contrail_api_stdout_log.lua b/heka/files/lua/decoders/contrail_api_stdout_log.lua
new file mode 100644
index 0000000..9e7f513
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_api_stdout_log.lua
@@ -0,0 +1,71 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "table"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local contrail = require 'contrail_patterns'
+local utils = require 'lma_utils'
+
+-- Severity label used for lines that carry none: read_config('default_severity'),
+-- or 'NOTICE' when that call returns nothing.
+local default_severity = read_config('default_severity') or 'NOTICE'
+
+-- Message table shared by all process_message() calls. Type is the only
+-- constant; Logger, Timestamp, Payload, Pid, Severity and Fields are
+-- assigned for each decoded line, and Hostname is left unset by this
+-- decoder.
+local msg = {
+    Type = 'log',
+}
+
+-- Decode one contrail API stdout line. Returns -1 and an error string when the line does
+-- not match contrail.ApiGrammar, otherwise the result of utils.safe_inject_message().
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+    -- First pass: <client ip> - - [<timestamp>] <message>
+    local api = contrail.ApiGrammar:match(log)
+    if not api then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+
+    -- Falls back to the configured default when the match carries no Severity capture.
+    local label = api.Severity or default_severity
+    msg.Logger = logger
+    msg.Timestamp = api.Timestamp
+    msg.Payload = api.Message
+    msg.Pid = api.Pid
+    msg.Severity = utils.label_to_severity_map[label]
+    msg.Fields = {severity_label = label, programname = api.Module}
+
+    -- Second pass: break the access-log part of the message down, when there is one.
+    local req = contrail.RequestGrammar:match(msg.Payload)
+    if req then
+        msg.Fields.http_method = req.http_method
+        msg.Fields.http_url = req.http_url
+        msg.Fields.http_version = req.http_version
+        -- RequestGrammar names these two captures "status" and "request_time".
+        msg.Fields.http_status = req.status
+        msg.Fields.http_response_time = req.request_time
+        msg.Fields.http_response_size = req.http_response_size -- nil unless the grammar names it
+        -- Client address: captured by ApiGrammar above, it is not part of the payload anymore.
+        msg.Fields.http_client_ip_address = api.ip_address
+    end
+
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/contrail_collector_log.lua b/heka/files/lua/decoders/contrail_collector_log.lua
new file mode 100644
index 0000000..c739a7f
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_collector_log.lua
@@ -0,0 +1,49 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local dt = require "date_time"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local contrail = require 'contrail_patterns'
+local utils = require 'lma_utils'
+
+-- Message table shared by all process_message() calls. Timestamp,
+-- Payload, Pid, Fields and Severity are assigned for each decoded
+-- line; Hostname is left unset by this decoder.
+local msg = {
+    Type = 'log',
+    -- Starting value only: process_message() computes the severity
+    -- of every line it decodes.
+    Severity = 5,
+}
+
+-- Decode one line with contrail.ControlGrammar and inject the rewritten message.
+-- Returns -1 and an error string when the line does not match the grammar.
+function process_message ()
+    local payload, logger = read_message("Payload"), read_message("Logger")
+    local parsed = contrail.ControlGrammar:match(payload)
+    if not parsed then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(payload, 1, 64))
+    end
+    -- A missing label counts as "SYS_NOTICE"; labels absent from the map give severity 5.
+    local level = utils.label_to_severity_map[parsed.Severity or "SYS_NOTICE"] or 5
+    msg.Timestamp = parsed.Timestamp
+    msg.Payload = parsed.Message
+    msg.Pid = parsed.Pid
+    msg.Severity = level
+    msg.Fields = {programname = parsed.Module, severity_label = utils.severity_to_label_map[level]}
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/contrail_log.lua b/heka/files/lua/decoders/contrail_log.lua
new file mode 100644
index 0000000..b2c48ce
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_log.lua
@@ -0,0 +1,44 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+l.locale(l)
+
+local contrail = require 'contrail_patterns'
+
+-- Message table shared by all process_message() calls. Type and
+-- Severity never change; Timestamp, Payload and Fields are assigned
+-- for each decoded line, while Hostname and Pid are left unset by
+-- this decoder.
+local msg = {
+    Type = 'log',
+    -- Fixed severity; process_message() pairs it with the 'INFO' label.
+    Severity = 6,
+}
+
+-- Decode one line with contrail.LogGrammar and inject the rewritten message, always
+-- labelled 'INFO'. Returns -1 and an error string when the line does not match.
+function process_message ()
+    local payload, logger = read_message("Payload"), read_message("Logger")
+    local parsed = contrail.LogGrammar:match(payload)
+    if not parsed then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(payload, 1, 64))
+    end
+
+    msg.Timestamp = parsed.Timestamp
+    msg.Payload = parsed.Message
+    msg.Fields = {severity_label = 'INFO', programname = parsed.Module}
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/contrail_supervisor_log.lua b/heka/files/lua/decoders/contrail_supervisor_log.lua
new file mode 100644
index 0000000..a108fa5
--- /dev/null
+++ b/heka/files/lua/decoders/contrail_supervisor_log.lua
@@ -0,0 +1,48 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+-- Message table shared by all process_message() calls. Timestamp,
+-- Payload and Fields are assigned for each decoded line; Hostname
+-- and Pid are left unset by this decoder.
+local msg = {
+    Type = 'log',
+    -- Severity the table starts out with.
+    -- NOTE(review): 5 presumably stands for NOTICE -- confirm in lma_utils.
+    Severity = 5,
+}
+
+-- <timestamp> <label><message>: "CRIT" and "WARN" are accepted on top of patt.SeverityLabel
+local label = l.Cg(patt.SeverityLabel + "CRIT" + "WARN", "SeverityLabel")
+local grammar = l.Ct(l.Cg(patt.Timestamp, "Timestamp") * patt.sp * label * l.Cg(patt.Message, "Message"))
+
+-- Decode one supervisor line; returns -1 and an error string when it does not match the grammar.
+function process_message ()
+    local log, logger = read_message("Payload"), read_message("Logger")
+    local m = grammar:match(log)
+    if not m then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    -- Severity follows the parsed label; labels missing from the map keep the former fixed value, 5.
+    msg.Severity = utils.label_to_severity_map[m.SeverityLabel] or 5
+    msg.Fields = {severity_label = m.SeverityLabel, programname = m.Module}
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end