SRE-78: Implement Multiworker Configuration

- In the booking.com clouds, we find that fluentd cannot keep up with
volume of logs it is expected to process.
- Research into https://docs.fluentd.org/deployment/multi-process-workers
has provided us with the option to configure a multiworker setup, which has
largely resolved the problem.
- MCP largely uses the systemd and tail inputs, which do not support
operating on multiple workers. However, we can still assign them to separate
workers on a multiworker setup. Since all workers will process outputs, we can
easily separate concerns this way and remove CPU bottlenecks.
- By default, we configure these inputs to work on worker 0.

Related-Prod: PROD-36710
Change-Id: I68af85f1a62da72fa2bf1f996e52465af3caa5a5
diff --git a/README.rst b/README.rst
index 3ee3d9c..ecc04cd 100644
--- a/README.rst
+++ b/README.rst
@@ -23,42 +23,43 @@
 .. code-block:: yaml
 
   fluentd:
-    config:
-      label:
-        filename:
-          input:
+    agent:
+      config:
+        label:
+          filename:
+            input:
+              input_name:
+                params
+            filter:
+              filter_name:
+                params
+              filter_name2:
+                params
+            match:
+              match_name:
+                params
+        input:
+          filename:
             input_name:
               params
-          filter:
+            input_name2:
+              params
+          filename2:
+            input_name3:
+              params
+        filter:
+          filename:
             filter_name:
               params
             filter_name2:
               params
-          match:
+          filename2:
+            filter_name3:
+              params
+        match:
+          filename:
             match_name:
               params
-      input:
-        filename:
-          input_name:
-            params
-          input_name2:
-            params
-        filename2:
-          input_name3:
-            params
-      filter:
-        filename:
-          filter_name:
-            params
-          filter_name2:
-            params
-        filename2:
-          filter_name3:
-            params
-      match:
-        filename:
-          match_name:
-            params
 
 Example pillar
 --------------
@@ -66,145 +67,149 @@
 
   fluentd:
     enabled: true
-    config:
-      label:
-        elasticsearch_output:
-          match:
-            elasticsearch_output:
-              tag: "**"
-              type: elasticsearch
-              host: 10.100.0.1
-              port: 9200
-              buffer:
-                flush_thread_count: 8
-        monitoring:
-          filter:
-            parse_log:
-              tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
-              type: parser
-              reserve_data: true
-              key_name: log
-              parser:
-                type: regexp
-                format: >-
-                  /^time="(?<time>[^ ]*)" level=(?<severity>[a-zA-Z]*) msg="(?<message>.+?)"/
-                time_format: '%FT%TZ'
-            remove_log_key:
-              tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
-              type: record_transformer
-              remove_keys: log
-          match:
-            docker_log:
-              tag: 'docker.**'
-              type: file
-              path: /tmp/flow-docker.log
-        grok_example:
-          input:
-            test_log:
-              type: tail
-              path: /var/log/test
-              tag: test.test
-              parser:
-                type: grok
-                custom_pattern_path: /etc/td-agent/config.d/global.grok
+    agent:
+      multiworker:
+        worker_count: 4
+      config:
+        label:
+          elasticsearch_output:
+            worker: 0
+            match:
+              elasticsearch_output:
+                tag: "**"
+                type: elasticsearch
+                host: 10.100.0.1
+                port: 9200
+                buffer:
+                  flush_thread_count: 8
+          monitoring:
+            worker: '0-2'
+            filter:
+              parse_log:
+                tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
+                type: parser
+                reserve_data: true
+                key_name: log
+                parser:
+                  type: regexp
+                  format: >-
+                    /^time="(?<time>[^ ]*)" level=(?<severity>[a-zA-Z]*) msg="(?<message>.+?)"/
+                  time_format: '%FT%TZ'
+              remove_log_key:
+                tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
+                type: record_transformer
+                remove_keys: log
+            match:
+              docker_log:
+                tag: 'docker.**'
+                type: file
+                path: /tmp/flow-docker.log
+          grok_example:
+            input:
+              test_log:
+                type: tail
+                path: /var/log/test
+                tag: test.test
+                parser:
+                  type: grok
+                  custom_pattern_path: /etc/td-agent/config.d/global.grok
+                  rule:
+                    - pattern: >-
+                        %{KEYSTONEACCESS}
+          syslog:
+            filter:
+              add_severity:
+                tag: 'syslog.*'
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: severity
+                    value: 'record["pri"].to_i - (record["pri"].to_i / 8).floor * 8'
+              severity_to_string:
+                tag: 'syslog.*'
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: severity
+                    value: '{"debug"=>7,"info"=>6,"notice"=>5,"warning"=>4,"error"=>3,"critical"=>2,"alert"=>1,"emerg"=>0}.key(record["severity"])'
+              severity_for_telegraf:
+                tag: 'syslog.*.telegraf'
+                type: parser
+                reserve_data: true
+                key_name: message
+                parser:
+                  type: regexp
+                  format: >-
+                    /^(?<time>[^ ]*) (?<severity>[A-Z])! (?<message>.*)/
+                  time_format: '%FT%TZ'
+              severity_for_telegraf_string:
+                tag: 'syslog.*.telegraf'
+                type: record_transformer
+                enable_ruby: true
+                record:
+                  - name: severity
+                    value: '{"debug"=>"D","info"=>"I","notice"=>"N","warning"=>"W","error"=>"E","critical"=>"C","alert"=>"A","emerg"=>"E"}.key(record["severity"])'
+              prometheus_metric:
+                tag: 'syslog.*.*'
+                type: prometheus
+                label:
+                  - name: ident
+                    type: variable
+                    value: ident
+                  - name: severity
+                    type: variable
+                    value: severity
+                metric:
+                  - name: log_messages
+                    type: counter
+                    desc: The total number of log messages.
+            match:
+              rewrite_tag_key:
+                tag: 'syslog.*'
+                type: rewrite_tag_filter
                 rule:
