Merge "Heka stalled alert improvement"
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
index e330ee3..9c5ae9e 100644
--- a/heka/files/lua/common/patterns.lua
+++ b/heka/files/lua/common/patterns.lua
@@ -141,11 +141,17 @@
 -- Pattern used to match the beginning of a Python Traceback.
 traceback = l.P'Traceback (most recent call last):'
 
+-- Pattern used to match the word 'exception' or 'Exception' (e.g. Java exceptions).
+exception = l.P'exception' + l.P'Exception'
+
+-- Pattern used to match markers that start a Contrail multiline log message.
+multiline = l.P'ApiServer Message:' + l.P'PollResponse message is:'
+
 -- Pattern used to match a number
 Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
 
 -- Java/Log4J Timestamp patterns
--- 2016-11-21 06:38:43,081 - INFO 
+-- 2016-11-21 06:38:43,081 - INFO
 local time_secfrac = l.Cg(l.P"," * l.digit^1 / tonumber, "sec_frac")
 local ts_grammar = l.Ct(dt.date_fullyear * dash * dt.date_month * dash * dt.date_mday * sp * dt.rfc3339_partial_time * time_secfrac)
 JavaTimestamp = l.Cg(ts_grammar / dt.time_to_ns, "Timestamp")
diff --git a/heka/files/lua/decoders/contrail_collector_log.lua b/heka/files/lua/decoders/contrail_collector_log.lua
index c739a7f..fcb4821 100644
--- a/heka/files/lua/decoders/contrail_collector_log.lua
+++ b/heka/files/lua/decoders/contrail_collector_log.lua
@@ -11,13 +11,14 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-local dt     = require "date_time"
-local l      = require 'lpeg'
-l.locale(l)
+require "table"
+local contrail = require 'contrail_patterns'
+local l        = require 'lpeg'
+local utils    = require 'lma_utils'
+local patt     = require 'patterns'
+local table_utils = require 'table_utils'
 
-local patt   = require 'patterns'
-local contrail   = require 'contrail_patterns'
-local utils  = require 'lma_utils'
+l.locale(l)
 
 local msg = {
     Timestamp   = nil,
@@ -29,21 +30,83 @@
     Severity    = 5,
 }
 
+-- Key (Timestamp, Pid, SeverityLabel, Programname) of the multiline message
+-- being accumulated, or nil when no multiline message is in progress.
+local multiline_key = nil
+-- Payload fragments of the multiline message being accumulated.
+local multiline_lines = nil
+
+-- Grammar searching a message for patt.multiline at any position. Built once
+-- here (assuming patt.anywhere is a pure pattern builder), not per message.
+local multiline_grammar = patt.anywhere(patt.multiline)
+
+-- Fill the shared msg table from the given log fields.
+-- A nil severity_label is treated as "SYS_NOTICE"; labels missing from
+-- utils.label_to_severity_map fall back to severity 5.
+local function prepare_message (timestamp, pid, severity_label, payload, programname)
+    msg.Timestamp = timestamp
+    msg.Pid = pid
+    msg.Payload = payload
+    msg.Severity = utils.label_to_severity_map[severity_label or "SYS_NOTICE"] or 5
+    msg.Fields = {}
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    msg.Fields.programname = programname
+end
+
 function process_message ()
     local log = read_message("Payload")
     local logger = read_message("Logger")
+
     local m = contrail.ControlGrammar:match(log)
     if not m then
-        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        if multiline_key == nil then
+            return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        end
+        -- Lines that don't match the Contrail grammar are continuations of
+        -- the multiline message currently being accumulated.
+        table.insert(multiline_lines, log)
+        return 0
     end
-    msg.Timestamp = m.Timestamp
-    msg.Payload = m.Message
-    msg.Fields = {}
-    msg.Pid = m.Pid
-    msg.Fields.programname = m.Module
-    local severity = m.Severity or "SYS_NOTICE"
-    msg.Severity = utils.label_to_severity_map[severity] or 5
-    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+
+    -- Identity of this log record, used to group multiline messages. The
+    -- Contrail grammar exposes the severity label as m.Severity.
+    local key = {
+        Timestamp     = m.Timestamp,
+        Pid           = m.Pid,
+        SeverityLabel = m.Severity,
+        Programname   = m.Module,
+    }
+
+    if multiline_key ~= nil then
+        -- A multiline message is in progress: keep accumulating its lines
+        -- until a record with a different key shows up.
+        if table_utils.table_equal(multiline_key, key) then
+            table.insert(multiline_lines, m.Message)
+            return 0
+        end
+
+        -- The multiline message is complete: emit it before handling the
+        -- current record.
+        prepare_message(multiline_key.Timestamp, multiline_key.Pid,
+            multiline_key.SeverityLabel, table.concat(multiline_lines, ''),
+            multiline_key.Programname)
+        multiline_key = nil
+        multiline_lines = nil
+        utils.inject_tags(msg)
+        -- Ignore safe_inject_message status code here to still get a
+        -- chance to inject the current log message.
+        utils.safe_inject_message(msg)
+    end
+
+    if multiline_grammar:match(m.Message) then
+        -- Start of a multiline message: buffer it. NOTE: it is only emitted
+        -- once a later record with a different key arrives.
+        multiline_key = key
+        multiline_lines = { m.Message }
+        return 0
+    end
+
+    prepare_message(m.Timestamp, m.Pid, m.Severity, m.Message, m.Module)
     utils.inject_tags(msg)
     return utils.safe_inject_message(msg)
 end
