Merge pull request #40 from simonpasquier/collect-notifications

Enable collection of notifications
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index d87ed73..8f86efd 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -26,6 +26,15 @@
     return ' || '.join(matchers)
 
 
+def alarm_activate_alerting(alerting):
+    return ('true' if alerting in ['enabled', 'enabled_with_notification']
+            else 'false')
+
+
+def alarm_enable_notification(alerting):
+    return 'true' if alerting == 'enabled_with_notification' else 'false'
+
+
 def alarm_cluster_message_matcher(alarm_cluster):
     """
     Return an Heka message matcher expression for a given alarm cluster.
@@ -65,3 +74,31 @@
             raise Exception(
                 'Dimension value {} includes disallowed chars'.format(value))
     return dimensions
+
+
+def grains_for_mine(grains):
+    """
+    Return grains that need to be sent to Salt Mine. Only the alarm,
+    trigger and alarm cluster data is to be sent to Mine.
+    """
+    filtered_grains = {}
+    for service_name, service_data in grains.items():
+        alarm = service_data.get('alarm')
+        if alarm:
+            filtered_grains[service_name] = {'alarm': alarm}
+        trigger = service_data.get('trigger')
+        if trigger:
+            if service_name in filtered_grains:
+                filtered_grains[service_name].update(
+                    {'trigger': trigger})
+            else:
+                filtered_grains[service_name] = {'trigger': trigger}
+        alarm_cluster = service_data.get('alarm_cluster')
+        if alarm_cluster:
+            if service_name in filtered_grains:
+                filtered_grains[service_name].update(
+                    {'alarm_cluster': alarm_cluster})
+            else:
+                filtered_grains[service_name] = \
+                    {'alarm_cluster': alarm_cluster}
+    return filtered_grains
diff --git a/heka/_service.sls b/heka/_service.sls
index c1c85ed..5749b2d 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -1,8 +1,6 @@
 {%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
 
-{%- set server = salt['pillar.get']('heka:'+service_name) %}
-
-{%- if server.enabled %}
+{%- if server.enabled is defined and server.enabled %}
 
 heka_{{ service_name }}_log_file:
   file.managed:
@@ -70,6 +68,8 @@
   service.running:
   - name: {{ service_name }}
   - enable: True
+  - watch:
+    - file: /usr/share/lma_collector
 
 {# Setup basic structure for all roles so updates can apply #}
 
@@ -78,8 +78,6 @@
     'decoder': {},
     'input': {},
     'trigger': {},
-    'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -90,7 +88,6 @@
     'input': {},
     'trigger': {},
     'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -101,7 +98,6 @@
     'input': {},
     'trigger': {},
     'alarm': {},
-    'alarm_cluster': {},
     'filter': {},
     'splitter': {},
     'encoder': {},
@@ -111,7 +107,6 @@
     'decoder': {},
     'input': {},
     'trigger': {},
-    'alarm': {},
     'alarm_cluster': {},
     'filter': {},
     'splitter': {},
@@ -184,7 +179,7 @@
   - mode: 600
   - defaults:
     service_grains:
-      heka: {{ service_grains|yaml }}
+      heka: {{ salt['heka_alarming.grains_for_mine'](service_grains)|yaml }}
   - require:
     - file: heka_grains_dir
 
@@ -196,6 +191,7 @@
   - group: heka
   - defaults:
     service_name: {{ service_name }}
+    poolsize: {{ server.poolsize }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
diff --git a/heka/aggregator.sls b/heka/aggregator.sls
index c53d8e7..dda138c 100644
--- a/heka/aggregator.sls
+++ b/heka/aggregator.sls
@@ -3,6 +3,8 @@
 include:
 - heka._common
 
+{%- from "heka/map.jinja" import aggregator with context %}
+{%- set server = aggregator %}
 {%- set service_name = "aggregator" %}
 
 {%- include "heka/_service.sls" %}
diff --git a/heka/files/gse_policies.lua b/heka/files/gse_policies.lua
index 8ec9c06..8de8175 100644
--- a/heka/files/gse_policies.lua
+++ b/heka/files/gse_policies.lua
@@ -29,15 +29,17 @@
                 logical_operator = '{{ _trigger["logical_operator"] }}',
                 rules = {
             {%- for _rule in _trigger["rules"] %}
-                    ['function'] = '{{ _rule["function"] }}',
+                    {
+                        ['function'] = '{{ _rule["function"] }}',
                 {%- set comma = joiner(",") %}
-                    ['arguments'] = {
+                        ['arguments'] = {
                 {%- for _argument in _rule["arguments"]|sort -%}
-                {{ comma() }}'{{ _argument }}'
+                    {{ comma() }}'{{ _argument }}'
                 {%- endfor -%}
+                        },
+                        ['relational_operator'] = '{{ _rule["relational_operator"] }}',
+                        ['threshold'] = {{ _rule["threshold"] }},
                     },
-                    ['relational_operator'] = '{{ _rule["relational_operator"] }}',
-                    ['threshold'] = {{ _rule["threshold"] }},
             {%- endfor %}
                 },
             },
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 7b063d1..6a767e5 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -45,7 +45,8 @@
     ERROR = 3,
     WARNING = 4,
     NOTICE = 5,
-    INFO= 6,
+    NOTE = 5,
+    INFO = 6,
     DEBUG = 7,
 }
 
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index f94a0e8..0befcf0 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -216,6 +216,14 @@
                 else
                     msg['Fields']['name'] = metric_name
                 end
+            elseif metric_source == 'ntpd' then
+                if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
+                    msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
+                else
+                    msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. 'peer'
+                    msg['Fields']['server'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'server')
+                end
             elseif metric_source == 'check_openstack_api' then
                 -- For OpenStack API metrics, plugin_instance = <service name>
                 msg['Fields']['name'] = 'openstack_check_api'
@@ -420,7 +428,31 @@
                 msg['Fields']['service'] = sample['type_instance']
                 table.insert(msg['Fields']['tag_fields'], 'service')
             else
-                msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+                -- generic metric name translation for 3rd-party sources
+                msg['Fields']['name'] = sample['plugin']
+                if sample['plugin_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
+                end
+                if sample['type'] ~= 'gauge' and sample['type'] ~= 'derive' and
+                   sample['type'] ~= 'counter' and sample['type'] ~= 'absolute' then
+                   -- only for custom DS types
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+                end
+                if sample['type_instance'] ~= "" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+                end
+                if sample['dsnames'][i] ~= "value" then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+                end
+                msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
+
+                -- add meta fields as tag_fields
+                for k, v in pairs(sample['meta'] or {}) do
+                    if tostring(k) ~= '0' then
+                        msg['Fields'][k] = v
+                        table.insert(msg['Fields']['tag_fields'], k)
+                   end
+                end
             end
 
             if not skip_it then
diff --git a/heka/files/lua/decoders/galera.lua b/heka/files/lua/decoders/galera.lua
new file mode 100644
index 0000000..c9f385d
--- /dev/null
+++ b/heka/files/lua/decoders/galera.lua
@@ -0,0 +1,63 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt   = require 'patterns'
+local utils = require "lma_utils"
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+local programname = read_config('programname') or 'mysql'
+
+-- mysql log messages are formatted like this
+--
+-- 2016-11-09 08:42:34 18430 [Note] InnoDB: Using atomics to ref count buffer pool pages
+local sp = l.space
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local pid = l.Cg(patt.Pid, "Pid")
+local severity = l.P"[" * l.Cg(l.R("az", "AZ")^0 / string.upper, "SeverityLabel") * l.P"]"
+local message = l.Cg(patt.Message, "Message")
+
+local grammar = l.Ct(timestamp * sp^1 * pid * sp^1 * severity * sp^1 * message)
+
+
+function process_message ()
+    local log = read_message("Payload")
+    local m = grammar:match(log)
+    if not m then
+        return -1, string.format("Failed to parse: %s", string.sub(log, 1, 64))
+    end
+
+    msg.Timestamp = m.Timestamp
+    msg.Pid = m.Pid
+    msg.Severity = utils.label_to_severity_map[m.SeverityLabel] or utils.label_to_severity_map.DEBUG
+    msg.Payload = m.Message
+
+    msg.Fields = {}
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    msg.Fields.programname = programname
+
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
index ad4c540..0507a10 100644
--- a/heka/files/lua/encoders/status_nagios.lua
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -20,7 +20,6 @@
 local interp = require "msg_interpolate"
 
 local host = read_config('nagios_host')
-local service_template = read_config('service_template') or error('service_template is required!')
 -- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
 -- See bug #1517917 for details.
 -- With the 'cmd.cgi' re-implementation for the command PROCESS_SERVICE_CHECK_RESULT,
@@ -29,7 +28,6 @@
 local data = {
    cmd_typ = '30',
    cmd_mod = '2',
-   host    = host,
    service = nil,
    plugin_state = nil,
    plugin_output = nil,
@@ -55,7 +53,7 @@
 end
 
 function process_message()
-    local service_name = interp.interpolate_from_msg(service_template)
+    local service_name = read_message('Fields[member]')
     local status = afd.get_status()
     local alarms = afd.alarms_for_human(afd.extract_alarms())
 
@@ -63,6 +61,11 @@
         return -1
     end
 
+    if host then
+        data['host'] = host
+    else
+        data['host'] = read_message('Fields[hostname]') or read_message('Hostname')
+    end
     data['service'] = service_name
     data['plugin_state'] = nagios_state_map[status]
 
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 5b6402c..d1fef2b 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -64,6 +64,10 @@
         return -1, "Cannot find alarms in the AFD/GSE message"
     end
 
+    -- "hostname" is nil here when the input message is a GSE message, so nil
+    -- is not an error condition here
+    local hostname = afd.get_entity_name('hostname')
+
     local cluster_ids = gse.find_cluster_memberships(member_id)
 
     -- update all clusters that depend on this entity
diff --git a/heka/files/toml/encoder/sandbox.toml b/heka/files/toml/encoder/sandbox.toml
new file mode 100644
index 0000000..67c95a1
--- /dev/null
+++ b/heka/files/toml/encoder/sandbox.toml
@@ -0,0 +1,13 @@
+[{{ encoder_name }}_encoder]
+type = "SandboxEncoder"
+filename = "{{ encoder.module_file }}"
+{%- if encoder.module_dir is defined %}
+module_directory = "{{ encoder.module_dir }}"
+{%- endif %}
+
+{%- if encoder.config is mapping %}
+[{{ encoder_name }}_encoder.config]
+{%- for config_param, config_value in encoder.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% elif config_value in [True, False] %}{{ config_value|lower }}{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index c1168df..5fefb2d 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -11,4 +11,6 @@
 afd_name = "{{ alarm_name }}"
 hostname = "{{ grains.host }}"
 dimensions = '{{ salt['heka_alarming.dimensions'](alarm)|json }}'
-activate_alerting = {{ alarm.alerting|default(True)|lower }}
+{%- set alerting = alarm.get('alerting', 'enabled') %}
+activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
+enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
diff --git a/heka/files/toml/filter/gse_alarm_cluster.toml b/heka/files/toml/filter/gse_alarm_cluster.toml
index 72b1923..1c9fe77 100644
--- a/heka/files/toml/filter/gse_alarm_cluster.toml
+++ b/heka/files/toml/filter/gse_alarm_cluster.toml
@@ -9,7 +9,6 @@
 [gse_{{ alarm_cluster_name }}_filter.config]
 topology_file = "gse_{{ alarm_cluster_name|replace('-', '_') }}_topology"
 dimensions = '{{ salt['heka_alarming.dimensions'](alarm_cluster)|json }}'
-activate_alerting = {{ alarm_cluster.alerting|default(True)|lower }}
 {%- if alarm_cluster.interval is defined %}
 interval = {{ alarm_cluster.interval }}
 {%- endif %}
@@ -19,3 +18,6 @@
 {%- if alarm_cluster.warm_up_period is defined %}
 warm_up_period = {{ alarm_cluster.warm_up_period }}
 {%- endif %}
+{%- set alerting = alarm_cluster.get('alerting', 'enabled_with_notification') %}
+activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
+enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
diff --git a/heka/files/toml/global.toml b/heka/files/toml/global.toml
index 40aaf3b..30d460d 100644
--- a/heka/files/toml/global.toml
+++ b/heka/files/toml/global.toml
@@ -9,4 +9,4 @@
 max_message_size = 262144
 max_process_inject = 1
 max_timer_inject = 10
-poolsize = 100
+poolsize = {{ poolsize }}
diff --git a/heka/files/toml/output/http.toml b/heka/files/toml/output/http.toml
index 1ec3672..fce5d97 100644
--- a/heka/files/toml/output/http.toml
+++ b/heka/files/toml/output/http.toml
@@ -7,14 +7,16 @@
 username = "{{ output.username }}"
 password = "{{ output.password }}"
 {%- endif %}
-http_timeout = {{ output.timeout }}
-method = "POST"
+http_timeout = {{ output.http_timeout|default(2000) }}
+method = "{{ output.method|default("POST") }}"
+{% if output.get('use_buffering', True) %}
 use_buffering = true
 
 [{{ output_name }}_output.buffering]
-max_buffer_size = 1610612736
-max_file_size = 134217728
-full_action = "drop"
+max_buffer_size = {{ output.max_buffer_size|default(268435456) }}
+max_file_size = {{ output.max_file_size|default(67108864) }}
+full_action = "{{ output.full_action|default("drop") }}"
+{% endif %}
 
 [{{ output_name }}_output.headers]
 Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/log_collector.sls b/heka/log_collector.sls
index a50e6d2..9ee007c 100644
--- a/heka/log_collector.sls
+++ b/heka/log_collector.sls
@@ -3,6 +3,8 @@
 include:
 - heka._common
 
+{%- from "heka/map.jinja" import log_collector with context %}
+{%- set server = log_collector %}
 {%- set service_name = "log_collector" %}
 
 {%- include "heka/_service.sls" %}
diff --git a/heka/map.jinja b/heka/map.jinja
index 300960e..8b74f9d 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -35,10 +35,13 @@
 {% set default_influxdb_time_precision = 'ms' %}
 {% set default_influxdb_timeout = 5000 %}
 {% set default_aggregator_port = 5565 %}
+{% set default_nagios_port = 8001 %}
+{% set default_nagios_host_alarm_clusters = '00-clusters' %}
 
 {% set log_collector = salt['grains.filter_by']({
   'default': {
     'elasticsearch_port': default_elasticsearch_port,
+    'poolsize': 100,
   }
 }, merge=salt['pillar.get']('heka:log_collector')) %}
 
@@ -48,6 +51,8 @@
     'influxdb_time_precision': default_influxdb_time_precision,
     'influxdb_timeout': default_influxdb_timeout,
     'aggregator_port': default_aggregator_port,
+    'nagios_port': default_nagios_port,
+    'poolsize': 100,
   }
 }, merge=salt['pillar.get']('heka:metric_collector')) %}
 
@@ -57,6 +62,7 @@
     'influxdb_time_precision': default_influxdb_time_precision,
     'influxdb_timeout': default_influxdb_timeout,
     'aggregator_port': default_aggregator_port,
+    'poolsize': 100,
   }
 }, merge=salt['pillar.get']('heka:remote_collector')) %}
 
@@ -65,5 +71,8 @@
     'influxdb_port': default_influxdb_port,
     'influxdb_time_precision': default_influxdb_time_precision,
     'influxdb_timeout': default_influxdb_timeout,
+    'nagios_port': default_nagios_port,
+    'nagios_host_alarm_clusters': default_nagios_host_alarm_clusters,
+    'poolsize': 100,
   }
 }, merge=salt['pillar.get']('heka:aggregator')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 283a8c0..f1d906f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -106,6 +106,12 @@
       append_newlines: false
       prefix_ts: false
 {%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+    nagios:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+{%- endif %}
   output:
     metric_dashboard:
       engine: dashboard
@@ -131,6 +137,20 @@
       port: "{{ metric_collector.aggregator_port }}"
       message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
 {%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+    nagios_alarm:
+      engine: http
+      address: "http://{{ metric_collector.nagios_host }}:{{ metric_collector.nagios_port }}/status"
+      message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric' && Fields[no_alerting] == NIL"
+      encoder: nagios_encoder
+      {%- if metric_collector.nagios_username is defined and metric_collector.nagios_password is defined %}
+      username: {{ metric_collector.get('nagios_username') }}
+      password: {{ metric_collector.get('nagios_password') }}
+      {%- endif %}
+      max_buffer_size: 1048576
+      max_file_size: 524288
+      full_action: drop
+{%- endif %}
 remote_collector:
   decoder:
     collectd:
@@ -363,15 +383,23 @@
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ aggregator.influxdb_time_precision }}"
 {%- endif %}
-{%- if aggregator.influxdb_host is defined %}
   encoder:
+{%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: payload
       append_newlines: false
       prefix_ts: false
 {%- endif %}
-{%- if aggregator.influxdb_host is defined %}
+{%- if aggregator.nagios_host is defined %}
+    nagios:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        nagios_host: "{{ aggregator.nagios_host_alarm_clusters }}"
+{%- endif %}
   output:
+{%- if aggregator.influxdb_host is defined %}
     influxdb:
       engine: http
       address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -383,3 +411,17 @@
       encoder: influxdb_encoder
       timeout: {{ aggregator.influxdb_timeout }}
 {%- endif %}
+{%- if aggregator.nagios_host is defined %}
+    nagios_alarm_cluster:
+      engine: http
+      address: "http://{{ aggregator.nagios_host }}:{{ aggregator.nagios_port }}/status"
+      message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.gse_metric' && Fields[no_alerting] == NIL"
+      encoder: nagios_encoder
+      {%- if aggregator.nagios_username is defined and aggregator.nagios_password is defined %}
+      username: {{ aggregator.get('nagios_username') }}
+      password: {{ aggregator.get('nagios_password') }}
+      {%- endif %}
+      max_buffer_size: 1048576
+      max_file_size: 524288
+      full_action: drop
+{%- endif %}
diff --git a/heka/metric_collector.sls b/heka/metric_collector.sls
index 9684c1b..7b28a7c 100644
--- a/heka/metric_collector.sls
+++ b/heka/metric_collector.sls
@@ -3,6 +3,8 @@
 include:
 - heka._common
 
+{%- from "heka/map.jinja" import metric_collector with context %}
+{%- set server = metric_collector %}
 {%- set service_name = "metric_collector" %}
 
 {%- include "heka/_service.sls" %}
diff --git a/heka/remote_collector.sls b/heka/remote_collector.sls
index 961587d..35533c3 100644
--- a/heka/remote_collector.sls
+++ b/heka/remote_collector.sls
@@ -3,6 +3,8 @@
 include:
 - heka._common
 
+{%- from "heka/map.jinja" import remote_collector with context %}
+{%- set server = remote_collector %}
 {%- set service_name = "remote_collector" %}
 
 {%- include "heka/_service.sls" %}
diff --git a/metadata/service/aggregator/single.yml b/metadata/service/aggregator/single.yml
index 9db8b69..7e814c5 100644
--- a/metadata/service/aggregator/single.yml
+++ b/metadata/service/aggregator/single.yml
@@ -3,7 +3,10 @@
 classes:
 - service.heka.support
 parameters:
+  _param:
+    aggregator_poolsize: 100
   heka:
     aggregator:
       enabled: true
       influxdb_time_precision: ms
+      poolsize: ${_param:aggregator_poolsize}
diff --git a/metadata/service/log_collector/single.yml b/metadata/service/log_collector/single.yml
index f857da5..5160c35 100644
--- a/metadata/service/log_collector/single.yml
+++ b/metadata/service/log_collector/single.yml
@@ -3,6 +3,9 @@
 classes:
 - service.heka.support
 parameters:
+  _param:
+    log_collector_poolsize: 100
   heka:
     log_collector:
       enabled: true
+      poolsize: ${_param:log_collector_poolsize}
diff --git a/metadata/service/metric_collector/single.yml b/metadata/service/metric_collector/single.yml
index 1d1a62a..ad183a2 100644
--- a/metadata/service/metric_collector/single.yml
+++ b/metadata/service/metric_collector/single.yml
@@ -3,7 +3,10 @@
 classes:
 - service.heka.support
 parameters:
+  _param:
+    metric_collector_poolsize: 100
   heka:
     metric_collector:
       enabled: true
       influxdb_time_precision: ms
+      poolsize: ${_param:metric_collector_poolsize}
diff --git a/metadata/service/remote_collector/single.yml b/metadata/service/remote_collector/single.yml
index e141799..120414c 100644
--- a/metadata/service/remote_collector/single.yml
+++ b/metadata/service/remote_collector/single.yml
@@ -3,7 +3,10 @@
 classes:
 - service.heka.support
 parameters:
+  _param:
+    remote_collector_poolsize: 100
   heka:
     remote_collector:
       enabled: true
       influxdb_time_precision: ms
+      poolsize: ${_param:remote_collector_poolsize}