-                  - pattern: >-
-                      %{KEYSTONEACCESS}
-        syslog:
-          filter:
-            add_severity:
-              tag: 'syslog.*'
-              type: record_transformer
-              enable_ruby: true
-              record:
-                - name: severity
-                  value: 'record["pri"].to_i - (record["pri"].to_i / 8).floor * 8'
-            severity_to_string:
-              tag: 'syslog.*'
-              type: record_transformer
-              enable_ruby: true
-              record:
-                - name: severity
-                  value: '{"debug"=>7,"info"=>6,"notice"=>5,"warning"=>4,"error"=>3,"critical"=>2,"alert"=>1,"emerg"=>0}.key(record["severity"])'
-            severity_for_telegraf:
-              tag: 'syslog.*.telegraf'
-              type: parser
-              reserve_data: true
-              key_name: message
+                  - name: ident
+                    regexp: '^(.*)'
+                    result: '__TAG__.$1'
+              syslog_log:
+                tag: 'syslog.*.*'
+                type: file
+                path: /tmp/syslog
+        input:
+          syslog:
+            syslog_log:
+              type: tail
+              label: syslog
+              path: /var/log/syslog
+              tag: syslog.syslog
               parser:
                 type: regexp
                 format: >-
-                  /^(?<time>[^ ]*) (?<severity>[A-Z])! (?<message>.*)/
-                time_format: '%FT%TZ'
-            severity_for_telegraf_string:
-              tag: 'syslog.*.telegraf'
-              type: record_transformer
-              enable_ruby: true
-              record:
-                - name: severity
-                  value: '{"debug"=>"D","info"=>"I","notice"=>"N","warning"=>"W","error"=>"E","critical"=>"C","alert"=>"A","emerg"=>"E"}.key(record["severity"])'
-            prometheus_metric:
-              tag: 'syslog.*.*'
-              type: prometheus
-              label:
-                - name: ident
-                  type: variable
-                  value: ident
-                - name: severity
-                  type: variable
-                  value: severity
-              metric:
-                - name: log_messages
-                  type: counter
-                  desc: The total number of log messages.
-          match:
-            rewrite_tag_key:
-              tag: 'syslog.*'
-              type: rewrite_tag_filter
-              rule:
-                - name: ident
-                  regexp: '^(.*)'
-                  result: '__TAG__.$1'
-            syslog_log:
-              tag: 'syslog.*.*'
-              type: file
-              path: /tmp/syslog
-      input:
-        syslog:
-          syslog_log:
-            type: tail
-            label: syslog
-            path: /var/log/syslog
-            tag: syslog.syslog
-            parser:
-              type: regexp
-              format: >-
-                '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
-              time_format: '%FT%T.%L%:z'
-          auth_log:
-            type: tail
-            label: syslog
-            path: /var/log/auth.log
-            tag: syslog.auth
-            parser:
-              type: regexp
-              format: >-
-                '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
-              time_format: '%FT%T.%L%:z'
-        prometheus:
+                  '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
+                time_format: '%FT%T.%L%:z'
+            auth_log:
+              type: tail
+              label: syslog
+              path: /var/log/auth.log
+              tag: syslog.auth
+              parser:
+                type: regexp
+                format: >-
+                  '/^\<(?<pri>[0-9]+)\>(?<time>[^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/'
+                time_format: '%FT%T.%L%:z'
           prometheus:
