Parse zookeeper exception log lines
Change-Id: Icafa40ab8794c3bff9e573b39c4cfe5a5cef830f
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
index e330ee3..84937c8 100644
--- a/heka/files/lua/common/patterns.lua
+++ b/heka/files/lua/common/patterns.lua
@@ -141,11 +141,14 @@
-- Pattern used to match the beginning of a Python Traceback.
traceback = l.P'Traceback (most recent call last):'
+-- Pattern used to match the Java Exception.
+exception = l.P'exception' + l.P'Exception'
+
-- Pattern used to match a number
Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
-- Java/Log4J Timestamp patterns
--- 2016-11-21 06:38:43,081 - INFO
+-- 2016-11-21 06:38:43,081 - INFO
local time_secfrac = l.Cg(l.P"," * l.digit^1 / tonumber, "sec_frac")
local ts_grammar = l.Ct(dt.date_fullyear * dash * dt.date_month * dash * dt.date_mday * sp * dt.rfc3339_partial_time * time_secfrac)
JavaTimestamp = l.Cg(ts_grammar / dt.time_to_ns, "Timestamp")
diff --git a/heka/files/lua/decoders/zookeeper.lua b/heka/files/lua/decoders/zookeeper.lua
index ed3a338..70187cb 100644
--- a/heka/files/lua/decoders/zookeeper.lua
+++ b/heka/files/lua/decoders/zookeeper.lua
@@ -11,11 +11,14 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-local l = require 'lpeg'
-local utils = require 'lma_utils'
-l.locale(l)
+require "table"
+local java = require 'java_patterns'
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+local patt = require 'patterns'
+local table_utils = require 'table_utils'
-local java = require 'java_patterns'
+l.locale(l)
local msg = {
Timestamp = nil,
@@ -27,20 +30,68 @@
Severity = 6,
}
-function process_message ()
- local log = read_message("Payload")
- local logger = read_message("Logger")
- local m = java.ZookeeperLogGrammar:match(log)
- if not m then
- return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
- end
- msg.Timestamp = m.Timestamp
- msg.Payload = m.Message
- msg.Pid = m.Pid
- msg.Severity = utils.label_to_severity_map[m.SeverityLabel or 'INFO'] or 6
+local exception_key = nil
+local exception_lines = nil
+
+function prepare_message (timestamp, pid, severity_label, payload)
+ msg.Timestamp = timestamp
+ msg.Pid = pid
+ msg.Payload = payload
+ msg.Severity = utils.label_to_severity_map[severity_label or 'INFO'] or 6
msg.Fields = {}
msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
msg.Fields.programname = 'zookeeper'
+end
+
+function process_message ()
+ local log = read_message("Payload")
+ local logger = read_message("Logger")
+
+ local m = java.ZookeeperLogGrammar:match(log)
+ if not m then
+ if exception_key == nil then
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ else
+ table.insert(exception_lines, log)
+ return 0
+ end
+ end
+
+ local key = {
+ Timestamp = m.Timestamp,
+ Pid = m.Pid,
+ SeverityLabel = m.SeverityLabel,
+ }
+
+ if exception_key ~= nil then
+ -- If exception_key is not nil then it means we've started accumulated
+ -- lines of a exception. We keep accumulating the exception lines
+ -- until we get a different log key.
+ if table_utils.table_equal(exception_key, key) then
+ table.insert(exception_lines, m.Message)
+ return 0
+ else
+ prepare_message(exception_key.Timestamp, exception_key.Pid,
+ exception_key.SeverityLabel, table.concat(exception_lines, ''))
+ exception_key = nil
+ exception_lines = nil
+ utils.inject_tags(msg)
+ -- Ignore safe_inject_message status code here to still get a
+ -- chance to inject the current log message.
+ utils.safe_inject_message(msg)
+ end
+ end
+
+ if patt.anywhere(patt.exception):match(m.Message) then
+ -- Zookeeper exception detected, begin accumulating the lines making
+ -- up the exception.
+ exception_key = key
+ exception_lines = {}
+ table.insert(exception_lines, m.Message)
+ return 0
+ end
+
+ prepare_message(m.Timestamp, m.Pid, m.SeverityLabel, m.Message)
utils.inject_tags(msg)
return utils.safe_inject_message(msg)
end