Merge "Heka stalled alert improvement"
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
index e330ee3..9c5ae9e 100644
--- a/heka/files/lua/common/patterns.lua
+++ b/heka/files/lua/common/patterns.lua
@@ -141,11 +141,17 @@
-- Pattern used to match the beginning of a Python Traceback.
traceback = l.P'Traceback (most recent call last):'
+-- Pattern used to match a Java exception: the literal word 'exception' or 'Exception'.
+exception = l.P'exception' + l.P'Exception'
+
+-- Literal strings that mark the start of a Contrail multiline message.
+multiline = l.P'ApiServer Message:' + l.P'PollResponse message is:'
+
-- Pattern used to match a number
Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
-- Java/Log4J Timestamp patterns
--- 2016-11-21 06:38:43,081 - INFO
+-- 2016-11-21 06:38:43,081 - INFO
local time_secfrac = l.Cg(l.P"," * l.digit^1 / tonumber, "sec_frac")
local ts_grammar = l.Ct(dt.date_fullyear * dash * dt.date_month * dash * dt.date_mday * sp * dt.rfc3339_partial_time * time_secfrac)
JavaTimestamp = l.Cg(ts_grammar / dt.time_to_ns, "Timestamp")
diff --git a/heka/files/lua/decoders/contrail_collector_log.lua b/heka/files/lua/decoders/contrail_collector_log.lua
index c739a7f..fcb4821 100644
--- a/heka/files/lua/decoders/contrail_collector_log.lua
+++ b/heka/files/lua/decoders/contrail_collector_log.lua
@@ -11,13 +11,14 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-local dt = require "date_time"
-local l = require 'lpeg'
-l.locale(l)
+require "table"
+local contrail = require 'contrail_patterns'
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+local patt = require 'patterns'
+local table_utils = require 'table_utils'
-local patt = require 'patterns'
-local contrail = require 'contrail_patterns'
-local utils = require 'lma_utils'
+l.locale(l)
local msg = {
Timestamp = nil,
@@ -29,21 +30,68 @@
Severity = 5,
}
+local multiline_key = nil -- key of the multiline message being accumulated, nil when there is none
+local multiline_lines = nil -- lines accumulated so far for that message
+-- Fill the module-level msg table; severity falls back to "SYS_NOTICE", then to 5.
+function prepare_message (timestamp, pid, severity_label, payload, programname)
+ msg.Timestamp = timestamp
+ msg.Pid = pid
+ msg.Payload = payload
+ msg.Severity = utils.label_to_severity_map[severity_label or "SYS_NOTICE"] or 5
+ msg.Fields = {}
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = programname
+end
+-- Decode one log line; returns 0 when the line is buffered as part of a multiline message.
function process_message ()
local log = read_message("Payload")
local logger = read_message("Logger")
+
local m = contrail.ControlGrammar:match(log)
if not m then
- return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ if multiline_key == nil then
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ else
+ table.insert(multiline_lines, log)
+ return 0
+ end
end
- msg.Timestamp = m.Timestamp
- msg.Payload = m.Message
- msg.Fields = {}
- msg.Pid = m.Pid
- msg.Fields.programname = m.Module
- local severity = m.Severity or "SYS_NOTICE"
- msg.Severity = utils.label_to_severity_map[severity] or 5
- msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ -- The severity is read from m.Severity, the same capture passed to the final prepare_message call.
+ local key = {
+ Timestamp = m.Timestamp,
+ Pid = m.Pid,
+ SeverityLabel = m.Severity,
+ Programname = m.Module,
+ }
+
+ if multiline_key ~= nil then
+ -- If multiline_key is not nil then it means we've started accumulating
+ -- the lines of a multiline message. We keep accumulating the lines
+ -- until we get a different log key.
+ if table_utils.table_equal(multiline_key, key) then
+ table.insert(multiline_lines, m.Message)
+ return 0
+ else
+ prepare_message(multiline_key.Timestamp, multiline_key.Pid,
+ multiline_key.SeverityLabel, table.concat(multiline_lines, ''),
+ multiline_key.Programname)
+ multiline_key = nil
+ multiline_lines = nil
+ utils.inject_tags(msg)
+ -- Ignore safe_inject_message status code here to still get a
+ -- chance to inject the current log message.
+ utils.safe_inject_message(msg)
+ end
+ end
+
+ if patt.anywhere(patt.multiline):match(m.Message) then
+ multiline_key = key
+ multiline_lines = {}
+ table.insert(multiline_lines, m.Message)
+ return 0
+ end
+
+ prepare_message(m.Timestamp, m.Pid, m.Severity, m.Message, m.Module)
utils.inject_tags(msg)
return utils.safe_inject_message(msg)
end
diff --git a/heka/files/lua/decoders/openstack_log.lua b/heka/files/lua/decoders/openstack_log.lua
index 95b2002..a8f2c59 100644
--- a/heka/files/lua/decoders/openstack_log.lua
+++ b/heka/files/lua/decoders/openstack_log.lua
@@ -68,7 +68,12 @@
m = patt.openstack:match(log)
if not m then
- return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ if traceback_key == nil then
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ else
+ table.insert(traceback_lines, log)
+ return 0
+ end
end
local key = {
diff --git a/heka/files/lua/decoders/zookeeper.lua b/heka/files/lua/decoders/zookeeper.lua
index ed3a338..70187cb 100644
--- a/heka/files/lua/decoders/zookeeper.lua
+++ b/heka/files/lua/decoders/zookeeper.lua
@@ -11,11 +11,14 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-local l = require 'lpeg'
-local utils = require 'lma_utils'
-l.locale(l)
+require "table"
+local java = require 'java_patterns'
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+local patt = require 'patterns'
+local table_utils = require 'table_utils'
-local java = require 'java_patterns'
+l.locale(l)
local msg = {
Timestamp = nil,
@@ -27,20 +30,68 @@
Severity = 6,
}
-function process_message ()
- local log = read_message("Payload")
- local logger = read_message("Logger")
- local m = java.ZookeeperLogGrammar:match(log)
- if not m then
- return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
- end
- msg.Timestamp = m.Timestamp
- msg.Payload = m.Message
- msg.Pid = m.Pid
- msg.Severity = utils.label_to_severity_map[m.SeverityLabel or 'INFO'] or 6
+local exception_key = nil -- key of the exception being accumulated, nil when there is none
+local exception_lines = nil -- lines accumulated so far for that exception
+-- Fill the module-level msg table; severity falls back to 'INFO', then to 6.
+function prepare_message (timestamp, pid, severity_label, payload)
+ msg.Timestamp = timestamp
+ msg.Pid = pid
+ msg.Payload = payload
+ msg.Severity = utils.label_to_severity_map[severity_label or 'INFO'] or 6
msg.Fields = {}
msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
msg.Fields.programname = 'zookeeper'
+end
+-- Decode one log line; returns 0 when the line is buffered as part of an exception.
+function process_message ()
+ local log = read_message("Payload")
+ local logger = read_message("Logger")
+
+ local m = java.ZookeeperLogGrammar:match(log)
+ if not m then
+ if exception_key == nil then
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ else
+ table.insert(exception_lines, log)
+ return 0
+ end
+ end
+
+ local key = {
+ Timestamp = m.Timestamp,
+ Pid = m.Pid,
+ SeverityLabel = m.SeverityLabel,
+ }
+
+ if exception_key ~= nil then
+ -- If exception_key is not nil then it means we've started accumulating
+ -- the lines of an exception. We keep accumulating the exception lines
+ -- until we get a different log key.
+ if table_utils.table_equal(exception_key, key) then
+ table.insert(exception_lines, m.Message)
+ return 0
+ else
+ prepare_message(exception_key.Timestamp, exception_key.Pid,
+ exception_key.SeverityLabel, table.concat(exception_lines, ''))
+ exception_key = nil
+ exception_lines = nil
+ utils.inject_tags(msg)
+ -- Ignore safe_inject_message status code here to still get a
+ -- chance to inject the current log message.
+ utils.safe_inject_message(msg)
+ end
+ end
+
+ if patt.anywhere(patt.exception):match(m.Message) then
+ -- Zookeeper exception detected, begin accumulating the lines making
+ -- up the exception.
+ exception_key = key
+ exception_lines = {}
+ table.insert(exception_lines, m.Message)
+ return 0
+ end
+
+ prepare_message(m.Timestamp, m.Pid, m.SeverityLabel, m.Message)
utils.inject_tags(msg)
return utils.safe_inject_message(msg)
end