Add fluentd-based notification transport

Change-Id: I84de6582603aee57d7f868f6493fce8ae226aa9a
PROD-related: PROD-29185
diff --git a/fluentd/label/notifications/audit.yml b/fluentd/label/notifications/audit.yml
new file mode 100644
index 0000000..6449e1e
--- /dev/null
+++ b/fluentd/label/notifications/audit.yml
@@ -0,0 +1,51 @@
+parameters:
+  _param:
+    elasticsearch_port: 9200
+  fluentd:
+    agent:
+      config:
+        label:
+          audit_messages:
+            filter:
+              get_payload_values:
+                tag: audit
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: Logger
+                    value: ${fluentd:dollar}{ record.dig("publisher_id") }
+                  - name: Severity
+                    value: ${fluentd:dollar}{ {'TRACE'=>7,'DEBUG'=>7,'INFO'=>6,'AUDIT'=>6,'WARNING'=>4,'ERROR'=>3,'CRITICAL'=>2}[record['priority']].to_i }
+                  - name: Timestamp
+                    value: ${fluentd:dollar}{ DateTime.strptime(record.dig("payload", "eventTime"), "%Y-%m-%dT%H:%M:%S.%N%z").strftime("%Y-%m-%dT%H:%M:%S.%3NZ") }
+                  - name: notification_type
+                    value: ${fluentd:dollar}{ record.dig("event_type") }
+                  - name: severity_label
+                    value: ${fluentd:dollar}{ record.dig("priority") }
+                  - name: environment_label
+                    value: ${_param:cluster_domain}
+
+                  - name: action
+                    value: ${fluentd:dollar}{ record.dig("payload", "action") }
+                  - name: event_type
+                    value: ${fluentd:dollar}{ record.dig("payload", "eventType") }
+                  - name: outcome
+                    value: ${fluentd:dollar}{ record.dig("payload", "outcome") }
+              pack_payload_to_json:
+                tag: audit
+                require:
+                  - get_payload_values
+                type: record_transformer
+                enable_ruby: true
+                remove_keys: '["payload", "timestamp", "publisher_id", "priority"]'
+                record:
+                  - name: Payload
+                    value: ${fluentd:dollar}{ record["payload"].to_json }
+            match:
+              audit_output:
+                tag: audit
+                type: elasticsearch
+                host: ${_param:stacklight_log_address}
+                port: ${_param:elasticsearch_port}
+                es_index_name: audit
+                tag_key: Type
diff --git a/fluentd/label/notifications/init.yml b/fluentd/label/notifications/init.yml
new file mode 100644
index 0000000..e4e57f8
--- /dev/null
+++ b/fluentd/label/notifications/init.yml
@@ -0,0 +1,4 @@
+classes:
+- system.fluentd.label.notifications.input_rabbitmq
+- system.fluentd.label.notifications.notifications
+- system.fluentd.label.notifications.audit
diff --git a/fluentd/label/notifications/input_rabbitmq.yml b/fluentd/label/notifications/input_rabbitmq.yml
new file mode 100644
index 0000000..3d7edef
--- /dev/null
+++ b/fluentd/label/notifications/input_rabbitmq.yml
@@ -0,0 +1,107 @@
+parameters:
+  _param:
+    stacklight_notification_topic: stacklight_notifications
+  fluentd:
+    agent:
+      config:
+        label:
+          rabbitmq_notifications:
+            input:
+              tail_rabbitmq_info:
+                tag: raw_notifications
+                type: rabbitmq
+                host: ${_param:openstack_message_queue_address}
+                user: openstack
+                pass: ${_param:rabbitmq_openstack_password}
+                vhost: /openstack
+                queue: ${_param:stacklight_notification_topic}.info
+                routing_key: ${_param:stacklight_notification_topic}.info
+                parser:
+                  type: json
+              tail_rabbitmq_warn:
+                tag: raw_notifications
+                type: rabbitmq
+                host: ${_param:openstack_message_queue_address}
+                user: openstack
+                pass: ${_param:rabbitmq_openstack_password}
+                vhost: /openstack
+                queue: ${_param:stacklight_notification_topic}.warn
+                routing_key: ${_param:stacklight_notification_topic}.warn
+                parser:
+                  type: json
+              tail_rabbitmq_error:
+                tag: raw_notifications
+                type: rabbitmq
+                host: ${_param:openstack_message_queue_address}
+                user: openstack
+                pass: ${_param:rabbitmq_openstack_password}
+                vhost: /openstack
+                queue: ${_param:stacklight_notification_topic}.error
+                routing_key: ${_param:stacklight_notification_topic}.error
+                parser:
+                  type: json
+            filter:
+              parse_json:
+                tag: raw_notifications
+                type: parser
+                key_name: oslo.message
+                reserve_data: false
+                hash_value_field: parsed
+                parser:
+                  type: json
+              remove_context:
+                tag: raw_notifications
+                require:
+                  - parse_json
+                type: record_transformer
+                enable_ruby: true
+                remove_keys: _dummy_1
+                record:
+                  - name: _dummy_1
+                    value: ${fluentd:dollar}{record['parsed'].delete_if { |k,_| k.include?('_context_') }; nil}
+              pack_parsed_to_json:
+                tag: raw_notifications
+                require:
+                  - remove_context
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: parsed
+                    value: ${fluentd:dollar}{record["parsed"].to_json}
+              unpack_on_top_level:
+                tag: raw_notifications
+                require:
+                  - pack_parsed_to_json
+                type: parser
+                key_name: parsed
+                reserve_data: false
+                parser:
+                  type: json
+              detect_audit_notification:
+                tag: raw_notifications
+                require:
+                  - unpack_on_top_level
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: notification_type
+                    value: '${fluentd:dollar}{ record["payload"]["eventType"] && record["payload"]["eventTime"] ? "audit" : "notification" }'
+            match:
+              rewrite_message_tag:
+                tag: raw_notifications
+                type: rewrite_tag_filter
+                rule:
+                  - name: notification_type
+                    regexp: 'audit'
+                    result: audit
+                  - name: notification_type
+                    regexp: '/.+/'
+                    result: notification
+              forward_notification:
+                tag: notification
+                type: relabel
+                label: notification_messages
+              forward_audit:
+                tag: audit
+                type: relabel
+                label: audit_messages
diff --git a/fluentd/label/notifications/notifications.yml b/fluentd/label/notifications/notifications.yml
new file mode 100644
index 0000000..5556d6e
--- /dev/null
+++ b/fluentd/label/notifications/notifications.yml
@@ -0,0 +1,124 @@
+parameters:
+  _param:
+    elasticsearch_port: 9200
+  fluentd:
+    agent:
+      config:
+        label:
+          notification_messages:
+            filter:
+              parse_publuisher_host:
+                tag: notification
+                type: parser
+                key_name: publisher_id
+                reserve_data: true
+                parser:
+                  type: regexp
+                  format: (?<publisher>\w+).(?<hostname>\w+)
+              save_hostname:
+                tag: notification
+                require:
+                  - parse_publuisher_host
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: Hostname
+                    value: ${fluentd:dollar}{ record["hostname"] }
+              parse_source:
+                tag: notification
+                require:
+                  - save_hostname
+                type: parser
+                key_name: event_type
+                reserve_data: true
+                parser:
+                  type: regexp
+                  format: (?<event_type_logger>\w+).+
+              map_logger:
+                tag: notification
+                require:
+                  - parse_source
+                type: record_transformer
+                enable_ruby: true
+                remove_keys: event_type_logger
+                record:
+                  - name: Logger
+                    value: ${fluentd:dollar}{ {'volume'=>'cinder', 'snapshot'=>'cinder', 'image'=>'glance', 'orchestration'=>'heat', 'identity'=>'keystone', 'compute'=>'nova', 'compute_task'=>'nova', 'scheduler'=>'nova', 'keypair'=>'nova', 'floatingip' =>'neutron', 'security_group' =>'neutron', 'security_group_rule' =>'neutron', 'network' =>'neutron', 'port' =>'neutron', 'router' =>'neutron', 'subnet' =>'neutron', 'sahara' =>'sahara'}[record["event_type_logger"]] }
+              get_payload_values:
+                tag: notification
+                require:
+                  - map_logger
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: Timestamp
+                    value: ${fluentd:dollar}{ DateTime.strptime(record['timestamp'], '%Y-%m-%d %H:%M:%S.%N').strftime('%Y-%m-%dT%H:%M:%S.%3NZ') }
+                  - name: severity_label
+                    value: ${fluentd:dollar}{ record["priority"] }
+                  - name: Severity
+                    value: ${fluentd:dollar}{ {'TRACE'=>7,'DEBUG'=>7,'INFO'=>6,'AUDIT'=>6,'WARNING'=>4,'ERROR'=>3,'CRITICAL'=>2}[record['priority']].to_i }
+                  - name: Hostname
+                    value: '${fluentd:dollar}{ record["payload"].has_key?("host") ? record["payload"]["host"] : record["Hostname"] }'
+                  - name: environment_label
+                    value: ${_param:cluster_domain}
+
+                  - name: tenant_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "tenant_id") }
+                  - name: user_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "user_id") }
+                  - name: display_name
+                    value: ${fluentd:dollar}{ record.dig("payload", "display_name") }
+                  - name: vcpus
+                    value: ${fluentd:dollar}{ record.dig("payload", "vcpus") }
+                  - name: availability_zone
+                    value: ${fluentd:dollar}{ record.dig("payload", "availability_zone") }
+                  - name: instance_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "instance_id") }
+                  - name: instance_type
+                    value: ${fluentd:dollar}{ record.dig("payload", "instance_type") }
+                  - name: image_name
+                    value: ${fluentd:dollar}{ record.dig("payload", "image_name") }
+                  - name: memory_mb
+                    value: ${fluentd:dollar}{ record.dig("payload", "memory_mb") }
+                  - name: disk_gb
+                    value: ${fluentd:dollar}{ record.dig("payload", "disk_gb") }
+                  - name: state
+                    value: ${fluentd:dollar}{ record.dig("payload", "state") }
+                  - name: old_state
+                    value: ${fluentd:dollar}{ record.dig("payload", "old_state") }
+                  - name: old_task_state
+                    value: ${fluentd:dollar}{ record.dig("payload", "old_task_state") }
+                  - name: new_task_state
+                    value: ${fluentd:dollar}{ record.dig("payload", "new_task_state") }
+                  - name: network_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "network_id") }
+                  - name: subnet_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "subnet_id") }
+                  - name: port_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "port_id") }
+                  - name: volume_id
+                    value: ${fluentd:dollar}{ record.dig("payload", "volume_id") }
+                  - name: size
+                    value: ${fluentd:dollar}{ record.dig("payload", "size") }
+                  - name: status
+                    value: ${fluentd:dollar}{ record.dig("payload", "status") }
+                  - name: replication_status
+                    value: ${fluentd:dollar}{ record.dig("payload", "replication_status") }
+              pack_payload_to_json:
+                tag: notification
+                require:
+                  - get_payload_values
+                type: record_transformer
+                enable_ruby: true
+                remove_keys: '["timestamp", "publisher_id", "priority", "notification_type", "payload"]'
+                record:
+                  - name: Payload
+                    value: ${fluentd:dollar}{ record["payload"].to_json }
+            match:
+              notifications_output:
+                tag: notification
+                type: elasticsearch
+                host: ${_param:stacklight_log_address}
+                port: ${_param:elasticsearch_port}
+                es_index_name: notification
+                tag_key: Type