Add fluentd-based notification transport
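
Consume OpenStack notifications from the per-severity RabbitMQ queues
(${_param:stacklight_notification_topic}.info/.warn/.error,
"stacklight_notifications" by default), unwrap the oslo.message envelope,
drop the _context_* keys, split the stream into audit and regular
notification messages based on the presence of payload.eventType and
payload.eventTime, normalize the fields expected by StackLight (Logger,
Severity, Timestamp, Hostname, environment_label, Payload) and ship both
streams to Elasticsearch into the "audit" and "notification" indices.
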
Change-Id: I84de6582603aee57d7f868f6493fce8ae226aa9a
PROD-related: PROD-29185
diff --git a/fluentd/label/notifications/audit.yml b/fluentd/label/notifications/audit.yml
new file mode 100644
index 0000000..6449e1e
--- /dev/null
+++ b/fluentd/label/notifications/audit.yml
@@ -0,0 +1,51 @@
+parameters:
+ _param:
+ elasticsearch_port: 9200
+ fluentd:
+ agent:
+ config:
+ label:
+ audit_messages:
+ filter:
+ get_payload_values:
+ tag: audit
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: Logger
+ value: ${fluentd:dollar}{ record.dig("publisher_id") }
+ - name: Severity
+ value: ${fluentd:dollar}{ {'TRACE'=>7,'DEBUG'=>7,'INFO'=>6,'AUDIT'=>6,'WARNING'=>4,'ERROR'=>3,'CRITICAL'=>2}[record['priority']].to_i }
+ - name: Timestamp
+ value: ${fluentd:dollar}{ DateTime.strptime(record.dig("payload", "eventTime"), "%Y-%m-%dT%H:%M:%S.%N%z").strftime("%Y-%m-%dT%H:%M:%S.%3NZ") }
+ - name: notification_type
+ value: ${fluentd:dollar}{ record.dig("event_type") }
+ - name: severity_label
+ value: ${fluentd:dollar}{ record.dig("priority") }
+ - name: environment_label
+ value: ${_param:cluster_domain}
+
+ - name: action
+ value: ${fluentd:dollar}{ record.dig("payload", "action") }
+ - name: event_type
+ value: ${fluentd:dollar}{ record.dig("payload", "eventType") }
+ - name: outcome
+ value: ${fluentd:dollar}{ record.dig("payload", "outcome") }
+ pack_payload_to_json:
+ tag: audit
+ require:
+ - get_payload_values
+ type: record_transformer
+ enable_ruby: true
+ remove_keys: '["payload", "timestamp", "publisher_id", "priority"]'
+ record:
+ - name: Payload
+ value: ${fluentd:dollar}{ record["payload"].to_json }
+ match:
+ audit_output:
+ tag: audit
+ type: elasticsearch
+ host: ${_param:stacklight_log_address}
+ port: ${_param:elasticsearch_port}
+ es_index_name: audit
+ tag_key: Type
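
With the transforms above, an audit event lands in the "audit" index roughly
in the shape sketched below. This is illustrative only: the values are made
up, environment_label comes from ${_param:cluster_domain}, Type is filled
from the fluentd tag via tag_key, and Payload carries the remaining CADF
payload serialized to a JSON string.

# Illustrative (abbreviated) document produced by the audit_messages label
Logger: identity.ctl01
Severity: 6
severity_label: INFO
Timestamp: '2018-05-14T08:12:43.512Z'
notification_type: identity.user.created
environment_label: example.local
action: created.user
event_type: activity
outcome: success
Payload: '{"typeURI": "http://schemas.dmtf.org/cloud/audit/1.0/event", "action": "created.user", "outcome": "success"}'
Type: audit
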
diff --git a/fluentd/label/notifications/init.yml b/fluentd/label/notifications/init.yml
new file mode 100644
index 0000000..e4e57f8
--- /dev/null
+++ b/fluentd/label/notifications/init.yml
@@ -0,0 +1,4 @@
+classes:
+- system.fluentd.label.notifications.input_rabbitmq
+- system.fluentd.label.notifications.notifications
+- system.fluentd.label.notifications.audit
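
A minimal sketch of how a cluster model might consume these classes. The
class path follows from the file location; the addresses, password and
domain below are placeholders for parameters that this change only
references through ${_param:...}:

# Hypothetical cluster-level include, values are placeholders
classes:
- system.fluentd.label.notifications
parameters:
  _param:
    cluster_domain: example.local
    openstack_message_queue_address: 10.10.0.254
    rabbitmq_openstack_password: changeme
    stacklight_log_address: 10.10.0.70
    # defaults set by these classes, override only if needed:
    # stacklight_notification_topic: stacklight_notifications
    # elasticsearch_port: 9200
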
diff --git a/fluentd/label/notifications/input_rabbitmq.yml b/fluentd/label/notifications/input_rabbitmq.yml
new file mode 100644
index 0000000..3d7edef
--- /dev/null
+++ b/fluentd/label/notifications/input_rabbitmq.yml
@@ -0,0 +1,107 @@
+parameters:
+ _param:
+ stacklight_notification_topic: stacklight_notifications
+ fluentd:
+ agent:
+ config:
+ label:
+ rabbitmq_notifications:
+ input:
+ tail_rabbitmq_info:
+ tag: raw_notifications
+ type: rabbitmq
+ host: ${_param:openstack_message_queue_address}
+ user: openstack
+ pass: ${_param:rabbitmq_openstack_password}
+ vhost: /openstack
+ queue: ${_param:stacklight_notification_topic}.info
+ routing_key: ${_param:stacklight_notification_topic}.info
+ parser:
+ type: json
+ tail_rabbitmq_warn:
+ tag: raw_notifications
+ type: rabbitmq
+ host: ${_param:openstack_message_queue_address}
+ user: openstack
+ pass: ${_param:rabbitmq_openstack_password}
+ vhost: /openstack
+ queue: ${_param:stacklight_notification_topic}.warn
+ routing_key: ${_param:stacklight_notification_topic}.warn
+ parser:
+ type: json
+ tail_rabbitmq_error:
+ tag: raw_notifications
+ type: rabbitmq
+ host: ${_param:openstack_message_queue_address}
+ user: openstack
+ pass: ${_param:rabbitmq_openstack_password}
+ vhost: /openstack
+ queue: ${_param:stacklight_notification_topic}.error
+ routing_key: ${_param:stacklight_notification_topic}.error
+ parser:
+ type: json
+ filter:
+ parse_json:
+ tag: raw_notifications
+ type: parser
+ key_name: oslo.message
+ reserve_data: false
+ hash_value_field: parsed
+ parser:
+ type: json
+ remove_context:
+ tag: raw_notifications
+ require:
+ - parse_json
+ type: record_transformer
+ enable_ruby: true
+ remove_keys: _dummy_1
+ record:
+ - name: _dummy_1
+ value: ${fluentd:dollar}{record['parsed'].delete_if { |k,_| k.include?('_context_') }; nil}
+ pack_parsed_to_json:
+ tag: raw_notifications
+ require:
+ - remove_context
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: parsed
+ value: ${fluentd:dollar}{record["parsed"].to_json}
+ unpack_on_top_level:
+ tag: raw_notifications
+ require:
+ - pack_parsed_to_json
+ type: parser
+ key_name: parsed
+ reserve_data: false
+ parser:
+ type: json
+ detect_audit_notification:
+ tag: raw_notifications
+ require:
+ - unpack_on_top_level
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: notification_type
+ value: '${fluentd:dollar}{ record["payload"]["eventType"] && record["payload"]["eventTime"] ? "audit" : "notification" }'
+ match:
+ rewrite_message_tag:
+ tag: raw_notifications
+ type: rewrite_tag_filter
+ rule:
+ - name: notification_type
+ regexp: 'audit'
+ result: audit
+ - name: notification_type
+ regexp: '/.+/'
+ result: notification
+ forward_notification:
+ tag: notification
+ type: relabel
+ label: notification_messages
+ forward_audit:
+ tag: audit
+ type: relabel
+ label: audit_messages
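
For context, the chain above assumes the standard oslo.messaging
notification envelope: the queue delivers a JSON body whose oslo.message
field is itself a JSON string. parse_json extracts it under "parsed",
remove_context strips the _context_* keys, and the pack/unpack pair
re-serializes and re-parses the hash to promote the cleaned keys to the top
level of the record. Roughly (illustrative, abbreviated values):

# Illustrative envelope as read from ${_param:stacklight_notification_topic}.{info,warn,error}
oslo.version: '2.0'
oslo.message:                      # delivered as a JSON string, expanded here for readability
  message_id: 6e2b2f9c-0f98-4e75-9f0f-8b2a7d3c1a11
  publisher_id: compute.cmp01
  event_type: compute.instance.create.end
  priority: INFO
  timestamp: '2018-05-14 08:12:43.512788'
  payload:
    instance_id: 0c1d2e3f-4a5b-8c7d-9e9f-0a1b2c3d4e5f
    display_name: test-vm
    state: active
  _context_user_id: 9f8e7d6c5b4a   # every _context_* key is removed by remove_context
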
diff --git a/fluentd/label/notifications/notifications.yml b/fluentd/label/notifications/notifications.yml
new file mode 100644
index 0000000..5556d6e
--- /dev/null
+++ b/fluentd/label/notifications/notifications.yml
@@ -0,0 +1,124 @@
+parameters:
+ _param:
+ elasticsearch_port: 9200
+ fluentd:
+ agent:
+ config:
+ label:
+ notification_messages:
+ filter:
+              parse_publisher_host:
+ tag: notification
+ type: parser
+ key_name: publisher_id
+ reserve_data: true
+ parser:
+ type: regexp
+ format: (?<publisher>\w+).(?<hostname>\w+)
+ save_hostname:
+ tag: notification
+ require:
+                  - parse_publisher_host
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: Hostname
+ value: ${fluentd:dollar}{ record["hostname"] }
+ parse_source:
+ tag: notification
+ require:
+ - save_hostname
+ type: parser
+ key_name: event_type
+ reserve_data: true
+ parser:
+ type: regexp
+ format: (?<event_type_logger>\w+).+
+ map_logger:
+ tag: notification
+ require:
+ - parse_source
+ type: record_transformer
+ enable_ruby: true
+ remove_keys: event_type_logger
+ record:
+ - name: Logger
+ value: ${fluentd:dollar}{ {'volume'=>'cinder', 'snapshot'=>'cinder', 'image'=>'glance', 'orchestration'=>'heat', 'identity'=>'keystone', 'compute'=>'nova', 'compute_task'=>'nova', 'scheduler'=>'nova', 'keypair'=>'nova', 'floatingip' =>'neutron', 'security_group' =>'neutron', 'security_group_rule' =>'neutron', 'network' =>'neutron', 'port' =>'neutron', 'router' =>'neutron', 'subnet' =>'neutron', 'sahara' =>'sahara'}[record["event_type_logger"]] }
+ get_payload_values:
+ tag: notification
+ require:
+ - map_logger
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: Timestamp
+ value: ${fluentd:dollar}{ DateTime.strptime(record['timestamp'], '%Y-%m-%d %H:%M:%S.%N').strftime('%Y-%m-%dT%H:%M:%S.%3NZ') }
+ - name: severity_label
+ value: ${fluentd:dollar}{ record["priority"] }
+ - name: Severity
+ value: ${fluentd:dollar}{ {'TRACE'=>7,'DEBUG'=>7,'INFO'=>6,'AUDIT'=>6,'WARNING'=>4,'ERROR'=>3,'CRITICAL'=>2}[record['priority']].to_i }
+ - name: Hostname
+ value: '${fluentd:dollar}{ record["payload"].has_key?("host") ? record["payload"]["host"] : record["Hostname"] }'
+ - name: environment_label
+ value: ${_param:cluster_domain}
+
+ - name: tenant_id
+ value: ${fluentd:dollar}{ record.dig("payload", "tenant_id") }
+ - name: user_id
+ value: ${fluentd:dollar}{ record.dig("payload", "user_id") }
+ - name: display_name
+ value: ${fluentd:dollar}{ record.dig("payload", "display_name") }
+ - name: vcpus
+ value: ${fluentd:dollar}{ record.dig("payload", "vcpus") }
+ - name: availability_zone
+ value: ${fluentd:dollar}{ record.dig("payload", "availability_zone") }
+ - name: instance_id
+ value: ${fluentd:dollar}{ record.dig("payload", "instance_id") }
+ - name: instance_type
+ value: ${fluentd:dollar}{ record.dig("payload", "instance_type") }
+ - name: image_name
+ value: ${fluentd:dollar}{ record.dig("payload", "image_name") }
+ - name: memory_mb
+ value: ${fluentd:dollar}{ record.dig("payload", "memory_mb") }
+ - name: disk_gb
+ value: ${fluentd:dollar}{ record.dig("payload", "disk_gb") }
+ - name: state
+ value: ${fluentd:dollar}{ record.dig("payload", "state") }
+ - name: old_state
+ value: ${fluentd:dollar}{ record.dig("payload", "old_state") }
+ - name: old_task_state
+ value: ${fluentd:dollar}{ record.dig("payload", "old_task_state") }
+ - name: new_task_state
+ value: ${fluentd:dollar}{ record.dig("payload", "new_task_state") }
+ - name: network_id
+ value: ${fluentd:dollar}{ record.dig("payload", "network_id") }
+ - name: subnet_id
+ value: ${fluentd:dollar}{ record.dig("payload", "subnet_id") }
+ - name: port_id
+ value: ${fluentd:dollar}{ record.dig("payload", "port_id") }
+ - name: volume_id
+ value: ${fluentd:dollar}{ record.dig("payload", "volume_id") }
+ - name: size
+ value: ${fluentd:dollar}{ record.dig("payload", "size") }
+ - name: status
+ value: ${fluentd:dollar}{ record.dig("payload", "status") }
+ - name: replication_status
+ value: ${fluentd:dollar}{ record.dig("payload", "replication_status") }
+ pack_payload_to_json:
+ tag: notification
+ require:
+ - get_payload_values
+ type: record_transformer
+ enable_ruby: true
+ remove_keys: '["timestamp", "publisher_id", "priority", "notification_type", "payload"]'
+ record:
+ - name: Payload
+ value: ${fluentd:dollar}{ record["payload"].to_json }
+ match:
+ notifications_output:
+ tag: notification
+ type: elasticsearch
+ host: ${_param:stacklight_log_address}
+ port: ${_param:elasticsearch_port}
+ es_index_name: notification
+ tag_key: Type
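
End to end, the nova notification sketched after input_rabbitmq.yml would be
indexed roughly as below. Illustrative only: payload keys that are absent
from that particular event are left out here, and Hostname falls back to the
part parsed from publisher_id because the sample payload carries no "host"
field.

# Illustrative document in the "notification" index for the sample envelope above
Logger: nova
Hostname: cmp01
Severity: 6
severity_label: INFO
Timestamp: '2018-05-14T08:12:43.512Z'
environment_label: example.local
event_type: compute.instance.create.end
instance_id: 0c1d2e3f-4a5b-8c7d-9e9f-0a1b2c3d4e5f
display_name: test-vm
state: active
Payload: '{"instance_id": "0c1d2e3f-4a5b-8c7d-9e9f-0a1b2c3d4e5f", "display_name": "test-vm", "state": "active"}'
Type: notification
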