-            type: prometheus
-          prometheus_monitor:
-            type: prometheus_monitor
-          prometheus_output_monitor:
-            type: prometheus_output_monitor
-        forward:
-          forward_listen:
-            type: forward
-            port: 24224
-            bind: 0.0.0.0
-      match:
-        docker_monitoring:
+            prometheus:
+              type: prometheus
+            prometheus_monitor:
+              type: prometheus_monitor
+            prometheus_output_monitor:
+              type: prometheus_output_monitor
+          forward:
+            forward_listen:
+              type: forward
+              port: 24224
+              bind: 0.0.0.0
+        match:
           docker_monitoring:
             tag: 'docker.monitoring.{alertmanager,remote_storage_adapter,prometheus}.*'
             type: relabel
diff --git a/fluentd/agent.sls b/fluentd/agent.sls
index bbc3117..d71f892 100644
--- a/fluentd/agent.sls
+++ b/fluentd/agent.sls
@@ -96,6 +96,23 @@
     - context:
       fluentd_agent: {{ fluentd_agent | yaml }}
 
+{%- if fluentd_agent.get('multiworker', False) %}
+fluentd_multiworker_config_agent:
+  file.managed:
+    - name: {{ fluentd_agent.dir.config }}/config.d/multiworker.conf
+    - source: salt://fluentd/files/multiworker.conf
+    - user: root
+    - group: root
+    - mode: 644
+    - template: jinja
+    - require:
+      - pkg: fluentd_packages_agent
+    - require_in:
+      - file: fluentd_config_d_dir_clean
+    - context:
+      fluentd_agent: {{ fluentd_agent | yaml }}
+{%- endif %}
+
 fluentd_grok_pattern_agent:
   file.managed:
     - name: {{ fluentd_agent.dir.config }}/config.d/global.grok
diff --git a/fluentd/files/label.conf b/fluentd/files/label.conf
index e7a7377..7803ba7 100644
--- a/fluentd/files/label.conf
+++ b/fluentd/files/label.conf
@@ -1,3 +1,10 @@
+{% from "fluentd/map.jinja" import fluentd_agent with context %}
+{%- if values.worker is defined and fluentd_agent.get('multiworker', False) %}
+<worker {{ values.worker }}>
+{%- elif fluentd_agent.get('multiworker', False) %}
+<worker 0>
+{%- endif %}
+
 {%- if values.get('enabled', True) %}
 # Label {{ label_name }}
 {%- if values.get('input') %}
@@ -20,3 +27,7 @@
 {%- endif %}
 </label>
 {%- endif %}
+
+{%- if fluentd_agent.get('multiworker', False) %}
+</worker>
+{%- endif %}
diff --git a/fluentd/files/multiworker.conf b/fluentd/files/multiworker.conf
new file mode 100644
index 0000000..45b6310
--- /dev/null
+++ b/fluentd/files/multiworker.conf
@@ -0,0 +1,6 @@
+{% from "fluentd/map.jinja" import fluentd_agent with context %}
+{%- if fluentd_agent.get('multiworker', False) %}
+<system>
+  workers {{ fluentd_agent.multiworker.worker_count }}
+</system>
+{%- endif %}