diff --git a/heka/files/lua/decoders/openstack_log.lua b/heka/files/lua/decoders/openstack_log.lua
index 95b2002..a8f2c59 100644
--- a/heka/files/lua/decoders/openstack_log.lua
+++ b/heka/files/lua/decoders/openstack_log.lua
@@ -68,7 +68,12 @@
 
     m = patt.openstack:match(log)
     if not m then
-        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        if traceback_key ~= nil then
+            -- While traceback_key is set, unparseable raw lines join the traceback.
+            table.insert(traceback_lines, log)
+            return 0
+        end
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
     end
 
     local key = {
diff --git a/heka/files/lua/decoders/zookeeper.lua b/heka/files/lua/decoders/zookeeper.lua
index ed3a338..70187cb 100644
--- a/heka/files/lua/decoders/zookeeper.lua
+++ b/heka/files/lua/decoders/zookeeper.lua
@@ -11,11 +11,14 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-local l      = require 'lpeg'
-local utils  = require 'lma_utils'
-l.locale(l)
+require "table"
+local java        = require 'java_patterns'
+local l           = require 'lpeg'
+local utils       = require 'lma_utils'
+local patt        = require 'patterns'
+local table_utils = require 'table_utils'
 
-local java  = require 'java_patterns'
+l.locale(l)
 
 local msg = {
     Timestamp   = nil,
@@ -27,20 +30,80 @@
     Severity    = 6,
 }
 
-function process_message ()
-    local log = read_message("Payload")
-    local logger = read_message("Logger")
-    local m = java.ZookeeperLogGrammar:match(log)
-    if not m then
-        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
-    end
-    msg.Timestamp = m.Timestamp
-    msg.Payload = m.Message
-    msg.Pid = m.Pid
-    msg.Severity = utils.label_to_severity_map[m.SeverityLabel or 'INFO'] or 6
+-- Key (Timestamp, Pid, SeverityLabel) of the exception being accumulated,
+-- or nil when no exception is in progress.
+local exception_key = nil
+-- Payload fragments of the exception being accumulated.
+local exception_lines = nil
+
+-- Grammar searching a message for patt.exception at any position. Built once
+-- here (assuming patt.anywhere is a pure pattern builder), not per message.
+local exception_grammar = patt.anywhere(patt.exception)
+
+-- Fill the shared msg table from the given log fields.
+-- A nil severity_label is treated as 'INFO'; labels missing from
+-- utils.label_to_severity_map fall back to severity 6.
+local function prepare_message (timestamp, pid, severity_label, payload)
+    msg.Timestamp = timestamp
+    msg.Pid = pid
+    msg.Payload = payload
+    msg.Severity = utils.label_to_severity_map[severity_label or 'INFO'] or 6
     msg.Fields = {}
     msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
     msg.Fields.programname = 'zookeeper'
+end
+
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+
+    local m = java.ZookeeperLogGrammar:match(log)
+    if not m then
+        if exception_key == nil then
+            return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        end
+        -- Lines that don't match the Zookeeper grammar (e.g. stack trace
+        -- frames) are appended to the exception being accumulated.
+        table.insert(exception_lines, log)
+        return 0
+    end
+
+    -- Identity of this log record, used to group exception lines.
+    local key = {
+        Timestamp     = m.Timestamp,
+        Pid           = m.Pid,
+        SeverityLabel = m.SeverityLabel,
+    }
+
+    if exception_key ~= nil then
+        -- An exception is in progress: keep accumulating its lines until a
+        -- record with a different key shows up.
+        if table_utils.table_equal(exception_key, key) then
+            table.insert(exception_lines, m.Message)
+            return 0
+        end
+
+        -- The exception is complete: emit it before handling the current
+        -- record.
+        prepare_message(exception_key.Timestamp, exception_key.Pid,
+            exception_key.SeverityLabel, table.concat(exception_lines, ''))
+        exception_key = nil
+        exception_lines = nil
+        utils.inject_tags(msg)
+        -- Ignore safe_inject_message status code here to still get a
+        -- chance to inject the current log message.
+        utils.safe_inject_message(msg)
+    end
+
+    if exception_grammar:match(m.Message) then
+        -- Zookeeper exception detected: buffer it. NOTE: it is only emitted
+        -- once a later record with a different key arrives.
+        exception_key = key
+        exception_lines = { m.Message }
+        return 0
+    end
+
+    prepare_message(m.Timestamp, m.Pid, m.SeverityLabel, m.Message)
     utils.inject_tags(msg)
     return utils.safe_inject_message(msg)
 end