Extend notification decoder to support CADF

Change-Id: Ie16578792cc954c344e64cfe45e8b4504e4126e3
diff --git a/heka/files/lua/decoders/notification.lua b/heka/files/lua/decoders/notification.lua
index 5b1195d..3f10e0d 100644
--- a/heka/files/lua/decoders/notification.lua
+++ b/heka/files/lua/decoders/notification.lua
@@ -17,13 +17,6 @@
 local patt = require 'patterns'
 local utils = require 'lma_utils'
 
-local msg = {
-    Timestamp = nil,
-    Type = "notification",
-    Payload = nil,
-    Fields = nil
-}
-
 -- Mapping table from event_type prefixes to notification loggers
 local logger_map = {
     --cinder
@@ -108,7 +101,56 @@
 
 local include_full_notification = read_config("include_full_notification") or false
 
-function process_message ()
+function process_cadf_event(notif, msg)
+    local cadf_event = notif.payload
+
+    msg.Type = 'audit'
+    msg.Logger = notif.publisher_id
+    msg.Severity = utils.label_to_severity_map[notif.priority]
+    msg.Timestamp = patt.Timestamp:match(cadf_event.eventTime)
+
+    msg.Fields.action = cadf_event.action
+    -- notif.event_type can be 'http.request' or 'http.response'
+    msg.Fields.notification_type = notif.event_type
+    -- cadf_event.eventType can be 'activity', 'monitor', ...
+    msg.Fields.event_type = cadf_event.eventType
+    msg.Fields.outcome = cadf_event.outcome
+    msg.Fields.severity_label = notif.priority
+end
+
+function process_notification(notif, msg)
+    local openstack_notif = notif.payload
+
+    msg.Type = 'notification'
+    msg.Logger = logger_map[string.match(notif.event_type, '([^.]+)')]
+    msg.Severity = utils.label_to_severity_map[notif.priority]
+    msg.Timestamp = patt.Timestamp:match(notif.timestamp)
+
+    msg.Fields.publisher, msg.Hostname = string.match(notif.publisher_id, '([^.]+)%.([%w_-]+)')
+    if openstack_notif.host ~= nil then
+        msg.Hostname = string.match(openstack_notif.host, '([%w_-]+)')
+    end
+
+    msg.Fields.event_type = notif.event_type
+    msg.Fields.severity_label = notif.priority
+    msg.Fields.hostname = msg.Hostname
+
+    for k, v in pairs(payload_fields) do
+        local val = openstack_notif[k]
+        if val ~= nil then
+            local name = payload_fields[k] or k
+            local transform = transform_functions[k]
+            if transform ~= nil then
+                msg.Fields[name] = transform(val)
+            else
+                msg.Fields[name] = val
+            end
+        end
+    end
+end
+
+function process_message()
+    local msg = {Fields={}}
     local data = read_message("Payload")
     local ok, notif = pcall(cjson.decode, data)
     if not ok then
@@ -130,32 +172,20 @@
         msg.Payload = utils.safe_json_encode(notif.payload) or '{}'
     end
 
-    msg.Fields = {}
-    msg.Logger = logger_map[string.match(notif.event_type, '([^.]+)')]
-    msg.Severity = utils.label_to_severity_map[notif.priority]
-    msg.Timestamp = patt.Timestamp:match(notif.timestamp)
-    msg.Fields.publisher, msg.Hostname = string.match(notif.publisher_id, '([^.]+)%.([%w_-]+)')
-    if notif.payload.host ~= nil then
-        msg.Hostname = string.match(notif.payload.host, '([%w_-]+)')
+    local ok, error_msg
+    if notif.payload.eventType and notif.payload.eventTime then
+        -- Payload of CADF event notifications always contain at least
+        -- eventType and eventTime fields
+        -- http://docs.openstack.org/developer/pycadf/specification/events.html
+        ok, error_msg = pcall(process_cadf_event, notif, msg)
+    else
+        ok, error_msg = pcall(process_notification, notif, msg)
     end
 
-    msg.Fields.event_type = notif.event_type
-    msg.Fields.severity_label = notif.priority
-    msg.Fields.hostname = msg.Hostname
-
-    for k, v in pairs(payload_fields) do
-        local val = notif.payload[k]
-        if val ~= nil then
-            local name = payload_fields[k] or k
-            local transform = transform_functions[k]
-            if transform ~= nil then
-                msg.Fields[name] = transform(val)
-            else
-                msg.Fields[name] = val
-            end
-        end
+    if not ok then
+        return -1, error_msg
     end
+
     utils.inject_tags(msg)
-
     return utils.safe_inject_message(msg)
 end