Parse multiline contrail collector logs

Change-Id: I4d7aaa4778d5ed32eb6a8712e10da2fcb8733d53
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
index 84937c8..9c5ae9e 100644
--- a/heka/files/lua/common/patterns.lua
+++ b/heka/files/lua/common/patterns.lua
@@ -144,6 +144,9 @@
 -- Pattern used to match the Java Exception.
 exception = l.P'exception' + l.P'Exception'
 
+-- Contrail multiline patterns.
+multiline = l.P'ApiServer Message:' + l.P'PollResponse message is:'
+
 -- Pattern used to match a number
 Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
 
diff --git a/heka/files/lua/decoders/contrail_collector_log.lua b/heka/files/lua/decoders/contrail_collector_log.lua
index c739a7f..fcb4821 100644
--- a/heka/files/lua/decoders/contrail_collector_log.lua
+++ b/heka/files/lua/decoders/contrail_collector_log.lua
@@ -11,13 +11,14 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-local dt     = require "date_time"
-local l      = require 'lpeg'
-l.locale(l)
+require "table"
+local contrail = require 'contrail_patterns'
+local l        = require 'lpeg'
+local utils    = require 'lma_utils'
+local patt     = require 'patterns'
+local table_utils = require 'table_utils'
 
-local patt   = require 'patterns'
-local contrail   = require 'contrail_patterns'
-local utils  = require 'lma_utils'
+l.locale(l)
 
 local msg = {
     Timestamp   = nil,
@@ -29,21 +30,68 @@
     Severity    = 5,
 }
 
+local multiline_key = nil
+local multiline_lines = nil
+
+function prepare_message (timestamp, pid, severity_label, payload, programname)
+    msg.Timestamp = timestamp
+    msg.Pid = pid
+    msg.Payload = payload
+    msg.Severity = utils.label_to_severity_map[severity_label or "SYS_NOTICE"] or 5
+    msg.Fields = {}
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    msg.Fields.programname = programname
+end
+
 function process_message ()
     local log = read_message("Payload")
     local logger = read_message("Logger")
+
     local m = contrail.ControlGrammar:match(log)
     if not m then
-        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        if multiline_key == nil then
+            return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+        else
+            table.insert(multiline_lines, log)
+            return 0
+        end
     end
-    msg.Timestamp = m.Timestamp
-    msg.Payload = m.Message
-    msg.Fields = {}
-    msg.Pid = m.Pid
-    msg.Fields.programname = m.Module
-    local severity = m.Severity or "SYS_NOTICE"
-    msg.Severity = utils.label_to_severity_map[severity] or 5
-    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+
+    local key = {
+        Timestamp     = m.Timestamp,
+        Pid           = m.Pid,
+        SeverityLabel = m.SeverityLabel,
+        Programname   = m.Module,
+    }
+
+    if multiline_key ~= nil then
+        -- If multiline_key is not nil then it means we've started accumulated
+        -- lines of a multiline message. We keep accumulating the lines
+        -- until we get a different log key.
+        if table_utils.table_equal(multiline_key, key) then
+            table.insert(multiline_lines, m.Message)
+            return 0
+        else
+            prepare_message(multiline_key.Timestamp, multiline_key.Pid,
+                multiline_key.SeverityLabel, table.concat(multiline_lines, ''),
+                multiline_key.Programname)
+            multiline_key = nil
+            multiline_lines = nil
+            utils.inject_tags(msg)
+            -- Ignore safe_inject_message status code here to still get a
+            -- chance to inject the current log message.
+            utils.safe_inject_message(msg)
+        end
+    end
+
+    if patt.anywhere(patt.multiline):match(m.Message) then
+        multiline_key = key
+        multiline_lines = {}
+        table.insert(multiline_lines, m.Message)
+        return 0
+    end
+
+    prepare_message(m.Timestamp, m.Pid, m.Severity, m.Message, m.Module)
     utils.inject_tags(msg)
     return utils.safe_inject_message(msg)
 end