Parse multiline contrail collector logs
Change-Id: I4d7aaa4778d5ed32eb6a8712e10da2fcb8733d53
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
index 84937c8..9c5ae9e 100644
--- a/heka/files/lua/common/patterns.lua
+++ b/heka/files/lua/common/patterns.lua
@@ -144,6 +144,9 @@
-- Pattern used to match the Java Exception.
exception = l.P'exception' + l.P'Exception'
+-- Contrail multiline patterns.
+multiline = l.P'ApiServer Message:' + l.P'PollResponse message is:'
+
-- Pattern used to match a number
Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
diff --git a/heka/files/lua/decoders/contrail_collector_log.lua b/heka/files/lua/decoders/contrail_collector_log.lua
index c739a7f..fcb4821 100644
--- a/heka/files/lua/decoders/contrail_collector_log.lua
+++ b/heka/files/lua/decoders/contrail_collector_log.lua
@@ -11,13 +11,14 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-local dt = require "date_time"
-local l = require 'lpeg'
-l.locale(l)
+require "table"
+local contrail = require 'contrail_patterns'
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+local patt = require 'patterns'
+local table_utils = require 'table_utils'
-local patt = require 'patterns'
-local contrail = require 'contrail_patterns'
-local utils = require 'lma_utils'
+l.locale(l)
local msg = {
Timestamp = nil,
@@ -29,21 +30,68 @@
Severity = 5,
}
+local multiline_key = nil
+local multiline_lines = nil
+
+function prepare_message (timestamp, pid, severity_label, payload, programname)
+ msg.Timestamp = timestamp
+ msg.Pid = pid
+ msg.Payload = payload
+ msg.Severity = utils.label_to_severity_map[severity_label or "SYS_NOTICE"] or 5
+ msg.Fields = {}
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = programname
+end
+
function process_message ()
local log = read_message("Payload")
local logger = read_message("Logger")
+
local m = contrail.ControlGrammar:match(log)
if not m then
- return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ if multiline_key == nil then
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ else
+ table.insert(multiline_lines, log)
+ return 0
+ end
end
- msg.Timestamp = m.Timestamp
- msg.Payload = m.Message
- msg.Fields = {}
- msg.Pid = m.Pid
- msg.Fields.programname = m.Module
- local severity = m.Severity or "SYS_NOTICE"
- msg.Severity = utils.label_to_severity_map[severity] or 5
- msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+
+ local key = {
+ Timestamp = m.Timestamp,
+ Pid = m.Pid,
+ SeverityLabel = m.SeverityLabel,
+ Programname = m.Module,
+ }
+
+ if multiline_key ~= nil then
+ -- If multiline_key is not nil then it means we've started accumulated
+ -- lines of a multiline message. We keep accumulating the lines
+ -- until we get a different log key.
+ if table_utils.table_equal(multiline_key, key) then
+ table.insert(multiline_lines, m.Message)
+ return 0
+ else
+ prepare_message(multiline_key.Timestamp, multiline_key.Pid,
+ multiline_key.SeverityLabel, table.concat(multiline_lines, ''),
+ multiline_key.Programname)
+ multiline_key = nil
+ multiline_lines = nil
+ utils.inject_tags(msg)
+ -- Ignore safe_inject_message status code here to still get a
+ -- chance to inject the current log message.
+ utils.safe_inject_message(msg)
+ end
+ end
+
+ if patt.anywhere(patt.multiline):match(m.Message) then
+ multiline_key = key
+ multiline_lines = {}
+ table.insert(multiline_lines, m.Message)
+ return 0
+ end
+
+ prepare_message(m.Timestamp, m.Pid, m.Severity, m.Message, m.Module)
utils.inject_tags(msg)
return utils.safe_inject_message(msg)
end