SRE-78: Implement Multiworker Configuration
- In the booking.com clouds, we find that fluentd cannot keep up with the
volume of logs it is expected to process.
- Research into https://docs.fluentd.org/deployment/multi-process-workers
has provided us with the option to configure a multiworker setup, which has
effectively resolved the problem.
- MCP largely uses the systemd and tail inputs, which do not support
operating on multiple workers. However, we can still assign them to separate
workers on a multiworker setup. Since all workers will process outputs, we can
easily separate concerns this way and remove CPU bottlenecks.
- By default, we configure these inputs to work on worker 0.
Related-Prod: PROD-36710
Change-Id: I68af85f1a62da72fa2bf1f996e52465af3caa5a5
diff --git a/README.rst b/README.rst
index 3ee3d9c..ecc04cd 100644
--- a/README.rst
+++ b/README.rst
@@ -23,42 +23,43 @@
.. code-block:: yaml
fluentd:
- config:
- label:
- filename:
- input:
+ agent:
+ config:
+ label:
+ filename:
+ input:
+ input_name:
+ params
+ filter:
+ filter_name:
+ params
+ filter_name2:
+ params
+ match:
+ match_name:
+ params
+ input:
+ filename:
input_name:
params
- filter:
+ input_name2:
+ params
+ filename2:
+ input_name3:
+ params
+ filter:
+ filename:
filter_name:
params
filter_name2:
params
- match:
+ filename2:
+ filter_name3:
+ params
+ match:
+ filename:
match_name:
params
- input:
- filename:
- input_name:
- params
- input_name2:
- params
- filename2:
- input_name3:
- params
- filter:
- filename:
- filter_name:
- params
- filter_name2:
- params
- filename2:
- filter_name3:
- params
- match:
- filename:
- match_name:
- params
Example pillar
--------------
@@ -66,145 +67,149 @@
fluentd:
enabled: true
- config:
- label:
- elasticsearch_output:
- match:
- elasticsearch_output:
- tag: "**"
- type: elasticsearch
- host: 10.100.0.1
- port: 9200
- buffer:
- flush_thread_count: 8
- monitoring:
- filter:
- parse_log:
- tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
- type: parser
- reserve_data: true
- key_name: log
- parser:
- type: regexp
- format: >-
- /^time="(?<time>[^ ]*)" level=(?<severity>[a-zA-Z]*) msg="(?<message>.+?)"/
- time_format: '%FT%TZ'
- remove_log_key:
- tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
- type: record_transformer
- remove_keys: log
- match:
- docker_log:
- tag: 'docker.**'
- type: file
- path: /tmp/flow-docker.log
- grok_example:
- input:
- test_log:
- type: tail
- path: /var/log/test
- tag: test.test
- parser:
- type: grok
- custom_pattern_path: /etc/td-agent/config.d/global.grok
+ agent:
+ multiworker:
+ worker_count: 4
+ config:
+ label:
+ elasticsearch_output:
+ worker: 0
+ match:
+ elasticsearch_output:
+ tag: "**"
+ type: elasticsearch
+ host: 10.100.0.1
+ port: 9200
+ buffer:
+ flush_thread_count: 8
+ monitoring:
+ worker: '0-2'
+ filter:
+ parse_log:
+ tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
+ type: parser
+ reserve_data: true
+ key_name: log
+ parser:
+ type: regexp
+ format: >-
+ /^time="(?<time>[^ ]*)" level=(?<severity>[a-zA-Z]*) msg="(?<message>.+?)"/
+ time_format: '%FT%TZ'
+ remove_log_key:
+ tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
+ type: record_transformer
+ remove_keys: log
+ match:
+ docker_log:
+ tag: 'docker.**'
+ type: file
+ path: /tmp/flow-docker.log
+ grok_example:
+ input:
+ test_log:
+ type: tail
+ path: /var/log/test
+ tag: test.test
+ parser:
+ type: grok
+ custom_pattern_path: /etc/td-agent/config.d/global.grok
+ rule:
+ - pattern: >-
+ %{KEYSTONEACCESS}
+ syslog:
+ filter:
+ add_severity:
+ tag: 'syslog.*'
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: severity
+ value: 'record["pri"].to_i - (record["pri"].to_i / 8).floor * 8'
+ severity_to_string:
+ tag: 'syslog.*'
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: severity
+ value: '{"debug"=>7,"info"=>6,"notice"=>5,"warning"=>4,"error"=>3,"critical"=>2,"alert"=>1,"emerg"=>0}.key(record["severity"])'
+ severity_for_telegraf:
+ tag: 'syslog.*.telegraf'
+ type: parser
+ reserve_data: true
+ key_name: message
+ parser:
+ type: regexp
+ format: >-
+ /^(?<time>[^ ]*) (?<severity>[A-Z])! (?<message>.*)/
+ time_format: '%FT%TZ'
+ severity_for_telegraf_string:
+ tag: 'syslog.*.telegraf'
+ type: record_transformer
+ enable_ruby: true
+ record:
+ - name: severity
+ value: '{"debug"=>"D","info"=>"I","notice"=>"N","warning"=>"W","error"=>"E","critical"=>"C","alert"=>"A","emerg"=>"E"}.key(record["severity"])'
+ prometheus_metric:
+ tag: 'syslog.*.*'
+ type: prometheus
+ label:
+ - name: ident
+ type: variable
+ value: ident
+ - name: severity
+ type: variable
+ value: severity
+ metric:
+ - name: log_messages
+ type: counter
+ desc: The total number of log messages.
+ match:
+ rewrite_tag_key:
+ tag: 'syslog.*'
+ type: rewrite_tag_filter
rule:
- - pattern: >-
- %{KEYSTONEACCESS}
- syslog:
- filter:
- add_severity:
- tag: 'syslog.*'
- type: record_transformer
- enable_ruby: true
- record:
- - name: severity
- value: 'record["pri"].to_i - (record["pri"].to_i / 8).floor * 8'
- severity_to_string:
- tag: 'syslog.*'
- type: record_transformer
- enable_ruby: true
- record:
- - name: severity
- value: '{"debug"=>7,"info"=>6,"notice"=>5,"warning"=>4,"error"=>3,"critical"=>2,"alert"=>1,"emerg"=>0}.key(record["severity"])'
- severity_for_telegraf:
- tag: 'syslog.*.telegraf'
- type: parser
- reserve_data: true
- key_name: message
+ - name: ident
+ regexp: '^(.*)'
+ result: '__TAG__.$1'
+ syslog_log:
+ tag: 'syslog.*.*'
+ type: file
+ path: /tmp/syslog
+ input:
+ syslog:
+ syslog_log:
+ type: tail
+ label: syslog
+ path: /var/log/syslog
+ tag: syslog.syslog
parser:
type: regexp
format: >-
- /^(?<time>[^ ]*) (?<severity>[A-Z])! (?<message>.*)/
- time_format: '%FT%TZ'
- severity_for_telegraf_string:
- tag: 'syslog.*.telegraf'
- type: record_transformer
- enable_ruby: true
- record:
- - name: severity
- value: '{"debug"=>"D","info"=>"I","notice"=>"N","warning"=>"W","error"=>"E","critical"=>"C","alert"=>"A","emerg"=>"E"}.key(record["severity"])'
- prometheus_metric:
- tag: 'syslog.*.*'
- type: prometheus
- label:
- - name: ident
- type: variable
- value: ident
- - name: severity
- type: variable
- value: severity
- metric:
- - name: log_messages
- type: counter
- desc: The total number of log messages.
- match:
- rewrite_tag_key:
- tag: 'syslog.*'
- type: rewrite_tag_filter
- rule:
- - name: ident
- regexp: '^(.*)'
- result: '__TAG__.$1'
- syslog_log:
- tag: 'syslog.*.*'
- type: file
- path: /tmp/syslog
- input:
- syslog:
- syslog_log:
- type: tail
- label: syslog
- path: /var/log/syslog
- tag: syslog.syslog
- parser:
- type: regexp
- format: >-
- '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
- time_format: '%FT%T.%L%:z'
- auth_log:
- type: tail
- label: syslog
- path: /var/log/auth.log
- tag: syslog.auth
- parser:
- type: regexp
- format: >-
- '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
- time_format: '%FT%T.%L%:z'
- prometheus:
+ '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
+ time_format: '%FT%T.%L%:z'
+ auth_log:
+ type: tail
+ label: syslog
+ path: /var/log/auth.log
+ tag: syslog.auth
+ parser:
+ type: regexp
+ format: >-
+ '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
+ time_format: '%FT%T.%L%:z'
prometheus:
- type: prometheus
- prometheus_monitor:
- type: prometheus_monitor
- prometheus_output_monitor:
- type: prometheus_output_monitor
- forward:
- forward_listen:
- type: forward
- port: 24224
- bind: 0.0.0.0
- match:
- docker_monitoring:
+ prometheus:
+ type: prometheus
+ prometheus_monitor:
+ type: prometheus_monitor
+ prometheus_output_monitor:
+ type: prometheus_output_monitor
+ forward:
+ forward_listen:
+ type: forward
+ port: 24224
+ bind: 0.0.0.0
+ match:
docker_monitoring:
tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
type: relabel
diff --git a/fluentd/agent.sls b/fluentd/agent.sls
index bbc3117..d71f892 100644
--- a/fluentd/agent.sls
+++ b/fluentd/agent.sls
@@ -96,6 +96,23 @@
- context:
fluentd_agent: {{ fluentd_agent | yaml }}
+{%- if fluentd_agent.get('multiworker', False) %}
+fluentd_multiworker_config_agent:
+ file.managed:
+ - name: {{ fluentd_agent.dir.config }}/config.d/multiworker.conf
+ - source: salt://fluentd/files/multiworker.conf
+ - user: root
+ - group: root
+ - mode: 644
+ - template: jinja
+ - require:
+ - pkg: fluentd_packages_agent
+ - require_in:
+ - file: fluentd_config_d_dir_clean
+ - context:
+ fluentd_agent: {{ fluentd_agent | yaml }}
+{%- endif %}
+
fluentd_grok_pattern_agent:
file.managed:
- name: {{ fluentd_agent.dir.config }}/config.d/global.grok
diff --git a/fluentd/files/label.conf b/fluentd/files/label.conf
index e7a7377..7803ba7 100644
--- a/fluentd/files/label.conf
+++ b/fluentd/files/label.conf
@@ -1,3 +1,10 @@
+{% from "fluentd/map.jinja" import fluentd_agent with context %}
+{%- if values.worker is defined and fluentd_agent.get('multiworker', False) %}
+<worker {{ values.worker }}>
+{%- elif fluentd_agent.get('multiworker', False) %}
+<worker 0>
+{%- endif %}
+
{%- if values.get('enabled', True) %}
# Label {{ label_name }}
{%- if values.get('input') %}
@@ -20,3 +27,7 @@
{%- endif %}
</label>
{%- endif %}
+
+{%- if fluentd_agent.get('multiworker', False) %}
+</worker>
+{%- endif %}
diff --git a/fluentd/files/multiworker.conf b/fluentd/files/multiworker.conf
new file mode 100644
index 0000000..45b6310
--- /dev/null
+++ b/fluentd/files/multiworker.conf
@@ -0,0 +1,6 @@
+{% from "fluentd/map.jinja" import fluentd_agent with context %}
+{%- if fluentd_agent.get('multiworker', False) %}
+<system>
+ workers {{ fluentd_agent.multiworker.worker_count }}
+</system>
+{%- endif %}