Merge pull request #40 from simonpasquier/collect-notifications
Enable collection of notifications
diff --git a/_modules/heka_alarming.py b/_modules/heka_alarming.py
index d87ed73..8f86efd 100644
--- a/_modules/heka_alarming.py
+++ b/_modules/heka_alarming.py
@@ -26,6 +26,15 @@
return ' || '.join(matchers)
+def alarm_activate_alerting(alerting):
+ return ('true' if alerting in ['enabled', 'enabled_with_notification']
+ else 'false')
+
+
+def alarm_enable_notification(alerting):
+ return 'true' if alerting == 'enabled_with_notification' else 'false'
+
+
def alarm_cluster_message_matcher(alarm_cluster):
"""
Return an Heka message matcher expression for a given alarm cluster.
@@ -65,3 +74,31 @@
raise Exception(
'Dimension value {} includes disallowed chars'.format(value))
return dimensions
+
+
+def grains_for_mine(grains):
+ """
+ Return grains that need to be sent to Salt Mine. Only the alarm,
+ trigger and alarm cluster data are sent to Mine.
+ """
+ filtered_grains = {}
+ for service_name, service_data in grains.items():
+ alarm = service_data.get('alarm')
+ if alarm:
+ filtered_grains[service_name] = {'alarm': alarm}
+ trigger = service_data.get('trigger')
+ if trigger:
+ if service_name in filtered_grains:
+ filtered_grains[service_name].update(
+ {'trigger': trigger})
+ else:
+ filtered_grains[service_name] = {'trigger': trigger}
+ alarm_cluster = service_data.get('alarm_cluster')
+ if alarm_cluster:
+ if service_name in filtered_grains:
+ filtered_grains[service_name].update(
+ {'alarm_cluster': alarm_cluster})
+ else:
+ filtered_grains[service_name] = \
+ {'alarm_cluster': alarm_cluster}
+ return filtered_grains
diff --git a/heka/_service.sls b/heka/_service.sls
index c1c85ed..5749b2d 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -1,8 +1,6 @@
{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file ignore missing %}{% endmacro %}
-{%- set server = salt['pillar.get']('heka:'+service_name) %}
-
-{%- if server.enabled %}
+{%- if server.enabled is defined and server.enabled %}
heka_{{ service_name }}_log_file:
file.managed:
@@ -70,6 +68,8 @@
service.running:
- name: {{ service_name }}
- enable: True
+ - watch:
+ - file: /usr/share/lma_collector
{# Setup basic structure for all roles so updates can apply #}
@@ -78,8 +78,6 @@
'decoder': {},
'input': {},
'trigger': {},
- 'alarm': {},
- 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -90,7 +88,6 @@
'input': {},
'trigger': {},
'alarm': {},
- 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -101,7 +98,6 @@
'input': {},
'trigger': {},
'alarm': {},
- 'alarm_cluster': {},
'filter': {},
'splitter': {},
'encoder': {},
@@ -111,7 +107,6 @@
'decoder': {},
'input': {},
'trigger': {},
- 'alarm': {},
'alarm_cluster': {},
'filter': {},
'splitter': {},
@@ -184,7 +179,7 @@
- mode: 600
- defaults:
service_grains:
- heka: {{ service_grains|yaml }}
+ heka: {{ salt['heka_alarming.grains_for_mine'](service_grains)|yaml }}
- require:
- file: heka_grains_dir
@@ -196,6 +191,7 @@
- group: heka
- defaults:
service_name: {{ service_name }}
+ poolsize: {{ server.poolsize }}
- require:
- file: heka_{{ service_name }}_conf_dir
- require_in:
diff --git a/heka/aggregator.sls b/heka/aggregator.sls
index c53d8e7..dda138c 100644
--- a/heka/aggregator.sls
+++ b/heka/aggregator.sls
@@ -3,6 +3,8 @@
include:
- heka._common
+{%- from "heka/map.jinja" import aggregator with context %}
+{%- set server = aggregator %}
{%- set service_name = "aggregator" %}
{%- include "heka/_service.sls" %}
diff --git a/heka/files/gse_policies.lua b/heka/files/gse_policies.lua
index 8ec9c06..8de8175 100644
--- a/heka/files/gse_policies.lua
+++ b/heka/files/gse_policies.lua
@@ -29,15 +29,17 @@
logical_operator = '{{ _trigger["logical_operator"] }}',
rules = {
{%- for _rule in _trigger["rules"] %}
- ['function'] = '{{ _rule["function"] }}',
+ {
+ ['function'] = '{{ _rule["function"] }}',
{%- set comma = joiner(",") %}
- ['arguments'] = {
+ ['arguments'] = {
{%- for _argument in _rule["arguments"]|sort -%}
- {{ comma() }}'{{ _argument }}'
+ {{ comma() }}'{{ _argument }}'
{%- endfor -%}
+ },
+ ['relational_operator'] = '{{ _rule["relational_operator"] }}',
+ ['threshold'] = {{ _rule["threshold"] }},
},
- ['relational_operator'] = '{{ _rule["relational_operator"] }}',
- ['threshold'] = {{ _rule["threshold"] }},
{%- endfor %}
},
},
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
index 7b063d1..6a767e5 100644
--- a/heka/files/lua/common/lma_utils.lua
+++ b/heka/files/lua/common/lma_utils.lua
@@ -45,7 +45,8 @@
ERROR = 3,
WARNING = 4,
NOTICE = 5,
- INFO= 6,
+ NOTE = 5,
+ INFO = 6,
DEBUG = 7,
}
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
index f94a0e8..0befcf0 100644
--- a/heka/files/lua/decoders/collectd.lua
+++ b/heka/files/lua/decoders/collectd.lua
@@ -216,6 +216,14 @@
else
msg['Fields']['name'] = metric_name
end
+ elseif metric_source == 'ntpd' then
+ if sample['type_instance'] == 'error' or sample['type_instance'] == 'loop' then
+ msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. sample['type_instance']
+ else
+ msg['Fields']['name'] = 'ntp' .. sep .. sample['type'] .. sep .. 'peer'
+ msg['Fields']['server'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'server')
+ end
elseif metric_source == 'check_openstack_api' then
-- For OpenStack API metrics, plugin_instance = <service name>
msg['Fields']['name'] = 'openstack_check_api'
@@ -420,7 +428,31 @@
msg['Fields']['service'] = sample['type_instance']
table.insert(msg['Fields']['tag_fields'], 'service')
else
- msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+ -- generic metric name translation for 3rd-party sources
+ msg['Fields']['name'] = sample['plugin']
+ if sample['plugin_instance'] ~= "" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['plugin_instance']
+ end
+ if sample['type'] ~= 'gauge' and sample['type'] ~= 'derive' and
+ sample['type'] ~= 'counter' and sample['type'] ~= 'absolute' then
+ -- only for custom DS types
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+ end
+ if sample['type_instance'] ~= "" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type_instance']
+ end
+ if sample['dsnames'][i] ~= "value" then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+ end
+ msg['Fields']['name'] = replace_dot_by_sep(msg['Fields']['name'])
+
+ -- add meta fields as tag_fields
+ for k, v in pairs(sample['meta'] or {}) do
+ if tostring(k) ~= '0' then
+ msg['Fields'][k] = v
+ table.insert(msg['Fields']['tag_fields'], k)
+ end
+ end
end
if not skip_it then
diff --git a/heka/files/lua/decoders/galera.lua b/heka/files/lua/decoders/galera.lua
new file mode 100644
index 0000000..c9f385d
--- /dev/null
+++ b/heka/files/lua/decoders/galera.lua
@@ -0,0 +1,63 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils = require "lma_utils"
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+local programname = read_config('programname') or 'mysql'
+
+-- MySQL log messages are formatted like this:
+--
+-- 2016-11-09 08:42:34 18430 [Note] InnoDB: Using atomics to ref count buffer pool pages
+local sp = l.space
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local pid = l.Cg(patt.Pid, "Pid")
+local severity = l.P"[" * l.Cg(l.R("az", "AZ")^0 / string.upper, "SeverityLabel") * l.P"]"
+local message = l.Cg(patt.Message, "Message")
+
+local grammar = l.Ct(timestamp * sp^1 * pid * sp^1 * severity * sp^1 * message)
+
+
+function process_message ()
+ local log = read_message("Payload")
+ local m = grammar:match(log)
+ if not m then
+ return -1, string.format("Failed to parse: %s", string.sub(log, 1, 64))
+ end
+
+ msg.Timestamp = m.Timestamp
+ msg.Pid = m.Pid
+ msg.Severity = utils.label_to_severity_map[m.SeverityLabel] or utils.label_to_severity_map.DEBUG
+ msg.Payload = m.Message
+
+ msg.Fields = {}
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = programname
+
+ utils.inject_tags(msg)
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
index ad4c540..0507a10 100644
--- a/heka/files/lua/encoders/status_nagios.lua
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -20,7 +20,6 @@
local interp = require "msg_interpolate"
local host = read_config('nagios_host')
-local service_template = read_config('service_template') or error('service_template is required!')
-- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
-- See bug #1517917 for details.
-- With the 'cmd.cgi' re-implementation for the command PROCESS_SERVICE_CHECK_RESULT,
@@ -29,7 +28,6 @@
local data = {
cmd_typ = '30',
cmd_mod = '2',
- host = host,
service = nil,
plugin_state = nil,
plugin_output = nil,
@@ -55,7 +53,7 @@
end
function process_message()
- local service_name = interp.interpolate_from_msg(service_template)
+ local service_name = read_message('Fields[member]')
local status = afd.get_status()
local alarms = afd.alarms_for_human(afd.extract_alarms())
@@ -63,6 +61,11 @@
return -1
end
+ if host then
+ data['host'] = host
+ else
+ data['host'] = read_message('Fields[hostname]') or read_message('Hostname')
+ end
data['service'] = service_name
data['plugin_state'] = nagios_state_map[status]
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 5b6402c..d1fef2b 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -64,6 +64,10 @@
return -1, "Cannot find alarms in the AFD/GSE message"
end
+ -- "hostname" is nil when the input message is a GSE message, so a nil
+ -- value is not an error condition here
+ local hostname = afd.get_entity_name('hostname')
+
local cluster_ids = gse.find_cluster_memberships(member_id)
-- update all clusters that depend on this entity
diff --git a/heka/files/toml/encoder/sandbox.toml b/heka/files/toml/encoder/sandbox.toml
new file mode 100644
index 0000000..67c95a1
--- /dev/null
+++ b/heka/files/toml/encoder/sandbox.toml
@@ -0,0 +1,13 @@
+[{{ encoder_name }}_encoder]
+type = "SandboxEncoder"
+filename = "{{ encoder.module_file }}"
+{%- if encoder.module_dir is defined %}
+module_directory = "{{ encoder.module_dir }}"
+{%- endif %}
+
+{%- if encoder.config is mapping %}
+[{{ encoder_name }}_encoder.config]
+{%- for config_param, config_value in encoder.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% elif config_value in [True, False] %}{{ config_value|lower }}{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/filter/afd_alarm.toml b/heka/files/toml/filter/afd_alarm.toml
index c1168df..5fefb2d 100644
--- a/heka/files/toml/filter/afd_alarm.toml
+++ b/heka/files/toml/filter/afd_alarm.toml
@@ -11,4 +11,6 @@
afd_name = "{{ alarm_name }}"
hostname = "{{ grains.host }}"
dimensions = '{{ salt['heka_alarming.dimensions'](alarm)|json }}'
-activate_alerting = {{ alarm.alerting|default(True)|lower }}
+{%- set alerting = alarm.get('alerting', 'enabled') %}
+activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
+enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
diff --git a/heka/files/toml/filter/gse_alarm_cluster.toml b/heka/files/toml/filter/gse_alarm_cluster.toml
index 72b1923..1c9fe77 100644
--- a/heka/files/toml/filter/gse_alarm_cluster.toml
+++ b/heka/files/toml/filter/gse_alarm_cluster.toml
@@ -9,7 +9,6 @@
[gse_{{ alarm_cluster_name }}_filter.config]
topology_file = "gse_{{ alarm_cluster_name|replace('-', '_') }}_topology"
dimensions = '{{ salt['heka_alarming.dimensions'](alarm_cluster)|json }}'
-activate_alerting = {{ alarm_cluster.alerting|default(True)|lower }}
{%- if alarm_cluster.interval is defined %}
interval = {{ alarm_cluster.interval }}
{%- endif %}
@@ -19,3 +18,6 @@
{%- if alarm_cluster.warm_up_period is defined %}
warm_up_period = {{ alarm_cluster.warm_up_period }}
{%- endif %}
+{%- set alerting = alarm_cluster.get('alerting', 'enabled_with_notification') %}
+activate_alerting = {{ salt['heka_alarming.alarm_activate_alerting'](alerting) }}
+enable_notification = {{ salt['heka_alarming.alarm_enable_notification'](alerting) }}
diff --git a/heka/files/toml/global.toml b/heka/files/toml/global.toml
index 40aaf3b..30d460d 100644
--- a/heka/files/toml/global.toml
+++ b/heka/files/toml/global.toml
@@ -9,4 +9,4 @@
max_message_size = 262144
max_process_inject = 1
max_timer_inject = 10
-poolsize = 100
+poolsize = {{ poolsize }}
diff --git a/heka/files/toml/output/http.toml b/heka/files/toml/output/http.toml
index 1ec3672..fce5d97 100644
--- a/heka/files/toml/output/http.toml
+++ b/heka/files/toml/output/http.toml
@@ -7,14 +7,16 @@
username = "{{ output.username }}"
password = "{{ output.password }}"
{%- endif %}
-http_timeout = {{ output.timeout }}
-method = "POST"
+http_timeout = {{ output.http_timeout|default(2000) }}
+method = "{{ output.method|default("POST") }}"
+{% if output.get('use_buffering', True) %}
use_buffering = true
[{{ output_name }}_output.buffering]
-max_buffer_size = 1610612736
-max_file_size = 134217728
-full_action = "drop"
+max_buffer_size = {{ output.max_buffer_size|default(268435456) }}
+max_file_size = {{ output.max_file_size|default(67108864) }}
+full_action = "{{ output.full_action|default("drop") }}"
+{% endif %}
[{{ output_name }}_output.headers]
Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/log_collector.sls b/heka/log_collector.sls
index a50e6d2..9ee007c 100644
--- a/heka/log_collector.sls
+++ b/heka/log_collector.sls
@@ -3,6 +3,8 @@
include:
- heka._common
+{%- from "heka/map.jinja" import log_collector with context %}
+{%- set server = log_collector %}
{%- set service_name = "log_collector" %}
{%- include "heka/_service.sls" %}
diff --git a/heka/map.jinja b/heka/map.jinja
index 300960e..8b74f9d 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -35,10 +35,13 @@
{% set default_influxdb_time_precision = 'ms' %}
{% set default_influxdb_timeout = 5000 %}
{% set default_aggregator_port = 5565 %}
+{% set default_nagios_port = 8001 %}
+{% set default_nagios_host_alarm_clusters = '00-clusters' %}
{% set log_collector = salt['grains.filter_by']({
'default': {
'elasticsearch_port': default_elasticsearch_port,
+ 'poolsize': 100,
}
}, merge=salt['pillar.get']('heka:log_collector')) %}
@@ -48,6 +51,8 @@
'influxdb_time_precision': default_influxdb_time_precision,
'influxdb_timeout': default_influxdb_timeout,
'aggregator_port': default_aggregator_port,
+ 'nagios_port': default_nagios_port,
+ 'poolsize': 100,
}
}, merge=salt['pillar.get']('heka:metric_collector')) %}
@@ -57,6 +62,7 @@
'influxdb_time_precision': default_influxdb_time_precision,
'influxdb_timeout': default_influxdb_timeout,
'aggregator_port': default_aggregator_port,
+ 'poolsize': 100,
}
}, merge=salt['pillar.get']('heka:remote_collector')) %}
@@ -65,5 +71,8 @@
'influxdb_port': default_influxdb_port,
'influxdb_time_precision': default_influxdb_time_precision,
'influxdb_timeout': default_influxdb_timeout,
+ 'nagios_port': default_nagios_port,
+ 'nagios_host_alarm_clusters': default_nagios_host_alarm_clusters,
+ 'poolsize': 100,
}
}, merge=salt['pillar.get']('heka:aggregator')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 283a8c0..f1d906f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -106,6 +106,12 @@
append_newlines: false
prefix_ts: false
{%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+ nagios:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+{%- endif %}
output:
metric_dashboard:
engine: dashboard
@@ -131,6 +137,20 @@
port: "{{ metric_collector.aggregator_port }}"
message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric'"
{%- endif %}
+{%- if metric_collector.nagios_host is defined %}
+ nagios_alarm:
+ engine: http
+ address: "http://{{ metric_collector.nagios_host }}:{{metric_collector.nagios_port }}/status"
+ message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.afd_metric' && Fields[no_alerting] == NIL"
+ encoder: nagios_encoder
+ {%- if metric_collector.nagios_username is defined and metric_collector.nagios_password is defined %}
+ username: {{ metric_collector.get('nagios_username') }}
+ password: {{ metric_collector.get('nagios_password') }}
+ {%- endif %}
+ max_buffer_size: 1048576
+ max_file_size: 524288
+ full_action: drop
+{%- endif %}
remote_collector:
decoder:
collectd:
@@ -363,15 +383,23 @@
tag_fields: "deployment_id environment_label tenant_id user_id"
time_precision: "{{ aggregator.influxdb_time_precision }}"
{%- endif %}
-{%- if aggregator.influxdb_host is defined %}
encoder:
+{%- if aggregator.influxdb_host is defined %}
influxdb:
engine: payload
append_newlines: false
prefix_ts: false
{%- endif %}
-{%- if aggregator.influxdb_host is defined %}
+{%- if aggregator.nagios_host is defined %}
+ nagios:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/status_nagios.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ nagios_host: "{{ aggregator.nagios_host_alarm_clusters }}"
+{%- endif %}
output:
+{%- if aggregator.influxdb_host is defined %}
influxdb:
engine: http
address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
@@ -383,3 +411,17 @@
encoder: influxdb_encoder
timeout: {{ aggregator.influxdb_timeout }}
{%- endif %}
+{%- if aggregator.nagios_host is defined %}
+ nagios_alarm_cluster:
+ engine: http
+ address: "http://{{ aggregator.nagios_host }}:{{aggregator.nagios_port }}/status"
+ message_matcher: "Fields[aggregator] == NIL && Type == 'heka.sandbox.gse_metric' && Fields[no_alerting] == NIL"
+ encoder: nagios_encoder
+ {%- if aggregator.nagios_username is defined and aggregator.nagios_password is defined %}
+ username: {{ aggregator.get('nagios_username') }}
+ password: {{ aggregator.get('nagios_password') }}
+ {%- endif %}
+ max_buffer_size: 1048576
+ max_file_size: 524288
+ full_action: drop
+{%- endif %}
diff --git a/heka/metric_collector.sls b/heka/metric_collector.sls
index 9684c1b..7b28a7c 100644
--- a/heka/metric_collector.sls
+++ b/heka/metric_collector.sls
@@ -3,6 +3,8 @@
include:
- heka._common
+{%- from "heka/map.jinja" import metric_collector with context %}
+{%- set server = metric_collector %}
{%- set service_name = "metric_collector" %}
{%- include "heka/_service.sls" %}
diff --git a/heka/remote_collector.sls b/heka/remote_collector.sls
index 961587d..35533c3 100644
--- a/heka/remote_collector.sls
+++ b/heka/remote_collector.sls
@@ -3,6 +3,8 @@
include:
- heka._common
+{%- from "heka/map.jinja" import remote_collector with context %}
+{%- set server = remote_collector %}
{%- set service_name = "remote_collector" %}
{%- include "heka/_service.sls" %}
diff --git a/metadata/service/aggregator/single.yml b/metadata/service/aggregator/single.yml
index 9db8b69..7e814c5 100644
--- a/metadata/service/aggregator/single.yml
+++ b/metadata/service/aggregator/single.yml
@@ -3,7 +3,10 @@
classes:
- service.heka.support
parameters:
+ _param:
+ aggregator_poolsize: 100
heka:
aggregator:
enabled: true
influxdb_time_precision: ms
+ poolsize: ${_param:aggregator_poolsize}
diff --git a/metadata/service/log_collector/single.yml b/metadata/service/log_collector/single.yml
index f857da5..5160c35 100644
--- a/metadata/service/log_collector/single.yml
+++ b/metadata/service/log_collector/single.yml
@@ -3,6 +3,9 @@
classes:
- service.heka.support
parameters:
+ _param:
+ log_collector_poolsize: 100
heka:
log_collector:
enabled: true
+ poolsize: ${_param:log_collector_poolsize}
diff --git a/metadata/service/metric_collector/single.yml b/metadata/service/metric_collector/single.yml
index 1d1a62a..ad183a2 100644
--- a/metadata/service/metric_collector/single.yml
+++ b/metadata/service/metric_collector/single.yml
@@ -3,7 +3,10 @@
classes:
- service.heka.support
parameters:
+ _param:
+ metric_collector_poolsize: 100
heka:
metric_collector:
enabled: true
influxdb_time_precision: ms
+ poolsize: ${_param:metric_collector_poolsize}
diff --git a/metadata/service/remote_collector/single.yml b/metadata/service/remote_collector/single.yml
index e141799..120414c 100644
--- a/metadata/service/remote_collector/single.yml
+++ b/metadata/service/remote_collector/single.yml
@@ -3,7 +3,10 @@
classes:
- service.heka.support
parameters:
+ _param:
+ remote_collector_poolsize: 100
heka:
remote_collector:
enabled: true
influxdb_time_precision: ms
+ poolsize: ${_param:remote_collector_poolsize}