Parse zookeeper exception log lines

Change-Id: Icafa40ab8794c3bff9e573b39c4cfe5a5cef830f
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
index e330ee3..84937c8 100644
--- a/heka/files/lua/common/patterns.lua
+++ b/heka/files/lua/common/patterns.lua
@@ -141,11 +141,14 @@
 -- Pattern used to match the beginning of a Python Traceback.
 traceback = l.P'Traceback (most recent call last):'
 
+-- Pattern used to match the Java Exception.
+exception = l.P'exception' + l.P'Exception'
+
 -- Pattern used to match a number
 Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
 
 -- Java/Log4J Timestamp patterns
--- 2016-11-21 06:38:43,081 - INFO 
+-- 2016-11-21 06:38:43,081 - INFO
 local time_secfrac = l.Cg(l.P"," * l.digit^1 / tonumber, "sec_frac")
 local ts_grammar = l.Ct(dt.date_fullyear * dash * dt.date_month * dash * dt.date_mday * sp * dt.rfc3339_partial_time * time_secfrac)
 JavaTimestamp = l.Cg(ts_grammar / dt.time_to_ns, "Timestamp")
diff --git a/heka/files/lua/decoders/zookeeper.lua b/heka/files/lua/decoders/zookeeper.lua
index ed3a338..70187cb 100644
--- a/heka/files/lua/decoders/zookeeper.lua
+++ b/heka/files/lua/decoders/zookeeper.lua
@@ -11,11 +11,14 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-local l      = require 'lpeg'
-local utils  = require 'lma_utils'
-l.locale(l)
+require "table"
+local java        = require 'java_patterns'
+local l           = require 'lpeg'
+local utils       = require 'lma_utils'
+local patt        = require 'patterns'
+local table_utils = require 'table_utils'
 
-local java  = require 'java_patterns'
+l.locale(l)
 
 local msg = {
     Timestamp   = nil,
@@ -27,20 +30,68 @@
     Severity    = 6,
 }
 
-function process_message ()
-    local log = read_message("Payload")
-    local logger = read_message("Logger")
-    local m = java.ZookeeperLogGrammar:match(log)
-    if not m then
-        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
-    end
-    msg.Timestamp = m.Timestamp
-    msg.Payload = m.Message
-    msg.Pid = m.Pid
-    msg.Severity = utils.label_to_severity_map[m.SeverityLabel or 'INFO'] or 6
+local exception_key = nil
+local exception_lines = nil
+
+function prepare_message (timestamp, pid, severity_label, payload)
+    msg.Timestamp = timestamp
+    msg.Pid = pid
+    msg.Payload = payload
+    msg.Severity = utils.label_to_severity_map[severity_label or 'INFO'] or 6
     msg.Fields = {}
     msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
     msg.Fields.programname = 'zookeeper'
+end
+
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+
+    local m = java.ZookeeperLogGrammar:match(log)
+    if not m then
+        if exception_key == nil then
+            return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        else
+            table.insert(exception_lines, log)
+            return 0
+        end
+    end
+
+    local key = {
+        Timestamp     = m.Timestamp,
+        Pid           = m.Pid,
+        SeverityLabel = m.SeverityLabel,
+    }
+
+    if exception_key ~= nil then
+        -- If exception_key is not nil then it means we've started accumulated
+        -- lines of a exception. We keep accumulating the exception lines
+        -- until we get a different log key.
+        if table_utils.table_equal(exception_key, key) then
+            table.insert(exception_lines, m.Message)
+            return 0
+        else
+            prepare_message(exception_key.Timestamp, exception_key.Pid,
+                exception_key.SeverityLabel, table.concat(exception_lines, ''))
+            exception_key = nil
+            exception_lines = nil
+            utils.inject_tags(msg)
+            -- Ignore safe_inject_message status code here to still get a
+            -- chance to inject the current log message.
+            utils.safe_inject_message(msg)
+        end
+    end
+
+    if patt.anywhere(patt.exception):match(m.Message) then
+        -- Zookeeper exception detected, begin accumulating the lines making
+        -- up the exception.
+        exception_key = key
+        exception_lines = {}
+        table.insert(exception_lines, m.Message)
+        return 0
+    end
+
+    prepare_message(m.Timestamp, m.Pid, m.SeverityLabel, m.Message)
     utils.inject_tags(msg)
     return utils.safe_inject_message(msg)
 end