Stacklight integration
diff --git a/README.rst b/README.rst
index 90730cd..75e0ed0 100644
--- a/README.rst
+++ b/README.rst
@@ -3,98 +3,33 @@
Heka Formula
============
-Heka is an open source stream processing software system developed by Mozilla. Heka is a Swiss Army Knife type tool for data processing
+Heka is an open source stream processing software system developed by Mozilla. It is a Swiss Army knife type tool for data processing.
Sample pillars
==============
-Basic log shipper streaming decoded rsyslog's logfiles using amqp broker as transport.
-From every message there is one amqp message and it's also logged to heka's logfile in RST format.
+Log collector service
.. code-block:: yaml
-
heka:
- server:
+ log_collector:
enabled: true
- input:
- rsyslog-syslog:
- engine: logstreamer
- log_directory: /var/log
- file_match: syslog\.?(?P<Index>\d+)?(.gz)?
- decoder: RsyslogDecoder
- priority: ["^Index"]
- rsyslog-auth:
- engine: logstreamer
- log_directory: /var/log
- file_match: auth\.log\.?(?P<Index>\d+)?(.gz)?
- decoder: RsyslogDecoder
- priority: ["^Index"]
- decoder:
- rsyslog:
- engine: rsyslog
- template: %TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n
- hostname_keep: TRUE
- tz: Europe/Prague
output:
- rabbitmq:
- engine: amqp
+ elasticsearch01:
+ engine: elasticsearch
host: localhost
- user: guest
- password: guest
- vhost: /logs
- exchange: logs
- exchange_type: fanout
- encoder: ProtobufEncoder
- use_framing: true
- heka-logfile:
- engine: logoutput
- encoder: RstEncoder
+ port: 9200
+ encoder: es_json
message_matcher: TRUE
- encoder:
- heka-logfile:
- engine: RstEncoder
-
-Heka acting as message router and dashboard.
-Messages are consumed from amqp and sent to elasticsearch server.
-
+Metric collector service
.. code-block:: yaml
-
heka:
- server:
+ metric_collector:
enabled: true
- input:
- rabbitmq:
- engine: amqp
- host: localhost
- user: guest
- password: guest
- vhost: /logs
- exchange: logs
- exchange_type: fanout
- decoder: ProtoBufDecoder
- splitter: HekaFramingSplitter
- rsyslog-syslog:
- engine: logstreamer
- log_directory: /var/log
- file_match: syslog\.?(?P<Index>\d+)?(.gz)?
- decoder: RsyslogDecoder
- priority: ["^Index"]
- rsyslog-auth:
- engine: logstreamer
- log_directory: /var/log
- file_match: auth\.log\.?(?P<Index>\d+)?(.gz)?
- decoder: RsyslogDecoder
- priority: ["^Index"]
- decoder:
- rsyslog:
- engine: rsyslog
- template: %TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n
- hostname_keep: TRUE
- tz: Europe/Prague
output:
elasticsearch01:
engine: elasticsearch
@@ -105,11 +40,25 @@
dashboard01:
engine: dashboard
ticker_interval: 30
- encoder:
- es-json:
- engine: es-json
+
+Aggregator service
+
+.. code-block:: yaml
+
+ heka:
+ aggregator:
+ enabled: true
+ output:
+ elasticsearch01:
+ engine: elasticsearch
+ host: localhost
+ port: 9200
+ encoder: es_json
message_matcher: TRUE
- index: logfile-%{%Y.%m.%d}
+ dashboard01:
+ engine: dashboard
+ ticker_interval: 30
+
Read more
=========
diff --git a/debian/changelog b/debian/changelog
index f828603..b604c53 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,5 +1,4 @@
-<<<<<<< HEAD
-=======
+
salt-formula-heka (0.2) trusty; urgency=medium
* First public release
diff --git a/heka/_common.sls b/heka/_common.sls
new file mode 100644
index 0000000..0ffad13
--- /dev/null
+++ b/heka/_common.sls
@@ -0,0 +1,29 @@
+{%- from "heka/map.jinja" import server with context %}
+
+heka_packages:
+ pkg.latest:
+ - names: {{ server.pkgs }}
+
+/usr/share/lma_collector:
+ file.recurse:
+ - source: salt://heka/files/lua
+
+heka_user:
+ user.present:
+ - name: heka
+ - system: true
+ - shell: /bin/false
+ - groups: {{ server.groups }}
+ - require:
+ - pkg: heka_packages
+
+heka_acl_log:
+ cmd.run:
+ - name: "setfacl -R -m g:adm:rx /var/log; setfacl -R -d -m g:adm:rx /var/log"
+ - unless: "getfacl /var/log/|grep default:group:adm"
+
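+{# The stock heka service shipped by the package is stopped here; each
+   collector role runs its own hekad instance managed by _service.sls
+   (assumed intent). #}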
+heka_service:
+ service.dead:
+ - name: heka
+
diff --git a/heka/_service.sls b/heka/_service.sls
new file mode 100644
index 0000000..7085abe
--- /dev/null
+++ b/heka/_service.sls
@@ -0,0 +1,286 @@
+
+{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file %}{% endmacro %}
+
+{%- macro service_config(service_name) %}
+
+{%- set server = salt['pillar.get']('heka:'+service_name) %}
+
+{%- if server.enabled %}
+
+heka_{{ service_name }}_log_file:
+ file.managed:
+ - name: /var/log/{{ service_name }}.log
+ - user: heka
+ - mode: 644
+ - replace: False
+
+heka_{{ service_name }}_conf_dir:
+ file.directory:
+ - name: /etc/{{ service_name }}
+ - user: heka
+ - mode: 750
+ - makedirs: true
+
+heka_{{ service_name }}_conf_dir_clean:
+ file.directory:
+ - name: /etc/{{ service_name }}
+ - clean: true
+
+{%- if grains.get('init', None) == 'systemd' %}
+
+heka_{{ service_name }}_service_file:
+ file.managed:
+ - name: /etc/systemd/system/{{ service_name }}.service
+ - source: salt://heka/files/heka.service
+ - user: root
+ - mode: 644
+ - template: jinja
+
+{%- else %}
+
+heka_{{ service_name }}_service_file:
+ file.managed:
+ - name: /etc/init/{{ service_name }}.conf
+ - source: salt://heka/files/heka.service
+ - user: root
+ - mode: 644
+ - template: jinja
+
+heka_{{ service_name }}_service_wrapper:
+ file.managed:
+ - name: /usr/local/bin/{{ service_name }}_wrapper
+ - source: salt://heka/files/service_wrapper
+ - user: root
+ - mode: 755
+ - template: jinja
+
+{%- endif %}
+
+
+{# Setup basic structure for all roles so updates can apply #}
+
+{%- set service_grains = {
+ 'heka': {
+ 'log_collector': {
+ 'decoder': {},
+ 'input': {},
+ 'filter': {},
+ 'splitter': {},
+ 'encoder': {},
+ 'output': {}
+ },
+ 'metric_collector': {
+ 'decoder': {},
+ 'input': {},
+ 'filter': {},
+ 'splitter': {},
+ 'encoder': {},
+ 'output': {}
+ },
+ 'remote_collector': {
+ 'decoder': {},
+ 'input': {},
+ 'filter': {},
+ 'splitter': {},
+ 'encoder': {},
+ 'output': {}
+ },
+ 'aggregator': {
+ 'decoder': {},
+ 'input': {},
+ 'filter': {},
+ 'splitter': {},
+ 'encoder': {},
+ 'output': {}
+ }
+ }
+} %}
+
+
+{# Loading the other services' support metadata for local roles #}
+
+{%- if service_name in ['log_collector', 'metric_collector'] %}
+
+{%- for service_name, service in pillar.iteritems() %}
+{%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
+
+{%- set grains_fragment_file = service_name+'/meta/heka.yml' %}
+{%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
+{%- do service_grains.heka.update(grains_yaml) %}
+
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+
+
+{# Loading the other services' support metadata from salt-mine #}
+
+{%- if service_name in ['remote_collector', 'aggregator'] %}
+
+{%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').iteritems() %}
+{%- if node_grains.heka is defined %}
+
+{%- do service_grains.heka.update(node_grains.heka) %}
+
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+
+
+{# Overriding aggregated metadata from user-space pillar data #}
+
+{%- for service_grain_name, service_grain in service_grains.heka.iteritems() %}
+{%- if salt['pillar.get']('heka:'+service_grain_name) %}
+
+{%- for service_action_name, service_action in service_grain.iteritems() %}
+{%- if salt['pillar.get']('heka:'+service_grain_name).get(service_action_name, False) is mapping %}
+{%- do service_grains.heka[service_grain_name][service_action_name].update(salt['pillar.get']('heka:'+service_grain_name+':'+service_action_name)) %}
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+{%- endfor %}
+
+
+/etc/{{ service_name }}/global.toml:
+ file.managed:
+ - source: salt://heka/files/toml/global.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - defaults:
+ service_name: {{ service_name }}
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+
+{%- for decoder_name, decoder in service_grains.heka[service_name].decoder.iteritems() %}
+
+/etc/{{ service_name }}/10-decoder-{{ decoder_name }}-{{ decoder.engine }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/decoder/{{ decoder.engine }}.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ decoder_name: {{ decoder_name }}
+ decoder: {{ decoder|yaml }}
+
+{%- endfor %}
+
+{%- for input_name, input in service_grains.heka[service_name].input.iteritems() %}
+
+/etc/{{ service_name }}/15-input-{{ input_name }}-{{ input.engine }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/input/{{ input.engine }}.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ input_name: {{ input_name }}
+ input: {{ input|yaml }}
+
+{%- endfor %}
+
+{%- for filter_name, filter in service_grains.heka[service_name].filter.iteritems() %}
+
+/etc/{{ service_name }}/20-filter-{{ filter_name }}-{{ filter.engine }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/filter/{{ filter.engine }}.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ filter_name: {{ filter_name }}
+ filter: {{ filter|yaml }}
+
+{%- endfor %}
+
+{%- for splitter_name, splitter in service_grains.heka[service_name].splitter.iteritems() %}
+
+/etc/{{ service_name }}/30-splitter-{{ splitter_name }}-{{ splitter.engine }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/splitter/{{ splitter.engine }}.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ splitter_name: {{ splitter_name }}
+ splitter: {{ splitter|yaml }}
+
+{%- endfor %}
+
+{%- for encoder_name, encoder in service_grains.heka[service_name].encoder.iteritems() %}
+
+/etc/{{ service_name }}/40-encoder-{{ encoder_name }}-{{ encoder.engine }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/encoder/{{ encoder.engine }}.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ encoder_name: {{ encoder_name }}
+ encoder: {{ encoder|yaml }}
+
+{%- endfor %}
+
+{%- for output_name, output in service_grains.heka[service_name].output.iteritems() %}
+
+/etc/{{ service_name }}/60-output-{{ output_name }}-{{ output.engine }}.toml:
+ file.managed:
+ - source: salt://heka/files/toml/output/{{ output.engine }}.toml
+ - template: jinja
+ - mode: 640
+ - group: heka
+ - require:
+ - file: heka_{{ service_name }}_conf_dir
+ - require_in:
+ - file: heka_{{ service_name }}_conf_dir_clean
+ - watch_in:
+ - service: heka_{{ service_name }}_service
+ - defaults:
+ output_name: {{ output_name }}
+ output: {{ output|yaml }}
+
+{%- endfor %}
+
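+{# The watch_in requisites above reference heka_{{ service_name }}_service;
+   a minimal service state is defined here (sketch, exact options may differ). #}
+heka_{{ service_name }}_service:
+ service.running:
+ - name: {{ service_name }}
+ - enable: true
+ - require:
+ - file: heka_{{ service_name }}_service_file
+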
+{%- endif %}
+
+{%- endmacro %}
+
+{{ service_config(service_name) }}
diff --git a/heka/aggregator.sls b/heka/aggregator.sls
new file mode 100644
index 0000000..c53d8e7
--- /dev/null
+++ b/heka/aggregator.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.aggregator is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "aggregator" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/files/00-hekad.toml b/heka/files/00-hekad.toml
deleted file mode 100644
index 99cad61..0000000
--- a/heka/files/00-hekad.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[hekad]
-{%- set workers = grains.num_cpus + 1 %}
-maxprocs = {{ workers }}
diff --git a/heka/files/decoder/multidecoder.toml b/heka/files/decoder/multidecoder.toml
deleted file mode 100644
index fe73218..0000000
--- a/heka/files/decoder/multidecoder.toml
+++ /dev/null
@@ -1,6 +0,0 @@
-[multidecoder_{{ name }}]
-type = "MultiDecoder"
-subs = {{ values.subs }}
-cascade_strategy = "{{ values.cascade_strategy }}"
-log_sub_errors = {{ values.log_sub_errors }}
-
diff --git a/heka/files/decoder/payloadregex.toml b/heka/files/decoder/payloadregex.toml
deleted file mode 100644
index 0af80f7..0000000
--- a/heka/files/decoder/payloadregex.toml
+++ /dev/null
@@ -1,5 +0,0 @@
-[Payloadregex_{{ name }}]
-type = "PayloadRegexDecoder"
-match_regex = "{{ values.match_regex }}"
-timestamp_layout = "{{ values.timestamp_layout }}"
-
diff --git a/heka/files/decoder/protobuf.toml b/heka/files/decoder/protobuf.toml
deleted file mode 100644
index c856239..0000000
--- a/heka/files/decoder/protobuf.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[ProtoBufDecoder]
-type = "ProtobufDecoder"
-
diff --git a/heka/files/decoder/rsyslog.toml b/heka/files/decoder/rsyslog.toml
deleted file mode 100644
index 6abd40e..0000000
--- a/heka/files/decoder/rsyslog.toml
+++ /dev/null
@@ -1,9 +0,0 @@
-{%- from "heka/map.jinja" import server with context -%}
-[RsyslogDecoder]
-type = "SandboxDecoder"
-filename = "lua_decoders/rsyslog.lua"
-
-[RsyslogDecoder.config]
-type = "{{ values.type }}"
-template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
-tz = "{{ server.decoder.rsyslog.tz }}"
diff --git a/heka/files/decoder/sandbox.toml b/heka/files/decoder/sandbox.toml
deleted file mode 100644
index 053b475..0000000
--- a/heka/files/decoder/sandbox.toml
+++ /dev/null
@@ -1,30 +0,0 @@
-[Sandbox_{{ name }}]
-type = "SandboxDecoder"
-filename = "{{ values.file_name }}"
-
-{% if values.module_directory is defined %}
-module_directory = "{{ values.module_directory }}"
-{%- endif %}
-
-{% if values.memory_limit is defined %}
-memory_limit = "{{ values.memory_limit }}"
-{%- endif %}
-
-{% if values.preserve_data is defined %}
-preserve_data = "{{ values.preserve_data }}"
-{%- endif %}
-
-{% if values.instruction_limit is defined %}
-instruction_limit = "{{ values.instruction_limit }}"
-{%- endif %}
-
-{% if values.output_limit is defined %}
-output_limit = "{{ values.output_limit }}"
-{%- endif %}
-
-{% if values.config is defined %}
-config = [
-{% for k,v in values.config.iteritems() %}
-{{ k }} = {{ v }} ]
-{%- endfor %}
-{%- endif %}
diff --git a/heka/files/heka.service b/heka/files/heka.service
index b95fab6..b32bf8e 100644
--- a/heka/files/heka.service
+++ b/heka/files/heka.service
@@ -1,14 +1,17 @@
+{%- if grains.get('init', None) == 'systemd' %}
+
[Unit]
-Description=heka - data collector and processor daemon
+Description=heka {{ service_name }} - data collector and processor daemon
After=network.target auditd.service
-ConditionPathExists=!/etc/heka/hekad_not_to_be_run
+ConditionPathExists=!/etc/{{ service_name }}_not_to_be_run
[Service]
-EnvironmentFile=-/etc/default/heka
+EnvironmentFile=-/etc/default/{{ service_name }}
User=heka
Group=heka
-ExecStart=/usr/bin/hekad -config=/etc/heka/conf.d/
-ExecReload=/bin/kill -HUP $MAINPID
+ExecStart=/usr/bin/hekad -config=/etc/{{ service_name }}
+# hekad does not appear to support reloading by signal, so ExecReload is disabled
+# ExecReload=/bin/kill -HUP $MAINPID
KillMode=process
Restart=on-failure
StandardError=inherit
@@ -16,3 +19,26 @@
[Install]
WantedBy=multi-user.target
+{%- else %}
+
+# heka {{ service_name }}
+
+description "{{ service_name }}"
+
+start on runlevel [2345]
+stop on runlevel [!2345]
+
+respawn
+
+pre-start script
+ touch /var/log/{{ service_name }}.log
+ chown heka:heka /var/log/{{ service_name }}.log
+end script
+
+script
+ # https://bugs.launchpad.net/lma-toolchain/+bug/1543289
+ ulimit -n 102400
+ exec start-stop-daemon --start --chuid heka --exec /usr/local/bin/{{ service_name }}_wrapper 2>>/var/log/{{ service_name }}.log
+end script
+
+{%- endif %}
\ No newline at end of file
diff --git a/heka/files/input/amqp.toml b/heka/files/input/amqp.toml
deleted file mode 100644
index c3dbd70..0000000
--- a/heka/files/input/amqp.toml
+++ /dev/null
@@ -1,39 +0,0 @@
-[input_{{ name }}]
-type = "AMQPInput"
-url = "amqp{% if values.ssl is defined and values.ssl.get('enabled', True) %}s{% endif %}://{{ values.user }}:{{ values.password }}@{{ values.host }}/{{ values.vhost }}"
-exchange = "{{ values.exchange }}"
-exchange_type = "{{ values.exchange_type }}"
-
-{% if values.prefetch_count is defined -%}
-prefetch_count = {{ values.prefetch_count }}
-{% endif %}
-{%- if values.exchange_durability is defined -%}
-exchange_durability = "{{ values.exchange_durability }}"
-{% endif %}
-{%- if values.exchange_auto_delete is defined -%}
-exchange_auto_delete = "{{ values.exchange_auto_delete }}"
-{% endif %}
-{%- if values.queue_auto_delete is defined -%}
-queue_auto_delete = {{ values.queue_auto_delete }}
-{% endif %}
-{%- if values.queue is defined -%}
-queue = "{{ values.queue }}"
-{% endif %}
-{%- if values.routing_key is defined -%}
-routing_key = "{{ values.routing_key }}"
-{% endif %}
-decoder = "{{ values.decoder }}"
-splitter = "{{ values.splitter }}"
-
-{%- if values.ssl is defined and values.ssl.get('enabled', True) %}
-[input_{{ name }}.tls]
-cert_file = "{{ values.ssl.cert_file }}"
-key_file = "{{ values.ssl.key_file }}"
-{%- if values.ssl.ca_file is defined %}
-root_cafile = "{{ values.ssl.ca_file }}"
-{%- endif %}
-{%- endif %}
-
-{#-
-vim: syntax=jinja
--#}
diff --git a/heka/files/input/logstreamer.toml b/heka/files/input/logstreamer.toml
deleted file mode 100644
index 8e3ac11..0000000
--- a/heka/files/input/logstreamer.toml
+++ /dev/null
@@ -1,19 +0,0 @@
-[logstreamer_{{ name }}]
-type = "LogstreamerInput"
-log_directory = "{{ values.log_directory }}"
-file_match = '{{ values.file_match }}'
-{% if values.priority is defined %}
-priority = {{ values.priority }}
-{% endif %}
-{% if values.decoder is defined %}
-decoder = "{{ values.decoder }}"
-{% endif %}
-{% if values.splitter is defined %}
-splitter = '{{ values.splitter }}'
-{% endif %}
-{% if values.differentiator is defined %}
-differentiator = {{ values.differentiator }}
-{% endif %}
-{% if values.oldest_duration is defined %}
-oldest_duration = "{{ values.oldest_duration }}"
-{% endif %}
diff --git a/heka/files/lua/common/accumulator.lua b/heka/files/lua/common/accumulator.lua
new file mode 100644
index 0000000..9b01a8f
--- /dev/null
+++ b/heka/files/lua/common/accumulator.lua
@@ -0,0 +1,63 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local time = os.time
+local string = string
+local table = table
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local tostring = tostring
+local type = type
+
+local Accumulator = {}
+Accumulator.__index = Accumulator
+
+setfenv(1, Accumulator) -- Remove external access to contain everything in the module
+
+-- Create a new Accumulator
+--
+-- flush_count: the maximum number of items to accumulate before flushing
+-- flush_interval: the maximum number of seconds to wait before flushing
+-- callback: the function to call back when flushing the accumulator, it will
+-- receive the table of accumulated items as parameter.
+function Accumulator.new(flush_count, flush_interval, callback)
+ local a = {}
+ setmetatable(a, Accumulator)
+ a.flush_count = flush_count
+ a.flush_interval = flush_interval
+ a.flush_cb = callback
+ a.last_flush = time() * 1e9
+ a.buffer = {}
+ return a
+end
+
+-- Flush the buffer if the flush_count or flush_interval condition is met
+--
+-- ns: the current timestamp in nanoseconds (optional)
+function Accumulator:flush(ns)
+ local now = ns or time() * 1e9
+ -- flush_interval is in seconds while timestamps are in nanoseconds
+ if #self.buffer > self.flush_count or now - self.last_flush > self.flush_interval * 1e9 then
+ self.flush_cb(self.buffer)
+ self.buffer = {}
+ self.last_flush = now
+ end
+end
+
+-- Append an item to the buffer and flush the buffer if needed
+function Accumulator:append(item)
+ self.buffer[#self.buffer+1] = item
+ self:flush()
+end
+
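+-- Usage sketch (illustrative, not part of this module):
+--   local Accumulator = require 'accumulator'
+--   local acc = Accumulator.new(100, 5, function(items)
+--       -- e.g. encode and ship the accumulated batch
+--   end)
+--   acc:append('datapoint-1') -- flushes when the count or interval is exceeded
+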
+return Accumulator
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
new file mode 100644
index 0000000..9e864df
--- /dev/null
+++ b/heka/files/lua/common/afd.lua
@@ -0,0 +1,185 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = require 'cjson'
+local string = require 'string'
+
+local lma = require 'lma_utils'
+local consts = require 'gse_constants'
+
+local read_message = read_message
+local assert = assert
+local ipairs = ipairs
+local pairs = pairs
+local pcall = pcall
+local table = table
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+function get_entity_name(field)
+ return read_message(string.format('Fields[%s]', field))
+end
+
+function get_status()
+ return read_message('Fields[value]')
+end
+
+function extract_alarms()
+ local ok, payload = pcall(cjson.decode, read_message('Payload'))
+ if not ok or not payload.alarms then
+ return nil
+ end
+ return payload.alarms
+end
+
+-- return a human-readable message from an alarm table
+-- for instance: "CPU load too high (WARNING, rule='last(load_midterm)>=5', current=7)"
+function get_alarm_for_human(alarm)
+ local metric
+ local fields = {}
+ for name, value in pairs(alarm.fields) do
+ fields[#fields+1] = name .. '="' .. value .. '"'
+ end
+ if #fields > 0 then
+ metric = string.format('%s[%s]', alarm.metric, table.concat(fields, ','))
+ else
+ metric = alarm.metric
+ end
+
+ local host = ''
+ if alarm.hostname then
+ host = string.format(', host=%s', alarm.hostname)
+ end
+
+ return string.format(
+ "%s (%s, rule='%s(%s)%s%s', current=%.2f%s)",
+ alarm.message,
+ alarm.severity,
+ alarm['function'],
+ metric,
+ alarm.operator,
+ alarm.threshold,
+ alarm.value,
+ host
+ )
+end
+
+function alarms_for_human(alarms)
+ local alarm_messages = {}
+ local hint_messages = {}
+
+ for _, v in ipairs(alarms) do
+ if v.tags and v.tags.dependency_level and v.tags.dependency_level == 'hint' then
+ hint_messages[#hint_messages+1] = get_alarm_for_human(v)
+ else
+ alarm_messages[#alarm_messages+1] = get_alarm_for_human(v)
+ end
+ end
+
+ if #hint_messages > 0 then
+ alarm_messages[#alarm_messages+1] = "Other related alarms:"
+ end
+ for _, v in ipairs(hint_messages) do
+ alarm_messages[#alarm_messages+1] = v
+ end
+
+ return alarm_messages
+end
+
+local alarms = {}
+
+-- append an alarm to the list of pending alarms
+-- the list is sent when inject_afd_metric is called
+function add_to_alarms(status, fn, metric, fields, tags, operator, value, threshold, window, periods, message)
+ local severity = consts.status_label(status)
+ assert(severity)
+ alarms[#alarms+1] = {
+ severity=severity,
+ ['function']=fn,
+ metric=metric,
+ fields=fields or {},
+ tags=tags or {},
+ operator=operator,
+ value=value,
+ threshold=threshold,
+ window=window or 0,
+ periods=periods or 0,
+ message=message
+ }
+end
+
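+-- Usage sketch (values are illustrative):
+--   add_to_alarms(consts.WARN, 'avg', 'cpu_idle', nil, nil,
+--                 '<=', 4.2, 5, 60, 2, 'Average CPU idle is too low')
+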
+function get_alarms()
+ return alarms
+end
+
+function reset_alarms()
+ alarms = {}
+end
+
+-- inject an AFD event into the Heka pipeline
+function inject_afd_metric(msg_type, msg_tag_name, msg_tag_value, metric_name,
+ value, hostname, interval, source, to_alerting)
+ local payload
+
+ if #alarms > 0 then
+ payload = lma.safe_json_encode({alarms=alarms})
+ reset_alarms()
+ if not payload then
+ return
+ end
+ else
+ -- because cjson encodes empty tables as objects instead of arrays
+ payload = '{"alarms":[]}'
+ end
+
+ local no_alerting
+ if to_alerting ~= nil and to_alerting == false then
+ no_alerting = true
+ end
+
+ local msg = {
+ Type = msg_type,
+ Payload = payload,
+ Fields = {
+ name=metric_name,
+ value=value,
+ hostname=hostname,
+ interval=interval,
+ source=source,
+ tag_fields={msg_tag_name, 'source', 'hostname'},
+ no_alerting = no_alerting,
+ }
+ }
+ msg.Fields[msg_tag_name] = msg_tag_value
+ lma.inject_tags(msg)
+ lma.safe_inject_message(msg)
+end
+
+-- inject an AFD service event into the Heka pipeline
+function inject_afd_service_metric(service, value, hostname, interval, source)
+ inject_afd_metric('afd_service_metric',
+ 'service',
+ service,
+ 'service_status',
+ value, hostname, interval, source)
+end
+
+MATCH = 1
+NO_MATCH = 2
+NO_DATA = 3
+MISSING_DATA = 4
+
+
+return M
diff --git a/heka/files/lua/common/afd_alarm.lua b/heka/files/lua/common/afd_alarm.lua
new file mode 100644
index 0000000..4fd660f
--- /dev/null
+++ b/heka/files/lua/common/afd_alarm.lua
@@ -0,0 +1,216 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local assert = assert
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local setmetatable = setmetatable
+
+-- LMA libs
+local utils = require 'lma_utils'
+local table_utils = require 'table_utils'
+local consts = require 'gse_constants'
+local afd = require 'afd'
+local Rule = require 'afd_rule'
+
+local SEVERITIES = {
+ warning = consts.WARN,
+ critical = consts.CRIT,
+ down = consts.DOWN,
+ unknown = consts.UNKW,
+ okay = consts.OKAY,
+}
+
+local Alarm = {}
+Alarm.__index = Alarm
+
+setfenv(1, Alarm) -- Remove external access to contain everything in the module
+
+function Alarm.new(alarm)
+ local a = {}
+ setmetatable(a, Alarm)
+ a._metrics_list = nil
+ a.name = alarm.name
+ a.description = alarm.description
+ if alarm.trigger.logical_operator then
+ a.logical_operator = string.lower(alarm.trigger.logical_operator)
+ else
+ a.logical_operator = 'or'
+ end
+ a.severity_str = string.upper(alarm.severity)
+ a.severity = SEVERITIES[string.lower(alarm.severity)]
+ assert(a.severity ~= nil)
+
+ a.skip_when_no_data = false
+ if alarm.no_data_policy then
+ if string.lower(alarm.no_data_policy) == 'skip' then
+ a.skip_when_no_data = true
+ else
+ a.no_data_severity = SEVERITIES[string.lower(alarm.no_data_policy)]
+ end
+ else
+ a.no_data_severity = consts.UNKW
+ end
+ assert(a.skip_when_no_data or a.no_data_severity ~= nil)
+
+ a.rules = {}
+ a.initial_wait = 0
+ for _, rule in ipairs(alarm.trigger.rules) do
+ local r = Rule.new(rule)
+ a.rules[#a.rules+1] = r
+ -- compute the wait in nanoseconds so the comparison stays consistent
+ local wait = r.window * r.periods * 1e9
+ if wait > a.initial_wait then
+ a.initial_wait = wait
+ end
+ end
+ a.start_time_ns = 0
+
+ return a
+end
+
+-- return the Set of metrics used by the alarm
+function Alarm:get_metrics()
+ if not self._metrics_list then
+ self._metrics_list = {}
+ for _, rule in ipairs(self.rules) do
+ if not table_utils.item_find(rule.metric, self._metrics_list) then
+ self._metrics_list[#self._metrics_list+1] = rule.metric
+ end
+ end
+ end
+ return self._metrics_list
+end
+
+-- return a list of field names used for the metric
+-- (can have duplicate names)
+function Alarm:get_metric_fields(metric_name)
+ local fields = {}
+ for _, rule in ipairs(self.rules) do
+ if rule.metric == metric_name then
+ for k, _ in pairs(rule.fields) do
+ fields[#fields+1] = k
+ end
+ for _, g in ipairs(rule.group_by) do
+ fields[#fields+1] = g
+ end
+ end
+ end
+ return fields
+end
+
+function Alarm:has_metric(metric)
+ return table_utils.item_find(metric, self:get_metrics())
+end
+
+-- dispatch datapoint in datastores
+function Alarm:add_value(ts, metric, value, fields)
+ for _, rule in pairs(self.rules) do
+ if rule.metric == metric then
+ rule:add_value(ts, value, fields)
+ end
+ end
+end
+
+-- return: state of alarm and a list of alarm details.
+--
+-- with alarm list when state != OKAY:
+-- {
+-- {
+-- value = <current value>,
+-- fields = <metric fields table>,
+-- message = <string>,
+-- },
+-- }
+function Alarm:evaluate(ns)
+ local state = consts.OKAY
+ local matches = 0
+ local all_alerts = {}
+ local function add_alarm(rule, value, message, fields)
+ all_alerts[#all_alerts+1] = {
+ severity = self.severity_str,
+ ['function'] = rule.fct,
+ metric = rule.metric,
+ operator = rule.relational_operator,
+ threshold = rule.threshold,
+ window = rule.window,
+ periods = rule.periods,
+ value = value,
+ fields = fields,
+ message = message
+ }
+ end
+ local one_unknown = false
+ local msg
+
+ for _, rule in ipairs(self.rules) do
+ local eval, context_list = rule:evaluate(ns)
+ if eval == afd.MATCH then
+ matches = matches + 1
+ msg = self.description
+ elseif eval == afd.MISSING_DATA then
+ msg = 'No datapoints have been received over the last ' .. rule.observation_window .. ' seconds'
+ one_unknown = true
+ elseif eval == afd.NO_DATA then
+ msg = 'No datapoints have ever been received'
+ one_unknown = true
+ end
+ for _, context in ipairs(context_list) do
+ add_alarm(rule, context.value, msg,
+ context.fields)
+ end
+ end
+
+ if self.logical_operator == 'and' then
+ if one_unknown then
+ if self.skip_when_no_data then
+ state = nil
+ else
+ state = self.no_data_severity
+ end
+ elseif #self.rules == matches then
+ state = self.severity
+ end
+ elseif self.logical_operator == 'or' then
+ if matches > 0 then
+ state = self.severity
+ elseif one_unknown then
+ if self.skip_when_no_data then
+ state = nil
+ else
+ state = self.no_data_severity
+ end
+ end
+ end
+
+ if state == nil or state == consts.OKAY then
+ all_alerts = {}
+ end
+ return state, all_alerts
+end
+
+function Alarm:set_start_time(ns)
+ self.start_time_ns = ns
+end
+
+function Alarm:is_evaluation_time(ns)
+ local delta = ns - self.start_time_ns
+ if delta >= self.initial_wait then
+ return true
+ end
+ return false
+end
+
+return Alarm
diff --git a/heka/files/lua/common/afd_alarms.lua b/heka/files/lua/common/afd_alarms.lua
new file mode 100644
index 0000000..74dc328
--- /dev/null
+++ b/heka/files/lua/common/afd_alarms.lua
@@ -0,0 +1,120 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local pairs = pairs
+local ipairs = ipairs
+local lma = require 'lma_utils'
+local table_utils = require 'table_utils'
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+local Alarm = require 'afd_alarm'
+
+local all_alarms = {}
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- return a list of field names required for the metric
+function get_metric_fields(metric_name)
+ local fields = {}
+ for name, alarm in pairs(all_alarms) do
+ local mf = alarm:get_metric_fields(metric_name)
+ if mf then
+ for _, field in pairs(mf) do
+ if not table_utils.item_find(field, fields) then
+ fields[#fields+1] = field
+ end
+ end
+ end
+ end
+ return fields
+end
+
+-- return the list of alarms interested in a given metric
+function get_interested_alarms(metric)
+ local interested_alarms = {}
+ for _, alarm in pairs(all_alarms) do
+ if alarm:has_metric(metric) then
+ interested_alarms[#interested_alarms+1] = alarm
+ end
+ end
+ return interested_alarms
+end
+
+function add_value(ts, metric, value, fields)
+ local interested_alarms = get_interested_alarms(metric)
+ for _, alarm in ipairs (interested_alarms) do
+ alarm:add_value(ts, metric, value, fields)
+ end
+end
+
+function reset_alarms()
+ all_alarms = {}
+end
+
+function evaluate(ns)
+ local global_state
+ local all_alerts = {}
+ for _, alarm in pairs(all_alarms) do
+ if alarm:is_evaluation_time(ns) then
+ local state, alerts = alarm:evaluate(ns)
+ global_state = gse_utils.max_status(state, global_state)
+ for _, a in ipairs(alerts) do
+ all_alerts[#all_alerts+1] = { state=state, alert=a }
+ end
+ -- raise the first triggered alarm except for OKAY/UNKW states
+ if global_state ~= consts.UNKW and global_state ~= consts.OKAY then
+ break
+ end
+ end
+ end
+ return global_state, all_alerts
+end
+
+function get_alarms()
+ return all_alarms
+end
+
+function get_alarm(alarm_name)
+ for _, a in ipairs(all_alarms) do
+ if a.name == alarm_name then
+ return a
+ end
+ end
+end
+
+function load_alarm(alarm)
+ local A = Alarm.new(alarm)
+ all_alarms[#all_alarms+1] = A
+end
+
+function load_alarms(alarms)
+ for _, alarm in ipairs(alarms) do
+ load_alarm(alarm)
+ end
+end
+
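+-- Example alarm definition (illustrative; the structure mirrors what
+-- Alarm.new and Rule.new consume):
+--   load_alarm({
+--       name = 'cpu-critical',
+--       description = 'CPU is too busy',
+--       severity = 'critical',
+--       no_data_policy = 'unknown',
+--       trigger = {
+--           logical_operator = 'or',
+--           rules = {{
+--               metric = 'cpu_idle',
+--               ['function'] = 'avg',
+--               relational_operator = '<=',
+--               threshold = 5,
+--               window = 120,
+--               periods = 2,
+--           }},
+--       },
+--   })
+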
+local started = false
+function set_start_time(ns)
+ for _, alarm in ipairs(all_alarms) do
+ alarm:set_start_time(ns)
+ end
+ started = true
+end
+
+function is_started()
+ return started
+end
+
+return M
diff --git a/heka/files/lua/common/afd_rule.lua b/heka/files/lua/common/afd_rule.lua
new file mode 100644
index 0000000..680ea61
--- /dev/null
+++ b/heka/files/lua/common/afd_rule.lua
@@ -0,0 +1,324 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local anomaly = require('anomaly')
+local circular_buffer = require('circular_buffer')
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local math = require 'math'
+local string = string
+local table = table
+local assert = assert
+local type = type
+
+-- LMA libs
+local utils = require 'lma_utils'
+local table_utils = require 'table_utils'
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+local afd = require 'afd'
+local matching = require 'value_matching'
+
+local MIN_WINDOW = 10
+local MIN_PERIOD = 1
+local SECONDS_PER_ROW = 5
+
+local Rule = {}
+Rule.__index = Rule
+
+setfenv(1, Rule) -- Remove external access to contain everything in the module
+
+function Rule.new(rule)
+ local r = {}
+ setmetatable(r, Rule)
+
+ local win = MIN_WINDOW
+ if rule.window and rule.window + 0 > 0 then
+ win = rule.window + 0
+ end
+ r.window = win
+ local periods = MIN_PERIOD
+ if rule.periods and rule.periods + 0 > 0 then
+ periods = rule.periods + 0
+ end
+ r.periods = periods
+ r.relational_operator = rule.relational_operator
+ r.metric = rule.metric
+ r.fields = rule.fields or {}
+
+ -- build field matching
+ r.field_matchers = {}
+ for f, expression in pairs(r.fields) do
+ r.field_matchers[f] = matching.new(expression)
+ end
+
+ r.fct = rule['function']
+ r.threshold = rule.threshold + 0
+ r.value_index = rule.value or nil -- Can be nil
+
+ -- build unique rule id
+ local arr = {r.metric, r.fct, r.window, r.periods}
+ for f, v in table_utils.orderedPairs(r.fields or {}) do
+ arr[#arr+1] = string.format('(%s=%s)', f, v)
+ end
+ r.rule_id = table.concat(arr, '/')
+
+ r.group_by = rule.group_by or {}
+
+ if r.fct == 'roc' then
+ -- We use the name of the metric as the payload_name.
+ --
+ -- The ROC algorithm needs the following parameters:
+ -- - the number of intervals in the analysis window
+ -- - the number of intervals in the historical analysis window
+ -- - the threshold
+ --
+ -- r.window is an interval in seconds. So to get the number of
+ -- intervals we divide r.window by the number of seconds per row.
+ --
+ -- r.periods represents the number of windows that we want to use for
+ -- the historical analysis. As we tell the ROC algorithm to use all
+ -- remaining buffer for the historical window we need to allocate
+ -- r.periods * (r.window / SECONDS_PER_ROW) for the historical
+ -- analysis and 2 additional periods for the previous and current
+ -- analysis windows.
+ --
+ local cfg_str = string.format('roc("%s",1,%s,0,%s,false,false)',
+ r.metric,
+ math.ceil(r.window/SECONDS_PER_ROW),
+ r.threshold)
+ r.roc_cfg = anomaly.parse_config(cfg_str)
+ r.cbuf_size = math.ceil(r.window / SECONDS_PER_ROW) * (r.periods + 2)
+ else
+ r.roc_cfg = nil
+ r.cbuf_size = math.ceil(r.window * r.periods / SECONDS_PER_ROW)
+ end
+ r.ids_datastore = {}
+ r.datastore = {}
+ r.observation_window = math.ceil(r.window * r.periods)
+
+ return r
+end
+
+function Rule:get_datastore_id(fields)
+ if #self.group_by == 0 or fields == nil then
+ return self.rule_id
+ end
+
+ local arr = {}
+ arr[#arr + 1] = self.rule_id
+ for _, g in ipairs(self.group_by) do
+ arr[#arr + 1] = fields[g]
+ end
+ return table.concat(arr, '/')
+end
+
+function Rule:fields_accepted(fields)
+ if not fields then
+ fields = {}
+ end
+ local matched_fields = 0
+ local no_match_on_fields = true
+ for f, expression in pairs(self.field_matchers) do
+ no_match_on_fields = false
+ for k, v in pairs(fields) do
+ if k == f then
+ if expression:matches(v) then
+ matched_fields = matched_fields + 1
+ else
+ return false
+ end
+ end
+ end
+ end
+ return no_match_on_fields or matched_fields > 0
+end
+
+function Rule:get_circular_buffer()
+ local cbuf
+ if self.fct == 'avg' then
+ cbuf = circular_buffer.new(self.cbuf_size, 2, SECONDS_PER_ROW)
+ cbuf:set_header(1, self.metric, 'sum', 'sum')
+ cbuf:set_header(2, self.metric, 'count', 'sum')
+ elseif self.fct == 'min' or self.fct == 'max' then
+ cbuf = circular_buffer.new(self.cbuf_size, 1, SECONDS_PER_ROW)
+ cbuf:set_header(1, self.metric, self.fct)
+ else
+ cbuf = circular_buffer.new(self.cbuf_size, 1, SECONDS_PER_ROW)
+ cbuf:set_header(1, self.metric)
+ end
+ return cbuf
+end
+
+-- store datapoints in cbuf, create the cbuf if not exists.
+-- value can be a table where the index to choose is referenced by self.value_index
+function Rule:add_value(ts, value, fields)
+ if not self:fields_accepted(fields) then
+ return
+ end
+ if type(value) == 'table' then
+ value = value[self.value_index]
+ end
+ if value == nil then
+ return
+ end
+
+ local data
+ local uniq_field_id = self:get_datastore_id(fields)
+ if not self.datastore[uniq_field_id] then
+ self.datastore[uniq_field_id] = {
+ fields = self.fields,
+ cbuf = self:get_circular_buffer()
+ }
+ if #self.group_by > 0 then
+ self.datastore[uniq_field_id].fields = fields
+ end
+
+ self:add_datastore(uniq_field_id)
+ end
+ data = self.datastore[uniq_field_id]
+
+ if self.fct == 'avg' then
+ data.cbuf:add(ts, 1, value)
+ data.cbuf:add(ts, 2, 1)
+ else
+ data.cbuf:set(ts, 1, value)
+ end
+end
+
+function Rule:add_datastore(id)
+ if not table_utils.item_find(id, self.ids_datastore) then
+ self.ids_datastore[#self.ids_datastore+1] = id
+ end
+end
+
+function Rule:compare_threshold(value)
+ return gse_utils.compare_threshold(value, self.relational_operator, self.threshold)
+end
+
+local function isnumber(value)
+ return value ~= nil and not (value ~= value)
+end
+
+local available_functions = {last=true, avg=true, max=true, min=true, sum=true,
+ variance=true, sd=true, diff=true, roc=true}
+
+-- evaluate the rule against datapoints
+-- return a list: match (bool or string), context ({value=v, fields=list of field table})
+--
+-- examples:
+-- true, { {value=100, fields={{queue='nova'}, {queue='neutron'}}, ..}
+-- false, { {value=10, fields={}}, ..}
+-- with 2 special cases:
+-- - no datapoint was ever received:
+-- 'nodata', {}
+-- - datapoints are no longer received for a metric:
+-- 'missing', {value=-1, fields={}}
+-- There is a drawback with the 'missing' state: it can lead to false-positive
+-- states. For example, when the monitored entity (e.g. a filesystem) has been
+-- renamed or deleted, it is normal to stop receiving datapoints for it.
+function Rule:evaluate(ns)
+ local fields = {}
+ local one_match, one_no_match, one_missing_data = false, false, false
+ for _, id in ipairs(self.ids_datastore) do
+ local data = self.datastore[id]
+ if data then
+ local cbuf_time = data.cbuf:current_time()
+ -- if no datapoint was received within the observation window, it means
+ -- that data is no longer being received and the rule cannot be computed
+ if ns - cbuf_time > self.observation_window * 1e9 then
+ one_missing_data = true
+ fields[#fields+1] = {value = -1, fields = data.fields}
+ else
+ assert(available_functions[self.fct])
+ local result
+
+ if self.fct == 'roc' then
+ local anomaly_detected, _ = anomaly.detect(ns, self.metric, data.cbuf, self.roc_cfg)
+ if anomaly_detected then
+ one_match = true
+ fields[#fields+1] = {value=-1, fields=data.fields}
+ else
+ one_no_match = true
+ end
+ elseif self.fct == 'avg' then
+ local total = data.cbuf:compute('sum', 1)
+ local count = data.cbuf:compute('sum', 2)
+ result = total/count
+ elseif self.fct == 'last' then
+ local last
+ local t = ns
+ while (not isnumber(last)) and t >= ns - self.observation_window * 1e9 do
+ last = data.cbuf:get(t, 1)
+ t = t - SECONDS_PER_ROW * 1e9
+ end
+ if isnumber(last) then
+ result = last
+ else
+ one_missing_data = true
+ fields[#fields+1] = {value = -1, fields = data.fields}
+ end
+ elseif self.fct == 'diff' then
+ local first, last
+
+ local t = ns
+ while (not isnumber(last)) and t >= ns - self.observation_window * 1e9 do
+ last = data.cbuf:get(t, 1)
+ t = t - SECONDS_PER_ROW * 1e9
+ end
+
+ if isnumber(last) then
+ t = ns - self.observation_window * 1e9
+ while (not isnumber(first)) and t <= ns do
+ first = data.cbuf:get(t, 1)
+ t = t + SECONDS_PER_ROW * 1e9
+ end
+ end
+
+ if not isnumber(last) or not isnumber(first) then
+ one_missing_data = true
+ fields[#fields+1] = {value = -1, fields = data.fields}
+ else
+ result = last - first
+ end
+ else
+ result = data.cbuf:compute(self.fct, 1)
+ end
+
+ if result then
+ local m = self:compare_threshold(result)
+ if m then
+ one_match = true
+ fields[#fields+1] = {value=result, fields=data.fields}
+ else
+ one_no_match = true
+ end
+ end
+ end
+ end
+ end
+ if one_match then
+ return afd.MATCH, fields
+ elseif one_missing_data then
+ return afd.MISSING_DATA, fields
+ elseif one_no_match then
+ return afd.NO_MATCH, {}
+ else
+ return afd.NO_DATA, {{value=-1, fields=self.fields}}
+ end
+end
+
+return Rule
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
new file mode 100644
index 0000000..16bb950
--- /dev/null
+++ b/heka/files/lua/common/gse.lua
@@ -0,0 +1,164 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local consts = require 'gse_constants'
+local string = require 'string'
+local table = require 'table'
+local GseCluster = require 'gse_cluster'
+local lma = require 'lma_utils'
+local table_utils = require 'table_utils'
+
+local pairs = pairs
+local ipairs = ipairs
+local assert = assert
+local type = type
+local read_message = read_message
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Hash of GseCluster instances organized by name
+local clusters = {}
+-- Reverse index table to map cluster's members to clusters
+local reverse_cluster_index = {}
+-- Array of cluster names ordered by dependency
+local ordered_clusters = {}
+
+function add_cluster(cluster_id, members, hints, group_by, policy_rules)
+ assert(type(members) == 'table')
+ assert(type(hints) == 'table')
+ assert(type(policy_rules) == 'table')
+
+ local cluster = GseCluster.new(members, hints, group_by, policy_rules)
+ clusters[cluster_id] = cluster
+
+ -- update the reverse index
+ for _, member in ipairs(members) do
+ if not reverse_cluster_index[member] then
+ reverse_cluster_index[member] = {}
+ end
+ local reverse_table = reverse_cluster_index[member]
+ if not table_utils.item_find(cluster_id, reverse_table) then
+ reverse_table[#reverse_table+1] = cluster_id
+ end
+ end
+
+ if not table_utils.item_find(cluster_id, ordered_clusters) then
+ local after_index = 1
+ for current_pos, id in ipairs(ordered_clusters) do
+ if table_utils.item_find(id, cluster.hints) then
+ after_index = current_pos + 1
+ end
+ end
+
+ local index = after_index
+ -- if an existing cluster hints at the new cluster, the new cluster
+ -- must be ordered before it
+ for item_id, item in pairs(clusters) do
+ for _, hint in pairs(item.hints) do
+ if hint == cluster_id then
+ local pos = table_utils.item_pos(item_id, ordered_clusters)
+ if pos and pos <= index then
+ index = pos
+ elseif index > after_index then
+ error('circular dependency between clusters!')
+ end
+ end
+ end
+ end
+ table.insert(ordered_clusters, index, cluster_id)
+ end
+end
+
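+-- Usage sketch (illustrative; policy_rules would be GsePolicy instances):
+--   add_cluster('nova', {'nova-api', 'nova-scheduler'}, {'rabbitmq'},
+--               'member', policy_rules)
+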
+function get_ordered_clusters()
+ return ordered_clusters
+end
+
+function cluster_exists(cluster_id)
+ return clusters[cluster_id] ~= nil
+end
+
+-- return the list of clusters which depends on a given member
+function find_cluster_memberships(member_id)
+ return reverse_cluster_index[member_id] or {}
+end
+
+-- store the status of a cluster's member and its current alarms
+function set_member_status(cluster_id, member, value, alarms, hostname)
+ local cluster = clusters[cluster_id]
+ if cluster then
+ cluster:update_fact(member, hostname, value, alarms)
+ end
+end
+
+-- The cluster status depends on the status of its members.
+-- The status of the related clusters (defined by cluster.hints) doesn't modify
+-- the overall status but their alarms are returned.
+function resolve_status(cluster_id)
+ local cluster = clusters[cluster_id]
+ assert(cluster)
+
+ cluster:refresh_status()
+ local alarms = table_utils.deepcopy(cluster.alarms)
+
+ if cluster.status ~= consts.OKAY then
+ -- add hints if the cluster isn't healthy
+ for _, other_id in ipairs(cluster.hints or {}) do
+ for _, v in pairs(cluster:subtract_alarms(clusters[other_id])) do
+ alarms[#alarms+1] = table_utils.deepcopy(v)
+ alarms[#alarms].tags['dependency_name'] = other_id
+ alarms[#alarms].tags['dependency_level'] = 'hint'
+ end
+ end
+ end
+
+ return cluster.status, alarms
+end
+
+-- compute the cluster metric and inject it into the Heka pipeline
+-- the metric's value is computed using the status of its members
+function inject_cluster_metric(msg_type, cluster_name, metric_name, interval, source, to_alerting)
+ local payload
+ local status, alarms = resolve_status(cluster_name)
+
+ if #alarms > 0 then
+ payload = lma.safe_json_encode({alarms=alarms})
+ if not payload then
+ return
+ end
+ else
+ -- because cjson encodes empty tables as objects instead of arrays
+ payload = '{"alarms":[]}'
+ end
+
+ local no_alerting
+ if to_alerting ~= nil and to_alerting == false then
+ no_alerting = true
+ end
+
+ local msg = {
+ Type = msg_type,
+ Payload = payload,
+ Fields = {
+ name=metric_name,
+ value=status,
+ cluster_name=cluster_name,
+ tag_fields={'cluster_name'},
+ interval=interval,
+ source=source,
+ no_alerting=no_alerting,
+ }
+ }
+ lma.inject_tags(msg)
+ lma.safe_inject_message(msg)
+end
+
+return M
diff --git a/heka/files/lua/common/gse_cluster.lua b/heka/files/lua/common/gse_cluster.lua
new file mode 100644
index 0000000..cce94f4
--- /dev/null
+++ b/heka/files/lua/common/gse_cluster.lua
@@ -0,0 +1,154 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+local table_utils = require 'table_utils'
+
+local ipairs = ipairs
+local pairs = pairs
+local setmetatable = setmetatable
+local assert = assert
+local type = type
+
+local GseCluster = {}
+GseCluster.__index = GseCluster
+
+setfenv(1, GseCluster) -- Remove external access to contain everything in the module
+
+local VALID_STATUSES = {
+ [consts.OKAY]=true,
+ [consts.WARN]=true,
+ [consts.CRIT]=true,
+ [consts.DOWN]=true,
+ [consts.UNKW]=true
+}
+
+function GseCluster.new(members, hints, group_by, policy_rules)
+ assert(type(members) == 'table')
+ assert(type(hints) == 'table')
+ assert(type(policy_rules) == 'table')
+
+ local cluster = {}
+ setmetatable(cluster, GseCluster)
+
+ cluster.members = members
+ cluster.hints = hints
+ cluster.policy_rules = policy_rules
+ -- when group_by is 'hostname', facts are stored by hostname then member
+ -- when group_by is 'member', facts are stored by member only
+ -- otherwise facts are stored by member then hostname
+ if group_by == 'hostname' or group_by == 'member' then
+ cluster.group_by = group_by
+ else
+ cluster.group_by = 'none'
+ end
+ cluster.status = consts.UNKW
+ cluster.facts = {}
+ cluster.alarms = {}
+ cluster.member_index = {}
+ for _, v in ipairs(members) do
+ cluster.member_index[v] = true
+ end
+
+ return cluster
+end
+
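+-- Facts layout sketch (illustrative):
+--   group_by == 'member': facts['nova-api']['__anyhost__'] = {status=..., alarms={...}, member='nova-api'}
+--   group_by == 'hostname': facts['node-1']['nova-api'] = {status=..., alarms={...}, member='nova-api', hostname='node-1'}
+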
+function GseCluster:has_member(member)
+ return self.member_index[member]
+end
+
+-- Update the facts table for a cluster's member
+function GseCluster:update_fact(member, hostname, value, alarms)
+ assert(VALID_STATUSES[value])
+ assert(type(alarms) == 'table')
+
+ local key1, key2 = member, hostname
+ if self.group_by == 'hostname' then
+ key1 = hostname
+ key2 = member
+ elseif self.group_by == 'member' then
+ key2 = '__anyhost__'
+ end
+
+ if not self.facts[key1] then
+ self.facts[key1] = {}
+ end
+ self.facts[key1][key2] = {
+ status=value,
+ alarms=table_utils.deepcopy(alarms),
+ member=member
+ }
+ if self.group_by == 'hostname' then
+ -- store the hostname for later reference in the alarms
+ self.facts[key1][key2].hostname = hostname
+ end
+end
+
+-- Compute the status and alarms of the cluster according to the current facts
+-- and the cluster's policy
+function GseCluster:refresh_status()
+ local alarms = {}
+ local status_breakdown = {}
+
+ self.status = consts.UNKW
+ self.alarms = {}
+
+ for group_key, _ in table_utils.orderedPairs(self.facts) do
+ local group_status = consts.UNKW
+ for sub_key, fact in table_utils.orderedPairs(self.facts[group_key]) do
+ group_status = gse_utils.max_status(group_status, fact.status)
+ if fact.status ~= consts.OKAY then
+ for _, v in ipairs(fact.alarms) do
+ alarms[#alarms+1] = table_utils.deepcopy(v)
+ if not alarms[#alarms]['tags'] then
+ alarms[#alarms]['tags'] = {}
+ end
+ alarms[#alarms].tags['dependency_name'] = fact.member
+ alarms[#alarms].tags['dependency_level'] = 'direct'
+ if fact.hostname then
+ alarms[#alarms].hostname = fact.hostname
+ end
+ end
+ end
+ end
+ status_breakdown[group_status] = (status_breakdown[group_status] or 0) + 1
+ end
+ for _, policy_rule in ipairs(self.policy_rules) do
+ if policy_rule:evaluate(status_breakdown) then
+ self.status = policy_rule.status
+ break
+ end
+ end
+ if self.status ~= consts.OKAY then
+ self.alarms = alarms
+ end
+
+ return self.status
+end
+
+-- Return the alarms from another cluster which aren't already known by this
+-- cluster
+function GseCluster:subtract_alarms(cluster)
+ local subset = {}
+ if cluster then
+ for _, alarm in ipairs(cluster.alarms) do
+ if alarm.tags and alarm.tags['dependency_name'] and not self:has_member(alarm.tags['dependency_name']) then
+ subset[#subset+1] = alarm
+ end
+ end
+ end
+ return subset
+end
+
+return GseCluster
diff --git a/heka/files/lua/common/gse_constants.lua b/heka/files/lua/common/gse_constants.lua
new file mode 100644
index 0000000..2a328e6
--- /dev/null
+++ b/heka/files/lua/common/gse_constants.lua
@@ -0,0 +1,39 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- The status values were chosen to match the Grafana constraints:
+-- OKAY => green
+-- WARN & UNKW => orange
+-- CRIT & DOWN => red
+OKAY=0
+WARN=1
+UNKW=2
+CRIT=3
+DOWN=4
+
+local STATUS_LABELS = {
+ [OKAY]='OKAY',
+ [WARN]='WARN',
+ [UNKW]='UNKNOWN',
+ [CRIT]='CRITICAL',
+ [DOWN]='DOWN'
+}
+
+function status_label(v)
+ return STATUS_LABELS[v]
+end
+
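+-- e.g. status_label(CRIT) returns 'CRITICAL'
+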
+return M
diff --git a/heka/files/lua/common/gse_policy.lua b/heka/files/lua/common/gse_policy.lua
new file mode 100644
index 0000000..6c7e52c
--- /dev/null
+++ b/heka/files/lua/common/gse_policy.lua
@@ -0,0 +1,109 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+
+local assert = assert
+local ipairs = ipairs
+local pairs = pairs
+local setmetatable = setmetatable
+local string = string
+local tonumber = tonumber
+
+local GsePolicy = {}
+GsePolicy.__index = GsePolicy
+
+setfenv(1, GsePolicy) -- Remove external access to contain everything in the module
+
+local SEVERITIES = {
+ okay=consts.OKAY,
+ warning=consts.WARN,
+ unknown=consts.UNKW,
+ critical=consts.CRIT,
+ down=consts.DOWN
+}
+
+function GsePolicy.new(policy)
+ local p = {}
+ setmetatable(p, GsePolicy)
+
+ p.status = SEVERITIES[string.lower(policy.status)]
+ assert(p.status)
+
+ p.require_percent = false
+ p.rules = {}
+ if policy.trigger then
+ p.logical_op = string.lower(policy.trigger.logical_operator or 'or')
+ for _, r in ipairs(policy.trigger.rules or {}) do
+ assert(r['function'] == 'count' or r['function'] == 'percent')
+ if r['function'] == 'percent' then
+ p.require_percent = true
+ end
+ local rule = {
+ ['function']=r['function'],
+ relational_op=r.relational_operator,
+ threshold=tonumber(r.threshold),
+ arguments={}
+ }
+ for _, v in ipairs(r.arguments) do
+ assert(SEVERITIES[v])
+ rule.arguments[#rule.arguments+1] = SEVERITIES[v]
+ end
+ p.rules[#p.rules+1] = rule
+ end
+ end
+
+ return p
+end
+
+-- return true or false depending on whether the facts match the policy
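+-- facts is a table of member counts keyed by status value,
+-- e.g. (illustrative): {[consts.OKAY]=2, [consts.CRIT]=1}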
+function GsePolicy:evaluate(facts)
+ local total = 0
+
+ if #self.rules == 0 then
+ return true
+ end
+
+ if self.require_percent then
+ for _, v in pairs(facts) do
+ total = total + v
+ end
+ end
+
+ local one_match = false
+ for _, r in ipairs(self.rules) do
+ local value = 0
+ for _, status in ipairs(r.arguments) do
+ if facts[status] then
+ value = value + facts[status]
+ end
+ end
+ if r['function'] == 'percent' then
+ value = value * 100 / total
+ end
+
+ if gse_utils.compare_threshold(value, r.relational_op, r.threshold) then
+ one_match = true
+ if self.logical_op == 'or' then
+ return true
+ end
+ elseif self.logical_op == 'and' then
+ return false
+ end
+ end
+
+ return one_match
+end
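+
+-- Usage sketch (hypothetical policy; 'facts' maps status constants to
+-- member counts):
+--   local policy = GsePolicy.new({
+--       status = 'critical',
+--       trigger = {
+--           logical_operator = 'or',
+--           rules = {{
+--               ['function'] = 'percent',
+--               relational_operator = '>',
+--               threshold = 50,
+--               arguments = {'critical', 'down'},
+--           }},
+--       },
+--   })
+--   -- 3 of 4 members are critical: 75% > 50%, so the policy matches
+--   policy:evaluate({[consts.CRIT] = 3, [consts.OKAY] = 1}) -- true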
+
+return GsePolicy
diff --git a/heka/files/lua/common/gse_utils.lua b/heka/files/lua/common/gse_utils.lua
new file mode 100644
index 0000000..dd82aa2
--- /dev/null
+++ b/heka/files/lua/common/gse_utils.lua
@@ -0,0 +1,58 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local consts = require 'gse_constants'
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local STATUS_WEIGHTS = {
+ [consts.UNKW]=0,
+ [consts.OKAY]=1,
+ [consts.WARN]=2,
+ [consts.CRIT]=3,
+ [consts.DOWN]=4
+}
+
+function max_status(val1, val2)
+ if not val1 then
+ return val2
+ elseif not val2 then
+ return val1
+ elseif STATUS_WEIGHTS[val1] > STATUS_WEIGHTS[val2] then
+ return val1
+ else
+ return val2
+ end
+end
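+
+-- Usage sketch: the "worst" status wins and UNKNOWN ranks lowest:
+--   max_status(consts.WARN, consts.OKAY) -- returns consts.WARN
+--   max_status(consts.UNKW, consts.OKAY) -- returns consts.OKAY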
+
+function compare_threshold(value, op, threshold)
+ local rule_matches = false
+ if op == '==' or op == 'eq' then
+ rule_matches = value == threshold
+ elseif op == '!=' or op == 'ne' then
+ rule_matches = value ~= threshold
+ elseif op == '>=' or op == 'gte' then
+ rule_matches = value >= threshold
+ elseif op == '>' or op == 'gt' then
+ rule_matches = value > threshold
+ elseif op == '<=' or op == 'lte' then
+ rule_matches = value <= threshold
+ elseif op == '<' or op == 'lt' then
+ rule_matches = value < threshold
+ end
+ return rule_matches
+end
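+
+-- Usage sketch: symbolic and textual operators are equivalent:
+--   compare_threshold(75, '>', 50)   -- true
+--   compare_threshold(75, 'gt', 50)  -- true
+--   compare_threshold(75, 'lte', 50) -- false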
+
+return M
diff --git a/heka/files/lua/common/influxdb.lua b/heka/files/lua/common/influxdb.lua
new file mode 100644
index 0000000..d9c359d
--- /dev/null
+++ b/heka/files/lua/common/influxdb.lua
@@ -0,0 +1,114 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local string = string
+local table = table
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local tostring = tostring
+local type = type
+
+local utils = require 'lma_utils'
+
+local InfluxEncoder = {}
+InfluxEncoder.__index = InfluxEncoder
+
+setfenv(1, InfluxEncoder) -- Remove external access to contain everything in the module
+
+local function escape_string(str)
+ return tostring(str):gsub("([ ,])", "\\%1")
+end
+
+local function encode_scalar_value(value)
+ if type(value) == "number" then
+        -- Always send numbers as formatted floats, so that InfluxDB will
+        -- accept them even if they happen to change from integers to floats
+        -- between points in time.
+ return string.format("%.6f", value)
+ elseif type(value) == "string" then
+ -- string values need to be double quoted
+ return '"' .. value:gsub('"', '\\"') .. '"'
+ elseif type(value) == "boolean" then
+ return '"' .. tostring(value) .. '"'
+ end
+end
+
+local function encode_value(value)
+ if type(value) == "table" then
+ local values = {}
+ for k,v in pairs(value) do
+ table.insert(
+ values,
+ string.format("%s=%s", escape_string(k), encode_scalar_value(v))
+ )
+ end
+ return table.concat(values, ',')
+ else
+ return "value=" .. encode_scalar_value(value)
+ end
+end
+
+-- Create a new InfluxDB encoder
+--
+-- time_precision: "s", "m", "ms", "us" or "ns" (default: "ns")
+function InfluxEncoder.new(time_precision)
+ local e = {}
+ setmetatable(e, InfluxEncoder)
+ e.time_precision = time_precision or 'ns'
+ return e
+end
+
+-- Encode a single datapoint using the InfluxDB line protocol
+--
+-- timestamp: the timestamp in nanoseconds
+-- name: the measurement's name
+-- value: a scalar value or a list of key-value pairs
+-- tags: a list of key-value pairs encoded as InfluxDB tags
+function InfluxEncoder:encode_datapoint(timestamp, name, value, tags)
+ if timestamp == nil or type(name) ~= 'string' or value == nil or type(tags or {}) ~= 'table' then
+ -- fail silently if any input parameter is invalid
+ return ""
+ end
+
+ local ts = timestamp
+ if self.time_precision ~= 'ns' then
+ ts = utils.message_timestamp(self.time_precision, ts)
+ end
+
+ local tags_array = {}
+ for k,v in pairs(tags or {}) do
+ if k ~= '' and v ~= '' then
+ -- empty tag name and value aren't allowed by InfluxDB
+ table.insert(tags_array, escape_string(k) .. '=' .. escape_string(v))
+ end
+ end
+
+ if #tags_array > 0 then
+ -- for performance reasons, it is recommended to always send the tags
+ -- in the same order.
+ table.sort(tags_array)
+ return string.format("%s,%s %s %d",
+ escape_string(name),
+ table.concat(tags_array, ','),
+ encode_value(value),
+ ts)
+ else
+ return string.format("%s %s %d",
+ escape_string(name),
+ encode_value(value),
+ ts)
+ end
+end
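+
+-- Usage sketch (hypothetical measurement and tags):
+--   local encoder = InfluxEncoder.new('ms')
+--   encoder:encode_datapoint(1e9, 'cpu_idle', 95.2, {hostname = 'node-1'})
+--   -- returns 'cpu_idle,hostname=node-1 value=95.200000 1000'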
+
+return InfluxEncoder
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
new file mode 100644
index 0000000..7b063d1
--- /dev/null
+++ b/heka/files/lua/common/lma_utils.lua
@@ -0,0 +1,307 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local cjson = require 'cjson'
+local string = require 'string'
+local extra = require 'extra_fields'
+local patt = require 'patterns'
+local math = require 'math'
+
+local pairs = pairs
+local inject_message = inject_message
+local inject_payload = inject_payload
+local read_message = read_message
+local pcall = pcall
+local type = type
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+severity_to_label_map = {
+ [0] = 'EMERGENCY',
+ [1] = 'ALERT',
+ [2] = 'CRITICAL',
+ [3] = 'ERROR',
+ [4] = 'WARNING',
+ [5] = 'NOTICE',
+ [6] = 'INFO',
+ [7] = 'DEBUG',
+}
+
+label_to_severity_map = {
+ EMERGENCY = 0,
+ ALERT = 1,
+ CRITICAL = 2,
+ ERROR = 3,
+ WARNING = 4,
+ NOTICE = 5,
+ INFO= 6,
+ DEBUG = 7,
+}
+
+metric_type = {
+ COUNTER = "counter",
+ GAUGE = "gauge",
+ DERIVE = "derive",
+}
+
+local default_severity = 7
+
+local bulk_datapoints = {}
+
+-- Add a datapoint to the bulk metric message.
+-- The 'value' parameter can be a table to support multi-value metrics.
+function add_to_bulk_metric(name, value, tags)
+ bulk_datapoints[#bulk_datapoints+1] = {
+ name = name,
+ tags = tags or {},
+ }
+ if type(value) == 'table' then
+ bulk_datapoints[#bulk_datapoints].values = value
+ else
+ bulk_datapoints[#bulk_datapoints].value = value
+ end
+end
+
+-- Send the bulk metric message to the Heka pipeline
+function inject_bulk_metric(ts, hostname, source)
+ if #bulk_datapoints == 0 then
+ return
+ end
+
+ local payload = safe_json_encode(bulk_datapoints)
+ if not payload then
+ -- Reset the table otherwise it may grow infinitely and the sandbox
+ -- will eventually be killed by Heka.
+ -- See https://bugs.launchpad.net/lma-toolchain/+bug/1545743
+ bulk_datapoints = {}
+ return
+ end
+
+ local msg = {
+ Hostname = hostname,
+ Timestamp = ts,
+ Payload = payload,
+ Type = 'bulk_metric', -- prepended with 'heka.sandbox'
+ Severity = label_to_severity_map.INFO,
+ Fields = {
+ hostname = hostname,
+ source = source
+ }
+ }
+ -- reset the local table storing the datapoints
+ bulk_datapoints = {}
+
+ inject_tags(msg)
+ safe_inject_message(msg)
+end
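+
+-- Usage sketch (hypothetical metric names and tags; ts is a nanosecond
+-- timestamp):
+--   add_to_bulk_metric('fs_space', {free = 10, used = 90}, {fs = '/'})
+--   add_to_bulk_metric('cpu_idle', 95.2, {cpu_number = '0'})
+--   inject_bulk_metric(ts, 'node-1', 'my_plugin')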
+
+-- Encode a Lua variable as JSON without raising an exception if the encoding
+-- fails for some reason (for instance, the encoded buffer exceeds the sandbox
+-- limit)
+function safe_json_encode(v)
+ local ok, data = pcall(cjson.encode, v)
+
+ if not ok then
+ return
+ end
+
+ return data
+end
+
+-- Call inject_payload() wrapped by pcall()
+function safe_inject_payload(payload_type, payload_name, data)
+ local ok, err_msg = pcall(inject_payload, payload_type, payload_name, data)
+ if not ok then
+ return -1, err_msg
+ else
+ return 0
+ end
+end
+
+-- Call inject_message() wrapped by pcall()
+function safe_inject_message(msg)
+ local ok, err_msg = pcall(inject_message, msg)
+ if not ok then
+ return -1, err_msg
+ else
+ return 0
+ end
+end
+
+-- Parse a Syslog-based payload and update the Heka message
+-- Return true if successful, false otherwise
+function parse_syslog_message(grammar, payload, msg)
+    -- capture everything after the first newline because syslog_grammar will
+    -- drop it
+ local extra_msg = string.match(payload, '^[^\n]+\n(.+)\n$')
+
+ local fields = grammar:match(payload)
+ if not fields then
+ return false
+ end
+
+ msg.Timestamp = fields.timestamp
+ fields.timestamp = nil
+
+ msg.Hostname = fields.hostname
+ fields.hostname = nil
+
+ msg.Pid = fields.syslogtag.pid or 0
+ fields.programname = fields.syslogtag.programname
+ fields.syslogtag = nil
+
+ if fields.pri then
+ msg.Severity = fields.pri.severity
+ fields.syslogfacility = fields.pri.facility
+ fields.pri = nil
+ else
+ msg.Severity = fields.syslogseverity or fields["syslogseverity-text"]
+ or fields.syslogpriority or fields["syslogpriority-text"]
+ or default_severity
+ fields.syslogseverity = nil
+ fields["syslogseverity-text"] = nil
+ fields.syslogpriority = nil
+ fields["syslogpriority-text"] = nil
+ end
+ fields.severity_label = severity_to_label_map[msg.Severity]
+
+ if extra_msg ~= nil then
+ msg.Payload = fields.msg .. "\n" .. extra_msg
+ else
+ msg.Payload = fields.msg
+ end
+ fields.msg = nil
+
+ msg.Fields = fields
+
+ inject_tags(msg)
+
+ return true
+end
+
+-- Inject tags into the Heka message
+function inject_tags(msg)
+ for k,v in pairs(extra.tags) do
+ if msg.Fields[k] == nil then
+ msg.Fields[k] = v
+ end
+ end
+end
+
+-- Convert a datetime string to the RFC3339 format.
+-- It supports a variety of input datetime formats.
+-- Return nil if the datetime couldn't be parsed.
+function format_datetime (raw_datetime)
+ local datetime
+ local t = patt.TimestampTable:match(raw_datetime)
+ if t then
+ local frac = 0
+ local offset_sign = '+'
+ local offset_hour = 0
+ local offset_min = 0
+ if t.sec_frac then frac = t.sec_frac end
+ if t.offset_sign then offset_sign = t.offset_sign end
+ if t.offset_hour then offset_hour = t.offset_hour end
+ if t.offset_min then offset_min = t.offset_min end
+ datetime = string.format("%04d-%02d-%02dT%02d:%02d:%02d.%06d%s%02d:%02d",
+ t.year, t.month, t.day, t.hour, t.min, t.sec, frac*1e6, offset_sign,
+ offset_hour, offset_min)
+ end
+ return datetime
+end
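+
+-- Usage sketch (assuming the input parses):
+--   format_datetime('2015-11-30 08:38:59.306')
+--   -- returns '2015-11-30T08:38:59.306000+00:00'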
+
+function chomp(s)
+ return string.gsub(s, "\n$", "")
+end
+
+function truncate(str, max_length, delimiter)
+ if string.len(str) <= max_length then
+ return str
+ end
+
+ local pos = 1
+ while true do
+ local next_pos1, next_pos2 = string.find(str, delimiter, pos)
+ if not next_pos1 or next_pos1 - 1 > max_length then
+ pos = pos - string.len(delimiter) - 1
+ if pos < 1 then
+ pos = max_length
+ end
+ break
+ end
+ pos = next_pos2 + 1
+ end
+
+ return string.sub(str, 1, pos)
+end
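+
+-- Usage sketch: cut on a delimiter boundary when one is in range:
+--   truncate('aaa.bbb.ccc', 7, '.') -- returns 'aaa.bbb'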
+
+-- Convert a nanosecond timestamp to a lower precision timestamp.
+-- Arguments:
+-- timestamp_precision: one of 'us', 'ms', 's', 'm' or 'h'.
+-- timestamp: a timestamp in nanoseconds; if not provided, the message Timestamp is used.
+function message_timestamp(timestamp_precision, timestamp)
+ -- Default is to divide ns to ms
+ local timestamp_divisor = 1e6
+ -- Divide ns to s
+ if timestamp_precision == "s" then
+ timestamp_divisor = 1e9
+ -- Divide ns to us
+ elseif timestamp_precision == "us" then
+ timestamp_divisor = 1e3
+ -- Divide ns to m
+ elseif timestamp_precision == "m" then
+ timestamp_divisor = 1e9 * 60
+ -- Divide ns to h
+ elseif timestamp_precision == "h" then
+ timestamp_divisor = 1e9 * 60 * 60
+ end
+ if timestamp == nil then
+ timestamp = read_message("Timestamp")
+ end
+ return math.floor(timestamp / timestamp_divisor)
+end
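+
+-- Usage sketch:
+--   message_timestamp('ms', 1.5e9) -- returns 1500
+--   message_timestamp('s', 1.5e9)  -- returns 1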
+
+-- Extract the metric value(s) from the message.
+-- The value can be either a scalar value or a table for multivalue metrics.
+-- Return true plus the value, or false plus an error message on failure.
+function get_values_from_metric()
+    local value
+    if read_message('Fields[value_fields]') then
+        value = {}
+ local i = 0
+ local val
+ while true do
+ local f = read_message("Fields[value_fields]", 0, i)
+ if not f then
+ break
+ end
+ val = read_message(string.format('Fields[%s]', f))
+ if val ~= nil then
+ value[f] = val
+ i = i + 1
+ end
+ end
+ if i == 0 then
+ return false, 'Fields[value_fields] does not list any valid field'
+ end
+ else
+ value = read_message("Fields[value]")
+ if not value then
+ return false, 'Fields[value] is missing'
+ end
+ end
+
+ return true, value
+end
+
+return M
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
new file mode 100644
index 0000000..e9acc18
--- /dev/null
+++ b/heka/files/lua/common/patterns.lua
@@ -0,0 +1,147 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local table = require 'table'
+local dt = require "date_time"
+local l = require 'lpeg'
+l.locale(l)
+
+local tonumber = tonumber
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+function format_uuid(t)
+ return table.concat(t, '-')
+end
+
+function anywhere (patt)
+ return l.P {
+ patt + 1 * l.V(1)
+ }
+end
+
+sp = l.space
+colon = l.P":"
+dash = l.P"-"
+dot = l.P'.'
+quote = l.P'"'
+
+local x4digit = l.xdigit * l.xdigit * l.xdigit * l.xdigit
+local uuid_dash = l.C(x4digit * x4digit * dash * x4digit * dash * x4digit * dash * x4digit * dash * x4digit * x4digit * x4digit)
+local uuid_nodash = l.Ct(l.C(x4digit * x4digit) * l.C(x4digit) * l.C(x4digit) * l.C(x4digit) * l.C(x4digit * x4digit * x4digit)) / format_uuid
+
+-- Return a UUID string in canonical format (eg with dashes)
+Uuid = uuid_nodash + uuid_dash
+
+-- Parse a datetime string and return a table with the following keys
+-- year (string)
+-- month (string)
+-- day (string)
+-- hour (string)
+-- min (string)
+-- sec (string)
+-- sec_frac (number less than 1, can be nil)
+-- offset_sign ('-' or '+', can be nil)
+-- offset_hour (number, can be nil)
+-- offset_min (number, can be nil)
+--
+-- The datetime string can be formatted as
+-- 'YYYY-MM-DD( |T)HH:MM:SS(.ssssss)?(offset indicator)?'
+TimestampTable = l.Ct(dt.rfc3339_full_date * (sp + l.P"T") * dt.rfc3339_partial_time * (dt.rfc3339_time_offset + dt.timezone_offset)^-1)
+
+-- Returns the parsed datetime converted to nanosec
+Timestamp = TimestampTable / dt.time_to_ns
+
+programname = (l.R("az", "AZ", "09") + l.P"." + dash + l.P"_")^1
+Pid = l.digit^1
+SeverityLabel = l.P"CRITICAL" + l.P"ERROR" + l.P"WARNING" + l.P"INFO" + l.P"AUDIT" + l.P"DEBUG" + l.P"TRACE"
+Message = l.P(1)^0
+
+-- Capture for OpenStack logs producing five values: Timestamp, Pid,
+-- SeverityLabel, PythonModule and Message.
+--
+-- OpenStack log messages are of this form:
+-- 2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla...
+--
+-- [-] is the "request" part, it can take multiple forms. See below.
+openstack = l.Ct(l.Cg(Timestamp, "Timestamp")* sp * l.Cg(Pid, "Pid") * sp *
+ l.Cg(SeverityLabel, "SeverityLabel") * sp * l.Cg(programname, "PythonModule") *
+ sp * l.Cg(Message, "Message"))
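+
+-- Usage sketch (log line taken from the example above):
+--   local m = openstack:match(
+--       '2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla...')
+--   -- m.Pid == '3434', m.SeverityLabel == 'INFO',
+--   -- m.PythonModule == 'oslo_service.periodic_task'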
+
+-- Capture for OpenStack request context producing three values: RequestId,
+-- UserId and TenantId.
+--
+-- Notes:
+--
+-- OpenStack logs include a request context, enclosed between square brackets.
+-- It takes one of these forms:
+--
+-- [-]
+-- [req-0fd2a9ba-448d-40f5-995e-33e32ac5a6ba - - - - -]
+-- [req-4db318af-54c9-466d-b365-fe17fe4adeed 8206d40abcc3452d8a9c1ea629b4a8d0 112245730b1f4858ab62e3673e1ee9e2 - - -]
+--
+-- In the 1st case the capture produces nil.
+-- In the 2nd case the capture produces one value: RequestId.
+-- In the 3rd case the capture produces three values: RequestId, UserId, TenantId.
+--
+-- The request id may be formatted as 'req-xxx' or 'xxx' depending on the project.
+-- The user id and tenant id may not be present depending on the OpenStack release.
+openstack_request_context = (l.P(1) - "[" )^0 * "[" * l.P"req-"^-1 *
+ l.Ct(l.Cg(Uuid, "RequestId") * sp * ((l.Cg(Uuid, "UserId") * sp *
+ l.Cg(Uuid, "TenantId")) + l.P(1)^0)) - "]"
+
+local http_method = l.Cg(l.R"AZ"^3, "http_method")
+local url = l.Cg( (1 - sp)^1, "http_url")
+local http_version = l.Cg(l.digit * dot * l.digit, "http_version")
+
+-- Pattern for the "<http_method> <http_url> HTTP/<http_version>" format
+-- found in both OpenStack and Apache log files.
+-- Example : OPTIONS /example.com HTTP/1.0
+http_request = http_method * sp * url * sp * l.P'HTTP/' * http_version
+
+-- Patterns for HTTP status, HTTP response size and HTTP response time in
+-- OpenStack logs.
+--
+-- Notes:
+-- Nova changes the default log format of eventlet.wsgi (see nova/wsgi.py) and
+-- prefixes the HTTP status, response size and response time values with
+-- respectively "status: ", "len: " and "time: ".
+-- Other OpenStack services just rely on the default log format.
+-- TODO(pasquier-s): build the LPEG grammar based on the log_format parameter
+-- passed to eventlet.wsgi.server similar to what the build_rsyslog_grammar
+-- function does for RSyslog.
+local openstack_http_status = l.P"status: "^-1 * l.Cg(l.digit^3, "http_status")
+local openstack_response_size = l.P"len: "^-1 * l.Cg(l.digit^1 / tonumber, "http_response_size")
+local openstack_response_time = l.P"time: "^-1 * l.Cg(l.digit^1 * dot^0 * l.digit^0 / tonumber, "http_response_time")
+
+-- Capture for OpenStack HTTP producing six values: http_method, http_url,
+-- http_version, http_status, http_response_size and http_response_time.
+openstack_http = anywhere(l.Ct(
+ quote * http_request * quote * sp^1 *
+ openstack_http_status * sp^1 * openstack_response_size * sp^1 *
+ openstack_response_time
+))
+
+-- Capture for IP addresses producing one value: ip_address.
+ip_address = anywhere(l.Ct(
+ l.Cg(l.digit^-3 * dot * l.digit^-3 * dot * l.digit^-3 * dot * l.digit^-3, "ip_address")
+))
+
+-- Pattern used to match the beginning of a Python Traceback.
+traceback = l.P'Traceback (most recent call last):'
+
+-- Pattern used to match a number
+Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
+
+return M
diff --git a/heka/files/lua/common/table_utils.lua b/heka/files/lua/common/table_utils.lua
new file mode 100644
index 0000000..ba5fb93
--- /dev/null
+++ b/heka/files/lua/common/table_utils.lua
@@ -0,0 +1,109 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local table = require 'table'
+local pairs = pairs
+local ipairs = ipairs
+local type = type
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- return a clone of the passed table
+function deepcopy(t)
+ if type(t) == 'table' then
+ local copy = {}
+ for k, v in pairs(t) do
+ copy[k] = deepcopy(v)
+ end
+ return copy
+ end
+ return t
+end
+
+-- return the position (index) of an item in a list, nil if not found
+function item_pos(item, list)
+ if type(list) == 'table' then
+ for i, v in ipairs(list) do
+ if v == item then
+ return i
+ end
+ end
+ end
+end
+
+-- return true if an item is present in the list, false otherwise
+function item_find(item, list)
+ return item_pos(item, list) ~= nil
+end
+
+-- from http://lua-users.org/wiki/SortedIteration
+function __genOrderedIndex( t )
+ local orderedIndex = {}
+ for key in pairs(t) do
+ table.insert( orderedIndex, key )
+ end
+ table.sort( orderedIndex )
+ return orderedIndex
+end
+
+function orderedNext(t, state)
+    -- Equivalent of the next function, but returns the keys in alphabetical
+    -- order. We use a temporary ordered key table that is stored in the
+    -- table being iterated.
+
+    local key
+ if state == nil then
+ -- the first time, generate the index
+ t.__orderedIndex = __genOrderedIndex( t )
+ key = t.__orderedIndex[1]
+ else
+ -- fetch the next value
+ for i = 1,table.getn(t.__orderedIndex) do
+ if t.__orderedIndex[i] == state then
+ key = t.__orderedIndex[i+1]
+ end
+ end
+ end
+
+ if key then
+ return key, t[key]
+ end
+
+ -- no more value to return, cleanup
+ t.__orderedIndex = nil
+ return
+end
+
+function orderedPairs(t)
+    -- Equivalent of the pairs() function on tables, but iterates over the
+    -- keys in alphabetical order.
+ return orderedNext, t, nil
+end
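+
+-- Usage sketch:
+--   for k, v in orderedPairs({b = 2, a = 1}) do
+--       -- visits 'a' first, then 'b'
+--   end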
+
+-- Shallow comparison between two tables.
+-- Return true if the two tables have the same keys with identical
+-- values, otherwise false.
+function table_equal(t1, t2)
+ -- all key-value pairs in t1 must be in t2
+ for k, v in pairs(t1) do
+ if t2[k] ~= v then return false end
+ end
+ -- there must not be other keys in t2
+ for k, v in pairs(t2) do
+ if t1[k] == nil then return false end
+ end
+ return true
+end
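+
+-- Usage sketch:
+--   table_equal({a = 1, b = 2}, {b = 2, a = 1}) -- true
+--   table_equal({a = 1}, {a = 1, b = 2})        -- false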
+
+return M
diff --git a/heka/files/lua/common/value_matching.lua b/heka/files/lua/common/value_matching.lua
new file mode 100644
index 0000000..152425e
--- /dev/null
+++ b/heka/files/lua/common/value_matching.lua
@@ -0,0 +1,171 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local l = require "lpeg"
+l.locale(l)
+local pcall = pcall
+local string = require 'string'
+
+local patterns = require 'patterns'
+local error = error
+local setmetatable = setmetatable
+local tonumber = tonumber
+
+local C = l.C
+local P = l.P
+local S = l.S
+local V = l.V
+local Ct = l.Ct
+local Cc = l.Cc
+
+local Optional_space = patterns.sp^0
+local Only_spaces = patterns.sp^1 * -1
+
+local function space(pat)
+ return Optional_space * pat * Optional_space
+end
+
+local EQ = P'=='
+local NEQ = P'!='
+local GT = P'>'
+local LT = P'<'
+local GTE = P'>='
+local LTE = P'<='
+local MATCH = P'=~'
+local NO_MATCH = P'!~'
+
+local OR = P'||'
+local AND = P'&&'
+
+local function get_operator(op)
+ if op == '' then
+ return '=='
+ end
+ return op
+end
+
+local numerical_operator = (EQ + NEQ + LTE + GTE + GT + LT )^-1 / get_operator
+local sub_numerical_expression = space(numerical_operator) * patterns.Number * Optional_space
+local is_plain_numeric = (sub_numerical_expression * ((OR^1 + AND^1) * sub_numerical_expression)^0) * -1
+
+local quoted_string = (P'"' * C((P(1) - (P'"'))^1) * P'"' + C((P(1) - patterns.sp)^1))
+local string_operator = (EQ + NEQ + MATCH + NO_MATCH)^-1 / get_operator
+local sub_string_expression = space(string_operator) * quoted_string * Optional_space
+local is_plain_string = (sub_string_expression * ((OR^1 + AND^1) * sub_string_expression)^0) * -1
+
+local numerical_expression = P {
+ 'OR';
+ AND = Ct(Cc('and') * V'SUB' * space(AND) * V'AND' + V'SUB'),
+ OR = Ct(Cc('or') * V'AND' * space(OR) * V'OR' + V'AND'),
+ SUB = Ct(sub_numerical_expression)
+} * -1
+
+local string_expression = P {
+ 'OR';
+ AND = Ct(Cc('and') * V'SUB' * space(AND) * V'AND' + V'SUB'),
+ OR = Ct(Cc('or') * V'AND' * space(OR) * V'OR' + V'AND'),
+ SUB = Ct(sub_string_expression)
+} * -1
+
+local is_complex = patterns.anywhere(EQ + NEQ + LTE + GTE + GT + LT + MATCH + NO_MATCH + OR + AND)
+
+local function eval_tree(tree, value)
+ local match = false
+
+ if type(tree[1]) == 'table' then
+ match = eval_tree(tree[1], value)
+ else
+ local operator = tree[1]
+ if operator == 'and' or operator == 'or' then
+ match = eval_tree(tree[2], value)
+ for i=3, #tree, 1 do
+ local m = eval_tree(tree[i], value)
+ if operator == 'or' then
+ match = match or m
+ else
+ match = match and m
+ end
+ end
+ else
+ local matcher = tree[2]
+ if operator == '==' then
+ return value == matcher
+ elseif operator == '!=' then
+ return value ~= matcher
+ elseif operator == '>' then
+ return value > matcher
+ elseif operator == '<' then
+ return value < matcher
+ elseif operator == '>=' then
+ return value >= matcher
+ elseif operator == '<=' then
+ return value <= matcher
+ elseif operator == '=~' then
+ local ok, m = pcall(string.find, value, matcher)
+ return ok and m ~= nil
+ elseif operator == '!~' then
+ local ok, m = pcall(string.find, value, matcher)
+ return ok and m == nil
+ end
+ end
+ end
+ return match
+end
+
+local MatchExpression = {}
+MatchExpression.__index = MatchExpression
+
+setfenv(1, MatchExpression) -- Remove external access to contain everything in the module
+
+function MatchExpression.new(expression)
+ local r = {}
+ setmetatable(r, MatchExpression)
+ if is_complex:match(expression) then
+ r.is_plain_numeric_exp = is_plain_numeric:match(expression) ~= nil
+
+ if r.is_plain_numeric_exp then
+ r.tree = numerical_expression:match(expression)
+ elseif is_plain_string:match(expression) ~= nil then
+ r.tree = string_expression:match(expression)
+ end
+ if r.tree == nil then
+ error('Invalid expression: ' .. expression)
+ end
+ else
+ if expression == '' or Only_spaces:match(expression) then
+ error('Expression is empty')
+ end
+ r.is_simple_equality_matching = true
+ end
+ r.expression = expression
+
+ return r
+end
+
+function MatchExpression:matches(value)
+ if self.is_simple_equality_matching then
+ return self.expression == value or
+ tonumber(self.expression) == value or
+ tonumber(value) == self.expression
+ end
+ if self.is_plain_numeric_exp then
+ value = tonumber(value)
+ if value == nil then
+ return false
+ end
+ end
+ return eval_tree(self.tree, value)
+end
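+
+-- Usage sketch (hypothetical expressions):
+--   MatchExpression.new('>= 300 && < 500'):matches('404')   -- true
+--   MatchExpression.new('=~ ^error'):matches('error: foo')  -- true
+--   MatchExpression.new('http_check'):matches('http_check') -- plain equality, true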
+
+return MatchExpression
diff --git a/heka/files/lua/decoders/ceilometer.lua b/heka/files/lua/decoders/ceilometer.lua
new file mode 100644
index 0000000..01564af
--- /dev/null
+++ b/heka/files/lua/decoders/ceilometer.lua
@@ -0,0 +1,135 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+require 'table'
+require 'math'
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+function normalize_uuid(uuid)
+ return patt.Uuid:match(uuid)
+end
+
+-- the metadata_fields parameter is a space-separated list of words
+local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
+local metadata_fields = fields_grammar:match(
+ read_config("metadata_fields") or ""
+)
+
+local decode_resources = read_config('decode_resources') or false
+
+local sample_msg = {
+ Timestamp = nil,
+    -- This message type has the same structure as 'bulk_metric'.
+ Type = "ceilometer_samples",
+ Payload = nil
+}
+
+local resource_msg = {
+ Timestamp = nil,
+ Type = "ceilometer_resource",
+ Fields = nil,
+}
+
+function inject_metadata(metadata, tags)
+ local value
+ for _, field in ipairs(metadata_fields) do
+ value = metadata[field]
+ if value ~= nil and type(value) ~= 'table' then
+ tags["metadata." .. field] = value
+ end
+ end
+end
+
+function add_resource_to_payload(sample, payload)
+
+ local resource_data = {
+ timestamp = sample.timestamp,
+ resource_id = sample.resource_id,
+ source = sample.source or "",
+ metadata = sample.resource_metadata,
+ user_id = sample.user_id,
+ project_id = sample.project_id,
+ meter = {
+ [sample.counter_name] = {
+ type = sample.counter_type,
+ unit = sample.counter_unit
+ }
+ }
+ }
+ payload[sample.resource_id] = resource_data
+end
+
+
+function add_sample_to_payload(sample, payload)
+ local sample_data = {
+ name='sample',
+ timestamp = patt.Timestamp:match(sample.timestamp),
+ values = {
+ value = sample.counter_volume,
+ message_id = sample.message_id,
+ recorded_at = sample.recorded_at,
+ timestamp = sample.timestamp,
+ message_signature = sample.signature,
+ type = sample.counter_type,
+ unit = sample.counter_unit
+ }
+ }
+ local tags = {
+ meter = sample.counter_name,
+ resource_id = sample.resource_id,
+        project_id = sample.project_id,
+ user_id = sample.user_id,
+ source = sample.source
+ }
+
+ inject_metadata(sample.resource_metadata or {}, tags)
+ sample_data["tags"] = tags
+ table.insert(payload, sample_data)
+end
+
+function process_message ()
+ local data = read_message("Payload")
+ local ok, message = pcall(cjson.decode, data)
+ if not ok then
+ return -1, "Cannot decode Payload"
+ end
+ local ok, message_body = pcall(cjson.decode, message["oslo.message"])
+ if not ok then
+ return -1, "Cannot decode Payload[oslo.message]"
+ end
+ local sample_payload = {}
+ local resource_payload = {}
+ for _, sample in ipairs(message_body["payload"]) do
+ add_sample_to_payload(sample, sample_payload)
+ if decode_resources then
+ add_resource_to_payload(sample, resource_payload)
+ end
+ end
+ sample_msg.Payload = cjson.encode(sample_payload)
+ sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
+ utils.safe_inject_message(sample_msg)
+
+ if decode_resources then
+ resource_msg.Payload = cjson.encode(resource_payload)
+ resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
+ utils.safe_inject_message(resource_msg)
+ end
+
+ return 0
+end
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
new file mode 100644
index 0000000..f94a0e8
--- /dev/null
+++ b/heka/files/lua/decoders/collectd.lua
@@ -0,0 +1,447 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+
+local utils = require 'lma_utils'
+
+local sep = '_'
+
+local processes_map = {
+ ps_code = 'memory_code',
+ ps_count = '',
+ ps_cputime = 'cputime',
+ ps_data = 'memory_data',
+ ps_disk_octets = 'disk_bytes',
+ ps_disk_ops = 'disk_ops',
+ ps_pagefaults = 'pagefaults',
+ ps_rss = 'memory_rss',
+ ps_stacksize = 'stacksize',
+ ps_vm = 'memory_virtual',
+}
+
+-- The following table keeps a list of metrics from plugins where the
+-- hostname is not relevant.
+local hostname_free = {
+    -- Add "metric_source = true" to skip the hostname for all metrics
+    -- from the metric_source.
+    -- Add "metric_source = { list of metrics }" to skip the hostname for
+    -- a subset of metrics. The list of metrics is referenced through the
+    -- field 'type_instance'.
+ hypervisor_stats = {
+ total_free_disk_GB = true,
+ total_free_ram_MB = true,
+ total_free_vcpus = true,
+ total_used_disk_GB = true,
+ total_used_ram_MB = true,
+ total_used_vcpus = true,
+ total_running_instances = true,
+ total_running_tasks = true,
+ },
+ check_openstack_api = true,
+ http_check = true,
+}
+
+-- this is needed for the libvirt metrics because in that case, collectd sends
+-- the instance's ID instead of the hostname in the 'host' attribute
+local hostname = read_config('hostname') or error('hostname must be specified')
+local swap_size = (read_config('swap_size') or 0) + 0
+
+function replace_dot_by_sep (str)
+ return string.gsub(str, '%.', sep)
+end
+
+function process_message ()
+ local ok, samples = pcall(cjson.decode, read_message("Payload"))
+ if not ok then
+ -- TODO: log error
+ return -1
+ end
+
+ for _, sample in ipairs(samples) do
+ local metric_prefix = sample['type']
+ if sample['type_instance'] ~= "" then
+ metric_prefix = metric_prefix .. sep .. sample['type_instance']
+ end
+
+ local metric_source = sample['plugin']
+
+ for i, value in ipairs(sample['values']) do
+ local skip_it = false
+ local metric_name = metric_prefix
+ if sample['dsnames'][i] ~= "value" then
+ metric_name = metric_name .. sep .. sample['dsnames'][i]
+ end
+
+ local msg = {
+ Timestamp = sample['time'] * 1e9, -- Heka expects nanoseconds
+ Hostname = sample['host'],
+ Logger = "collectd",
+ Payload = utils.safe_json_encode(sample) or '',
+ Severity = 6,
+ Type = "metric",
+ Fields = {
+ interval = sample['interval'],
+ source = metric_source,
+ type = sample['dstypes'][i],
+ value = value,
+ tag_fields = {},
+ }
+ }
+
+            -- Normalize the metric name. Unfortunately, collectd plugins
+            -- aren't always consistent with metric namespaces, so we need a
+            -- few if/else statements to cover all cases.
+
+ -- Check if hostname is needed or not
+ local add_hostname = true
+ if hostname_free[metric_source] == true then
+ add_hostname = false
+ elseif hostname_free[metric_source] and
+ hostname_free[metric_source][sample['type_instance']] then
+ add_hostname = false
+ end
+
+ if add_hostname then
+ msg['Fields']['hostname'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'hostname')
+ end
+
+ if sample['meta'] and sample['meta']['service_check'] then
+ msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
+ msg['Fields']['details'] = sample['meta']['failure']
+ elseif metric_source == 'df' then
+ local entity
+ if sample['type'] == 'df_inodes' then
+ entity = 'inodes'
+ elseif sample['type'] == 'percent_inodes' then
+ entity = 'inodes_percent'
+ elseif sample['type'] == 'percent_bytes' then
+ entity = 'space_percent'
+ else -- sample['type'] == 'df_complex'
+ entity = 'space'
+ end
+
+ local mount = sample['plugin_instance']
+ if mount == 'root' then
+ mount = '/'
+ else
+ mount = '/' .. mount:gsub('-', '/')
+ end
+
+ msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
+ msg['Fields']['fs'] = mount
+ table.insert(msg['Fields']['tag_fields'], 'fs')
+ elseif metric_source == 'disk' then
+ msg['Fields']['name'] = metric_name
+ msg['Fields']['device'] = sample['plugin_instance']
+ table.insert(msg['Fields']['tag_fields'], 'device')
+ elseif metric_source == 'cpu' then
+ msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
+ msg['Fields']['cpu_number'] = sample['plugin_instance']
+ table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+ elseif metric_source == 'netlink' then
+ local netlink_metric = sample['type']
+ if netlink_metric == 'if_rx_errors' then
+ netlink_metric = 'if_errors_rx'
+ elseif netlink_metric == 'if_tx_errors' then
+ netlink_metric = 'if_errors_tx'
+ end
+
+ -- Netlink plugin can send one or two values. Use dsnames only when needed.
+ if sample['dsnames'][i] ~= 'value' then
+ netlink_metric = netlink_metric .. sep .. sample['dsnames'][i]
+ end
+                -- and the type of error is set in type_instance
+ if sample['type_instance'] ~= '' then
+ netlink_metric = netlink_metric .. sep .. sample['type_instance']
+ end
+ msg['Fields']['name'] = netlink_metric
+ msg['Fields']['interface'] = sample['plugin_instance']
+ table.insert(msg['Fields']['tag_fields'], 'interface')
+ elseif metric_source == 'processes' then
+ if processes_map[sample['type']] then
+ -- metrics related to a specific process
+ msg['Fields']['service'] = sample['plugin_instance']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ msg['Fields']['name'] = 'lma_components'
+ if processes_map[sample['type']] ~= '' then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']]
+ end
+ if sample['dsnames'][i] ~= 'value' then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+ end
+
+                    -- For ps_cputime, convert the value to a percentage:
+                    -- collectd reports the number of microseconds allocated
+                    -- to the process per second of wall-clock time.
+ if sample['type'] == 'ps_cputime' then
+ msg['Fields']['value'] = 100 * value / 1e6
+ end
+ else
+ -- metrics related to all processes
+ msg['Fields']['name'] = 'processes'
+ if sample['type'] == 'ps_state' then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'count'
+ msg['Fields']['state'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ else
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+ end
+ end
+ elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
+ msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+ elseif metric_source == 'mysql' then
+ if sample['type'] == 'threads' then
+ msg['Fields']['name'] = 'mysql_' .. metric_name
+ elseif sample['type'] == 'mysql_commands' then
+ msg['Fields']['name'] = sample['type']
+ msg['Fields']['statement'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'statement')
+ elseif sample['type'] == 'mysql_handler' then
+ msg['Fields']['name'] = sample['type']
+ msg['Fields']['handler'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'handler')
+ else
+ msg['Fields']['name'] = metric_name
+ end
+ elseif metric_source == 'check_openstack_api' then
+ -- For OpenStack API metrics, plugin_instance = <service name>
+ msg['Fields']['name'] = 'openstack_check_api'
+ msg['Fields']['service'] = sample['plugin_instance']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ if sample['meta'] then
+ msg['Fields']['os_region'] = sample['meta']['region']
+ end
+ elseif metric_source == 'hypervisor_stats' then
+                -- Metrics from the OpenStack hypervisor where
+                -- type_instance = <metric name>, which can end with _MB or _GB
+ msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
+ local name, unit
+ name, unit = string.match(sample['type_instance'], '^(.+)_(.B)$')
+ if name then
+ msg['Fields']['name'] = msg['Fields']['name'] .. name
+ msg.Fields['value'] = {value = msg.Fields['value'], representation = unit}
+ else
+ msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
+ end
+ elseif metric_source == 'rabbitmq_info' then
+ msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
+ if sample['meta'] and sample['meta']['queue'] then
+ msg['Fields']['queue'] = sample['meta']['queue']
+ table.insert(msg['Fields']['tag_fields'], 'queue')
+ end
+ elseif metric_source == 'nova' then
+ if sample['plugin_instance'] == 'nova_services' or
+ sample['plugin_instance'] == 'nova_service' then
+ msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
+ msg['Fields']['service'] = sample['meta']['service']
+ msg['Fields']['state'] = sample['meta']['state']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ if sample['plugin_instance'] == 'nova_service' then
+ msg['Fields']['hostname'] = sample['meta']['host']
+ end
+ else
+ msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
+ msg['Fields']['state'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ end
+ elseif metric_source == 'cinder' then
+ if sample['plugin_instance'] == 'cinder_services' or
+ sample['plugin_instance'] == 'cinder_service' then
+ msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
+ msg['Fields']['service'] = sample['meta']['service']
+ msg['Fields']['state'] = sample['meta']['state']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ if sample['plugin_instance'] == 'cinder_service' then
+ msg['Fields']['hostname'] = sample['meta']['host']
+ end
+ else
+ msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
+ msg['Fields']['state'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ end
+ elseif metric_source == 'glance' then
+ msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
+ msg['Fields']['state'] = sample['meta']['status']
+ msg['Fields']['visibility'] = sample['meta']['visibility']
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ table.insert(msg['Fields']['tag_fields'], 'visibility')
+ elseif metric_source == 'keystone' then
+ msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
+ if sample['meta']['state'] then
+ msg['Fields']['state'] = sample['meta']['state']
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ end
+ elseif metric_source == 'neutron' then
+ if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
+ skip_it = true
+ elseif sample['type_instance'] == 'subnets' then
+ msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. 'subnets'
+ elseif sample['type_instance'] == 'neutron_agents' or
+ sample['type_instance'] == 'neutron_agent' then
+ msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
+ msg['Fields']['service'] = sample['meta']['service']
+ msg['Fields']['state'] = sample['meta']['state']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ if sample['type_instance'] == 'neutron_agent' then
+ msg['Fields']['hostname'] = sample['meta']['host']
+ end
+ elseif string.match(sample['type_instance'], '^ports') then
+ local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
+ msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
+ msg['Fields']['owner'] = owner
+ msg['Fields']['state'] = state
+ table.insert(msg['Fields']['tag_fields'], 'owner')
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ else
+ local resource, state = string.match(sample['type_instance'], '^([^.]+)%.(.+)$')
+ msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
+ msg['Fields']['state'] = state
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ end
+ elseif metric_source == 'memcached' then
+ msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+ elseif metric_source == 'haproxy' then
+ msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
+ if sample['meta'] then
+ if sample['meta']['backend'] then
+ msg['Fields']['backend'] = sample['meta']['backend']
+ table.insert(msg['Fields']['tag_fields'], 'backend')
+ if sample['meta']['state'] then
+ msg['Fields']['state'] = sample['meta']['state']
+ table.insert(msg['Fields']['tag_fields'], 'state')
+ end
+ if sample['meta']['server'] then
+ msg['Fields']['server'] = sample['meta']['server']
+ table.insert(msg['Fields']['tag_fields'], 'server')
+ end
+ elseif sample['meta']['frontend'] then
+ msg['Fields']['frontend'] = sample['meta']['frontend']
+ table.insert(msg['Fields']['tag_fields'], 'frontend')
+ end
+ end
+ elseif metric_source == 'apache' then
+ metric_name = string.gsub(metric_name, 'apache_', '')
+ msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+ elseif metric_source == 'ceph_osd_perf' then
+ msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
+
+ msg['Fields']['cluster'] = sample['plugin_instance']
+ msg['Fields']['osd'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'cluster')
+ table.insert(msg['Fields']['tag_fields'], 'osd')
+ elseif metric_source:match('^ceph') then
+ msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
+ if sample['dsnames'][i] ~= 'value' then
+ msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+ end
+
+ msg['Fields']['cluster'] = sample['plugin_instance']
+ table.insert(msg['Fields']['tag_fields'], 'cluster')
+
+ if sample['type_instance'] ~= '' then
+ local additional_tag
+ if string.match(sample['type'], '^pool_') then
+ additional_tag = 'pool'
+ elseif string.match(sample['type'], '^pg_state') then
+ additional_tag = 'state'
+ elseif string.match(sample['type'], '^osd_') then
+ additional_tag = 'osd'
+ end
+ if additional_tag then
+ msg['Fields'][additional_tag] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], additional_tag)
+ end
+ end
+ elseif metric_source == 'pacemaker' then
+ if sample['meta'] and sample['meta']['host'] then
+ msg['Fields']['hostname'] = sample['meta']['host']
+ end
+
+ msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+
+ -- add dimension fields
+ for _, v in ipairs({'status', 'resource'}) do
+ if sample['meta'] and sample['meta'][v] then
+ msg['Fields'][v] = sample['meta'][v]
+ table.insert(msg['Fields']['tag_fields'], v)
+ end
+ end
+ elseif metric_source == 'users' then
+ -- 'users' is a reserved name for InfluxDB v0.9
+ msg['Fields']['name'] = 'logged_users'
+ elseif metric_source == 'libvirt' then
+ -- collectd sends the instance's ID in the 'host' field
+ msg['Fields']['instance_id'] = sample['host']
+ table.insert(msg['Fields']['tag_fields'], 'instance_id')
+ msg['Fields']['hostname'] = hostname
+ msg['Hostname'] = hostname
+
+ if string.match(sample['type'], '^disk_') then
+ msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
+ msg['Fields']['device'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'device')
+ elseif string.match(sample['type'], '^if_') then
+ msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
+ msg['Fields']['interface'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'interface')
+ elseif sample['type'] == 'virt_cpu_total' then
+ msg['Fields']['name'] = 'virt_cpu_time'
+ elseif sample['type'] == 'virt_vcpu' then
+ msg['Fields']['name'] = 'virt_vcpu_time'
+ msg['Fields']['vcpu_number'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'vcpu_number')
+ else
+ msg['Fields']['name'] = 'virt' .. sep .. metric_name
+ end
+ elseif metric_source == 'elasticsearch_cluster' or metric_source == 'influxdb' then
+ msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+ elseif metric_source == 'http_check' then
+ msg['Fields']['name'] = metric_source
+ msg['Fields']['service'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ elseif metric_source == 'check_local_endpoint' then
+ msg['Fields']['name'] = 'openstack_check_local_api'
+ msg['Fields']['service'] = sample['type_instance']
+ table.insert(msg['Fields']['tag_fields'], 'service')
+ else
+ msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+ end
+
+ if not skip_it then
+ utils.inject_tags(msg)
+ -- Before injecting the message we need to check that tag_fields is not an
+ -- empty table otherwise the protobuf encoder fails to encode the table.
+ if #msg['Fields']['tag_fields'] == 0 then
+ msg['Fields']['tag_fields'] = nil
+ end
+ utils.safe_inject_message(msg)
+ if metric_source == 'swap' and metric_name == 'swap_used' and swap_size > 0 then
+                    -- collectd 5.4.0 doesn't report the used swap as a
+                    -- percentage, which is why this metric is computed and
+                    -- injected by this plugin.
+ msg['Fields']['name'] = 'swap_percent_used'
+ msg['Fields']['value'] = value / swap_size
+ utils.safe_inject_message(msg)
+ end
+ end
+ end
+ end
+
+ return 0
+end
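+
+-- Input sketch (hypothetical collectd JSON sample):
+--   [{"type": "cpu", "type_instance": "idle", "plugin": "cpu",
+--     "plugin_instance": "0", "host": "node-1", "time": 1453646565.5,
+--     "interval": 10, "dstypes": ["derive"], "dsnames": ["value"],
+--     "values": [95]}]
+-- produces one 'metric' message with Fields.name == 'cpu_idle',
+-- Fields.cpu_number == '0' and Fields.value == 95.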
diff --git a/heka/files/lua/decoders/generic_syslog.lua b/heka/files/lua/decoders/generic_syslog.lua
new file mode 100644
index 0000000..48e5262
--- /dev/null
+++ b/heka/files/lua/decoders/generic_syslog.lua
@@ -0,0 +1,50 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+
+local syslog = require "syslog"
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+-- This grammar is intended for log messages that are generated before RSYSLOG
+-- is fully configured
+local fallback_syslog_pattern = read_config("fallback_syslog_pattern")
+local fallback_syslog_grammar
+if fallback_syslog_pattern then
+ fallback_syslog_grammar = syslog.build_rsyslog_grammar(fallback_syslog_pattern)
+end
+
+function process_message ()
+ local log = read_message("Payload")
+
+ if utils.parse_syslog_message(syslog_grammar, log, msg) or
+ (fallback_syslog_grammar and utils.parse_syslog_message(fallback_syslog_grammar, log, msg)) then
+ msg.Logger = string.gsub(read_message('Logger'), '%.log$', '')
+ return utils.safe_inject_message(msg)
+ end
+
+ return -1
+end
diff --git a/heka/files/lua/decoders/keystone_wsgi_log.lua b/heka/files/lua/decoders/keystone_wsgi_log.lua
new file mode 100644
index 0000000..a3c970b
--- /dev/null
+++ b/heka/files/lua/decoders/keystone_wsgi_log.lua
@@ -0,0 +1,73 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local l = require 'lpeg'
+l.locale(l)
+
+local common_log_format = require 'common_log_format'
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = 6,
+}
+
+local severity_label = utils.severity_to_label_map[msg.Severity]
+
+local apache_log_pattern = read_config("apache_log_pattern") or error(
+    "apache_log_pattern configuration must be specified")
+local apache_grammar = common_log_format.build_apache_grammar(apache_log_pattern)
+local request_grammar = l.Ct(patt.http_request)
+
+function process_message ()
+
+ -- logger is either "keystone-wsgi-main" or "keystone-wsgi-admin"
+ local logger = read_message("Logger")
+
+ local log = read_message("Payload")
+
+ local m
+
+ m = apache_grammar:match(log)
+ if m then
+ msg.Logger = 'openstack.keystone'
+ msg.Payload = log
+ msg.Timestamp = m.time
+
+ msg.Fields = {}
+ msg.Fields.http_status = m.status
+ msg.Fields.http_response_time = m.request_time.value / 1e6 -- us to sec
+ msg.Fields.programname = logger
+ msg.Fields.severity_label = severity_label
+
+ local request = m.request
+ m = request_grammar:match(request)
+ if m then
+ msg.Fields.http_method = m.http_method
+ msg.Fields.http_url = m.http_url
+ msg.Fields.http_version = m.http_version
+ end
+
+ utils.inject_tags(msg)
+ return utils.safe_inject_message(msg)
+ end
+
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+end
diff --git a/heka/files/lua/decoders/libvirt_log.lua b/heka/files/lua/decoders/libvirt_log.lua
new file mode 100644
index 0000000..a191b8f
--- /dev/null
+++ b/heka/files/lua/decoders/libvirt_log.lua
@@ -0,0 +1,63 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l = require 'lpeg'
+l.locale(l)
+
+local string = require 'string'
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+-- libvirt message logs are formatted like this:
+--
+-- 2015-03-26 17:24:52.126+0000: <PID>: <SEV> : Message
+
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local pid = l.Cg(patt.Pid, "Pid")
+local severity = l.Cg(l.P"debug" + "info" + "warning" + "error", "Severity")
+local message = l.Cg(patt.Message, "Message")
+
+local grammar = l.Ct(timestamp * ": " * pid * ": " * severity * " : " * message)
+
+function process_message ()
+ local log = read_message("Payload")
+
+ local m = grammar:match(log)
+ if not m then
+ return -1
+ end
+
+ m.Severity = string.upper(m.Severity)
+
+ msg.Timestamp = m.Timestamp
+ msg.Pid = m.Pid
+ msg.Payload = m.Message
+ msg.Severity = utils.label_to_severity_map[m.Severity]
+
+ msg.Fields = {}
+ msg.Fields.severity_label = m.Severity
+ msg.Fields.programname = 'libvirt'
+ utils.inject_tags(msg)
+
+ return utils.safe_inject_message(msg)
+end
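+
+-- Example input (following the format described above):
+--   2015-03-26 17:24:52.126+0000: 12345: error : internal error
+-- yields Pid == '12345', Severity == 3 (ERROR) and Payload == 'internal error'.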
diff --git a/heka/files/lua/decoders/metric.lua b/heka/files/lua/decoders/metric.lua
new file mode 100644
index 0000000..0994a00
--- /dev/null
+++ b/heka/files/lua/decoders/metric.lua
@@ -0,0 +1,90 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require "cjson"
+require "string"
+
+local l = require 'lpeg'
+l.locale(l)
+
+local loggers_pattern = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
+local loggers_list = loggers_pattern:match(read_config('deserialize_bulk_metric_for_loggers') or '')
+
+local loggers = {}
+for _, logger in ipairs(loggers_list) do
+ loggers[logger] = true
+end
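+-- For example, setting deserialize_bulk_metric_for_loggers to
+-- 'hdd_errors_filter http_metric_filter' (illustrative value) enables the
+-- deserialization below for these two loggers only.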
+
+local utils = require 'lma_utils'
+
+function process_message ()
+ local msg = decode_message(read_message("raw"))
+ if string.match(msg.Type, 'bulk_metric$') and loggers[msg.Logger] ~= nil then
+
+ local ok, metrics = pcall(cjson.decode, msg.Payload)
+ if not ok then
+ return -1, metrics
+ end
+
+ local new_msg = {
+ Timestamp = msg.Timestamp,
+ Hostname = msg.Hostname,
+ Severity = msg.Severity,
+ Logger = msg.Logger,
+ Type = nil,
+ Payload = '',
+ Fields = {},
+ }
+ for _, metric in ipairs(metrics) do
+ local fields = {}
+ local metric_type
+ if metric.value then
+ metric_type = 'metric'
+ fields['value'] = metric.value
+ else
+ metric_type = 'multivalue_metric'
+ local value_fields = {}
+ for k, v in pairs(metric.values) do
+ fields[k] = v
+ table.insert(value_fields, k)
+ end
+ fields['value_fields'] = value_fields
+ end
+ local tag_fields = {}
+ for t, v in pairs(metric.tags or {}) do
+ fields[t] = v
+ table.insert(tag_fields, t)
+ end
+ fields['tag_fields'] = tag_fields
+ fields['name'] = metric.name
+ fields['hostname'] = msg.Hostname
+
+ new_msg.Type = metric_type
+ new_msg.Fields = fields
+
+ utils.inject_tags(new_msg)
+ local ok, err = utils.safe_inject_message(new_msg)
+ if ok ~= 0 then
+ return -1, err
+ end
+ end
+ else -- simple metric
+ utils.inject_tags(msg)
+ local ok, err = utils.safe_inject_message(msg)
+ if ok ~= 0 then
+ return -1, err
+ end
+ end
+ return 0
+end
diff --git a/heka/files/lua/decoders/mysql_log.lua b/heka/files/lua/decoders/mysql_log.lua
new file mode 100644
index 0000000..689aa16
--- /dev/null
+++ b/heka/files/lua/decoders/mysql_log.lua
@@ -0,0 +1,57 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l = require 'lpeg'
+l.locale(l)
+
+local syslog = require "syslog"
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+
+local sp = l.space
+local colon = l.P":"
+
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+-- mysqld logs are cranky: the date is YYMMDD, the hours have no leading zero and the "real" severity level is enclosed in square brackets...
+local mysql_grammar = l.Ct(l.digit^-6 * sp^1 * l.digit^-2 * colon * l.digit^-2 * colon * l.digit^-2 * sp^1 * l.P"[" * l.Cg(l.R("az", "AZ")^0 / string.upper, "SeverityLabel") * l.P"]" * sp^1 * l.Cg(patt.Message, "Message"))
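+-- Example line matched by mysql_grammar:
+-- 151113 10:34:12 [Warning] Aborted connection 1234 to db: 'nova' user: 'nova'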
+
+
+function process_message ()
+ local log = read_message("Payload")
+
+ if not utils.parse_syslog_message(syslog_grammar, log, msg) then
+ return -1
+ end
+
+ local m = mysql_grammar:match(msg.Payload)
+ if m then
+ msg.Fields.severity_label = m.SeverityLabel
+ msg.Payload = m.Message
+ end
+
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/noop.lua b/heka/files/lua/decoders/noop.lua
new file mode 100644
index 0000000..be9a9dd
--- /dev/null
+++ b/heka/files/lua/decoders/noop.lua
@@ -0,0 +1,28 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local msg_type = read_config('msg_type') or error('msg_type must be defined')
+
+local msg = {
+ Type = msg_type,
+ Severity = 7, -- debug
+ Payload = nil,
+ Fields = nil,
+}
+
+function process_message ()
+ inject_message(msg)
+
+ return 0
+end
diff --git a/heka/files/lua/decoders/notification.lua b/heka/files/lua/decoders/notification.lua
new file mode 100644
index 0000000..d4073d9
--- /dev/null
+++ b/heka/files/lua/decoders/notification.lua
@@ -0,0 +1,152 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = "notification",
+ Payload = nil,
+ Fields = nil
+}
+
+-- Mapping table from event_type prefixes to notification loggers
+local logger_map = {
+ --cinder
+ volume = 'cinder',
+ snapshot = 'cinder',
+ -- glance
+ image = 'glance',
+ -- heat
+ orchestration = 'heat',
+ -- keystone
+ identity = 'keystone',
+ -- nova
+ compute = 'nova',
+ compute_task = 'nova',
+ scheduler = 'nova',
+ keypair = 'nova',
+ -- neutron
+ floatingip = 'neutron',
+ security_group = 'neutron',
+ security_group_rule = 'neutron',
+ network = 'neutron',
+ port = 'neutron',
+ router = 'neutron',
+ subnet = 'neutron',
+ -- sahara
+ sahara = 'sahara',
+}
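+-- For example, an event_type of 'compute.instance.create.end' is mapped to
+-- the 'nova' logger from its first dotted component ('compute').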
+
+-- Mapping table between the attributes in the notification's payload and the
+-- fields in the Heka message
+local payload_fields = {
+ -- all
+ tenant_id = 'tenant_id',
+ user_id = 'user_id',
+ display_name = 'display_name',
+ -- nova
+ vcpus = 'vcpus',
+ availability_zone = 'availability_zone',
+ instance_id = 'instance_id',
+ instance_type = 'instance_type',
+ image_name = 'image_name',
+ memory_mb = 'memory_mb',
+ disk_gb = 'disk_gb',
+ state = 'state',
+ old_state = 'old_state',
+ old_task_state = 'old_task_state',
+ new_task_state = 'new_task_state',
+ created_at = 'created_at',
+ launched_at = 'launched_at',
+ deleted_at = 'deleted_at',
+ terminated_at = 'terminated_at',
+ -- neutron
+ network_id = 'network_id',
+ subnet_id = 'subnet_id',
+ port_id = 'port_id',
+ -- cinder
+ volume_id = 'volume_id',
+ size = 'size',
+ status = 'state',
+ replication_status = 'replication_status',
+}
+
+function normalize_uuid(uuid)
+ return patt.Uuid:match(uuid)
+end
+
+-- Mapping table defining transformation functions to be applied, keys are the
+-- attributes in the notification's payload and values are Lua functions
+local transform_functions = {
+ created_at = utils.format_datetime,
+ launched_at = utils.format_datetime,
+ deleted_at = utils.format_datetime,
+ terminated_at = utils.format_datetime,
+ user_id = normalize_uuid,
+ tenant_id = normalize_uuid,
+ instance_id = normalize_uuid,
+ network_id = normalize_uuid,
+ subnet_id = normalize_uuid,
+ port_id = normalize_uuid,
+ volume_id = normalize_uuid,
+}
+
+local include_full_notification = read_config("include_full_notification") or false
+
+function process_message ()
+ local data = read_message("Payload")
+ local ok, notif = pcall(cjson.decode, data)
+ if not ok then
+ return -1
+ end
+
+ if include_full_notification then
+ msg.Payload = data
+ else
+ msg.Payload = utils.safe_json_encode(notif.payload) or '{}'
+ end
+
+ msg.Fields = {}
+ msg.Logger = logger_map[string.match(notif.event_type, '([^.]+)')]
+ msg.Severity = utils.label_to_severity_map[notif.priority]
+ msg.Timestamp = patt.Timestamp:match(notif.timestamp)
+ msg.Fields.publisher, msg.Hostname = string.match(notif.publisher_id, '([^.]+)%.([%w_-]+)')
+ if notif.payload.host ~= nil then
+ msg.Hostname = string.match(notif.payload.host, '([%w_-]+)')
+ end
+
+ msg.Fields.event_type = notif.event_type
+ msg.Fields.severity_label = notif.priority
+ msg.Fields.hostname = msg.Hostname
+
+ for k, name in pairs(payload_fields) do
+ local val = notif.payload[k]
+ if val ~= nil then
+ local transform = transform_functions[k]
+ if transform ~= nil then
+ msg.Fields[name] = transform(val)
+ else
+ msg.Fields[name] = val
+ end
+ end
+ end
+ utils.inject_tags(msg)
+
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/openstack_log.lua b/heka/files/lua/decoders/openstack_log.lua
new file mode 100644
index 0000000..9199bd4
--- /dev/null
+++ b/heka/files/lua/decoders/openstack_log.lua
@@ -0,0 +1,143 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "table"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local table_utils = require 'table_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+-- traceback_lines is a reference to a table used to accumulate the lines of
+-- a Traceback. traceback_key represents the key of the Traceback lines
+-- being accumulated in traceback_lines. This is used to know when to
+-- stop accumulating and inject the Heka message.
+local traceback_key = nil
+local traceback_lines = nil
+
+function prepare_message (service, timestamp, pid, severity_label,
+ python_module, programname, payload)
+ msg.Logger = 'openstack.' .. service
+ msg.Timestamp = timestamp
+ msg.Payload = payload
+ msg.Pid = pid
+ msg.Severity = utils.label_to_severity_map[severity_label] or 7
+ msg.Fields = {}
+ msg.Fields.severity_label = severity_label
+ msg.Fields.python_module = python_module
+ msg.Fields.programname = programname
+ msg.Payload = payload
+end
+
+-- OpenStack log messages are of this form:
+-- 2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla...
+--
+-- [-] is the "request" part; it can take multiple forms.
+
+function process_message ()
+
+ -- Logger is of form "<service>_<program>" (e.g. "nova_nova-api",
+ -- "neutron_l3-agent").
+ local logger = read_message("Logger")
+ local service, program = string.match(logger, '([^_]+)_(.+)')
+
+ local log = read_message("Payload")
+ local m
+
+ m = patt.openstack:match(log)
+ if not m then
+ return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+ end
+
+ local key = {
+ Timestamp = m.Timestamp,
+ Pid = m.Pid,
+ SeverityLabel = m.SeverityLabel,
+ PythonModule = m.PythonModule,
+ service = service,
+ program = program,
+ }
+
+ if traceback_key ~= nil then
+ -- If traceback_key is not nil then it means we've started accumulating
+ -- the lines of a Python traceback. We keep accumulating the traceback
+ -- lines until we get a different log key.
+ if table_utils.table_equal(traceback_key, key) then
+ table.insert(traceback_lines, m.Message)
+ return 0
+ else
+ prepare_message(traceback_key.service, traceback_key.Timestamp,
+ traceback_key.Pid, traceback_key.SeverityLabel,
+ traceback_key.PythonModule, traceback_key.program,
+ table.concat(traceback_lines, ''))
+ traceback_key = nil
+ traceback_lines = nil
+ utils.inject_tags(msg)
+ -- Ignore safe_inject_message status code here to still get a
+ -- chance to inject the current log message.
+ utils.safe_inject_message(msg)
+ end
+ end
+
+ if patt.traceback:match(m.Message) then
+ -- Python traceback detected, begin accumulating the lines making
+ -- up the traceback.
+ traceback_key = key
+ traceback_lines = {}
+ table.insert(traceback_lines, m.Message)
+ return 0
+ end
+
+ prepare_message(service, m.Timestamp, m.Pid, m.SeverityLabel, m.PythonModule,
+ program, m.Message)
+
+ m = patt.openstack_request_context:match(msg.Payload)
+ if m then
+ msg.Fields.request_id = m.RequestId
+ if m.UserId then
+ msg.Fields.user_id = m.UserId
+ end
+ if m.TenantId then
+ msg.Fields.tenant_id = m.TenantId
+ end
+ end
+
+ m = patt.openstack_http:match(msg.Payload)
+ if m then
+ msg.Fields.http_method = m.http_method
+ msg.Fields.http_status = m.http_status
+ msg.Fields.http_url = m.http_url
+ msg.Fields.http_version = m.http_version
+ msg.Fields.http_response_size = m.http_response_size
+ msg.Fields.http_response_time = m.http_response_time
+ m = patt.ip_address:match(msg.Payload)
+ if m then
+ msg.Fields.http_client_ip_address = m.ip_address
+ end
+ end
+
+ utils.inject_tags(msg)
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/ovs_log.lua b/heka/files/lua/decoders/ovs_log.lua
new file mode 100644
index 0000000..47b88eb
--- /dev/null
+++ b/heka/files/lua/decoders/ovs_log.lua
@@ -0,0 +1,62 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local l = require 'lpeg'
+l.locale(l)
+
+local syslog = require "syslog"
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Logger = 'ovs',
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+local pipe = l.P'|'
+
+local function_name = l.Cg((l.R("az", "AZ", "09") + l.P"." + l.P"-" + l.P"_" + l.P"(" + l.P")")^1, 'function_name')
+local pid = l.Cg(patt.Pid, "Pid")
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local severity = l.Cg(syslog.severity, 'Severity')
+local message = l.Cg(l.P(1 - l.P"\n")^0, "Message")
+
+local ovs_grammar = l.Ct(timestamp * pipe * pid * pipe * function_name * pipe * severity * pipe * message)
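+-- A typical ovs-vswitchd.log line matched by ovs_grammar (assumed format):
+-- 2015-11-30T08:38:59.306Z|00004|connmgr|INFO|br-int: added service controller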
+
+function process_message ()
+ local log = read_message("Payload")
+ local logger = read_message("Logger")
+ local m
+
+ msg.Fields = {}
+ m = ovs_grammar:match(log)
+ if not m then
+ return -1
+ end
+ msg.Timestamp = m.Timestamp
+ msg.Payload = m.function_name .. ': ' .. m.Message
+ msg.Pid = m.Pid
+ msg.Severity = m.Severity or 5
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = logger
+
+ utils.inject_tags(msg)
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/pacemaker_log.lua b/heka/files/lua/decoders/pacemaker_log.lua
new file mode 100644
index 0000000..1286e89
--- /dev/null
+++ b/heka/files/lua/decoders/pacemaker_log.lua
@@ -0,0 +1,75 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l = require 'lpeg'
+l.locale(l)
+
+local dt = require "date_time"
+local patt = require "patterns"
+local syslog = require "syslog"
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+-- This grammar is intended for debug and info messages which aren't emitted
+-- through Syslog. For example:
+-- Apr 29 13:23:46 [13545] node-32.domain.tld pacemakerd: INFO: get_cluster_type: Detected an active 'corosync' cluster
+local sp = l.space
+local colon = l.P":"
+
+local timestamp = l.Cg(dt.rfc3164_timestamp / dt.time_to_ns, "Timestamp")
+local pid = l.Cg(patt.Pid, "Pid")
+local severity = l.Cg((l.R"AZ" + l.R"az")^1 / string.upper, "SeverityLabel")
+local programname = l.Cg(patt.programname, "programname")
+local message = l.Cg(patt.Message, "Message")
+
+local fallback_grammar = l.Ct(timestamp * sp^1 * l.P'[' * pid * l.P']' * sp^1 *
+ (l.P(1) - sp)^0 * sp^1 * programname * colon * sp^1 * severity * colon *
+ sp^1 * message)
+
+function process_message ()
+ local log = read_message("Payload")
+
+ if utils.parse_syslog_message(syslog_grammar, log, msg) then
+ return utils.safe_inject_message(msg)
+ else
+ local m = fallback_grammar:match(log)
+ if m then
+ msg.Timestamp = m.Timestamp
+ msg.Payload = m.Message
+ msg.Pid = m.Pid
+ msg.Severity = utils.label_to_severity_map[m.SeverityLabel] or 7
+
+ msg.Fields = {}
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = m.programname
+ utils.inject_tags(msg)
+
+ return utils.safe_inject_message(msg)
+ end
+ end
+
+ return -1
+end
diff --git a/heka/files/lua/decoders/pacemaker_resources.lua b/heka/files/lua/decoders/pacemaker_resources.lua
new file mode 100644
index 0000000..ae56965
--- /dev/null
+++ b/heka/files/lua/decoders/pacemaker_resources.lua
@@ -0,0 +1,47 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'string'
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+l.locale(l)
+
+local msg = {
+ Timestamp = nil,
+ Type = "metric",
+ Payload = nil,
+ Severity = 6, -- INFO
+ Fields = nil
+}
+
+local word = (l.R("az", "AZ", "09") + l.P"." + l.P"_" + l.P"-")^1
+local grammar = l.Ct(l.Cg(word, 'resource') * " " * l.Cg(l.xdigit, 'active'))
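+-- The expected payload is '<resource_name> <0|1>', e.g. 'vip__management 1'
+-- meaning that the vip__management resource is active on this node.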
+
+function process_message ()
+ local data = read_message("Payload")
+ local m = grammar:match(data)
+ if not m then
+ return -1
+ end
+ msg.Timestamp = read_message("Timestamp")
+ msg.Payload = data
+ msg.Fields = {}
+ msg.Fields.source = 'pacemaker'
+ msg.Fields.type = utils.metric_type['GAUGE']
+ msg.Fields.hostname = read_message('Hostname')
+ utils.inject_tags(msg)
+
+ msg.Fields.name = string.format('pacemaker.resource.%s.active', m.resource)
+ msg.Fields.value = tonumber(m.active)
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/rabbitmq.lua b/heka/files/lua/decoders/rabbitmq.lua
new file mode 100644
index 0000000..846a5f6
--- /dev/null
+++ b/heka/files/lua/decoders/rabbitmq.lua
@@ -0,0 +1,71 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local dt = require "date_time"
+local l = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Timestamp = nil,
+ Type = 'log',
+ Hostname = nil,
+ Payload = nil,
+ Pid = nil,
+ Fields = nil,
+ Severity = nil,
+}
+
+-- RabbitMQ message logs are formatted like this:
+-- =ERROR REPORT==== 2-Jan-2015::09:17:22 ===
+-- Blabla
+-- Blabla
+--
+local message = l.Cg(patt.Message / utils.chomp, "Message")
+-- The token before 'REPORT' isn't standardized: it can be a valid severity
+-- level such as 'INFO' or 'ERROR', but also 'CRASH' or 'SUPERVISOR'.
+local severity = l.Cg(l.R"AZ"^1, "SeverityLabel")
+local day = l.R"13" * l.R"09" + l.R"19"
+local datetime = l.Cg(day, "day") * patt.dash * dt.date_mabbr * patt.dash * dt.date_fullyear *
+ "::" * dt.rfc3339_partial_time
+local timestamp = l.Cg(l.Ct(datetime)/ dt.time_to_ns, "Timestamp")
+
+local grammar = l.Ct("=" * severity * " REPORT==== " * timestamp * " ===" * l.P'\n' * message)
+
+function process_message ()
+ local log = read_message("Payload")
+
+ local m = grammar:match(log)
+ if not m then
+ return -1
+ end
+
+ msg.Timestamp = m.Timestamp
+ msg.Payload = m.Message
+ if utils.label_to_severity_map[m.SeverityLabel] then
+ msg.Severity = utils.label_to_severity_map[m.SeverityLabel]
+ elseif m.SeverityLabel == 'CRASH' then
+ msg.Severity = 2 -- CRITICAL
+ else
+ msg.Severity = 5 -- NOTICE
+ end
+
+ msg.Fields = {}
+ msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+ msg.Fields.programname = 'rabbitmq'
+ utils.inject_tags(msg)
+
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/encoders/es_ceilometer_resources.lua b/heka/files/lua/encoders/es_ceilometer_resources.lua
new file mode 100644
index 0000000..860ba03
--- /dev/null
+++ b/heka/files/lua/encoders/es_ceilometer_resources.lua
@@ -0,0 +1,66 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+local elasticsearch = require "elasticsearch"
+
+local index = read_config("index") or "index"
+local type_name = read_config("type_name") or "message"
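+-- For every resource, an Elasticsearch Bulk API pair is appended to the
+-- payload: an 'update' action line followed by a scripted upsert body.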
+
+function process_message()
+ local resources = cjson.decode(read_message("Payload"))
+ for resource_id, resource in pairs(resources) do
+ local update = cjson.encode({update = {_index = index, _type = type_name,
+ _id = resource_id}})
+ local body = {
+ script = 'ctx._source.meters += meter;' ..
+ 'ctx._source.user_id = user_id;' ..
+ 'ctx._source.project_id = project_id;' ..
+ 'ctx._source.source = source; ' ..
+ 'ctx._source.metadata = ' ..
+ 'ctx._source.last_sample_timestamp <= timestamp ? ' ..
+ 'metadata : ctx._source.metadata;' ..
+ 'ctx._source.last_sample_timestamp = ' ..
+ 'ctx._source.last_sample_timestamp < timestamp ?' ..
+ 'timestamp : ctx._source.last_sample_timestamp;' ..
+ 'ctx._source.first_sample_timestamp = ' ..
+ 'ctx._source.first_sample_timestamp > timestamp ?' ..
+ 'timestamp : ctx._source.first_sample_timestamp;',
+ params = {
+ meter = resource.meter,
+ metadata = resource.metadata,
+ timestamp = resource.timestamp,
+ user_id = resource.user_id or '',
+ project_id = resource.project_id or '',
+ source = resource.source or '',
+ },
+ upsert = {
+ first_sample_timestamp = resource.timestamp,
+ last_sample_timestamp = resource.timestamp,
+ project_id = resource.project_id or '',
+ user_id = resource.user_id or '',
+ source = resource.source or '',
+ metadata = resource.metadata,
+ meters = resource.meter
+ }
+ }
+ body = cjson.encode(body)
+
+ add_to_payload(update, "\n", body, "\n")
+ end
+
+ inject_payload()
+ return 0
+end
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
new file mode 100644
index 0000000..ad4c540
--- /dev/null
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -0,0 +1,87 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'table'
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+local lma = require 'lma_utils'
+local interp = require "msg_interpolate"
+
+local host = read_config('nagios_host')
+local service_template = read_config('service_template') or error('service_template is required!')
+-- The Nagios CGI cannot accept a 'plugin_output' parameter larger than 1024 bytes.
+-- See bug #1517917 for details.
+-- With the 'cmd.cgi' re-implementation for the command PROCESS_SERVICE_CHECK_RESULT,
+-- this limit can be increased to 3KB. See blueprint scalable-nagios-api.
+local truncate_size = (read_config('truncate_size') or 3072) + 0
+local data = {
+ cmd_typ = '30',
+ cmd_mod = '2',
+ host = host,
+ service = nil,
+ plugin_state = nil,
+ plugin_output = nil,
+ performance_data = '',
+}
+local nagios_break_line = '\\n'
+-- mapping GSE statuses to Nagios states
+local nagios_state_map = {
+ [consts.OKAY]=0,
+ [consts.WARN]=1,
+ [consts.UNKW]=3,
+ [consts.CRIT]=2,
+ [consts.DOWN]=2
+}
+
+function url_encode(str)
+ if (str) then
+ str = string.gsub (str, "([^%w %-%_%.%~])",
+ function (c) return string.format ("%%%02X", string.byte(c)) end)
+ str = string.gsub (str, " ", "+")
+ end
+ return str
+end
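+-- For example, url_encode('HAProxy backends are down / degraded') returns
+-- 'HAProxy+backends+are+down+%2F+degraded'.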
+
+function process_message()
+ local service_name = interp.interpolate_from_msg(service_template)
+ local status = afd.get_status()
+ local alarms = afd.alarms_for_human(afd.extract_alarms())
+
+ if not service_name or not nagios_state_map[status] or not alarms then
+ return -1
+ end
+
+ data['service'] = service_name
+ data['plugin_state'] = nagios_state_map[status]
+
+ local details = {
+ string.format('%s %s', service_name, consts.status_label(status))
+ }
+ if #alarms == 0 then
+ details[#details+1] = 'no details'
+ else
+ for _, alarm in ipairs(alarms) do
+ details[#details+1] = alarm
+ end
+ end
+ data['plugin_output'] = lma.truncate(table.concat(details, nagios_break_line), truncate_size, nagios_break_line)
+
+ local params = {}
+ for k, v in pairs(data) do
+ params[#params+1] = string.format("%s=%s", k, url_encode(v))
+ end
+
+ return lma.safe_inject_payload('txt', 'nagios', table.concat(params, '&'))
+end
diff --git a/heka/files/lua/encoders/status_smtp.lua b/heka/files/lua/encoders/status_smtp.lua
new file mode 100644
index 0000000..3bfad7f
--- /dev/null
+++ b/heka/files/lua/encoders/status_smtp.lua
@@ -0,0 +1,79 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'table'
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+local lma = require 'lma_utils'
+
+local statuses = {}
+
+local concat_string = '\n'
+
+function process_message()
+ local previous
+ local cluster = afd.get_entity_name('cluster_name')
+ local msg_type = read_message("Type")
+ local status = afd.get_status()
+ local alarms = afd.extract_alarms()
+
+ if not cluster or not status or not alarms then
+ return -1
+ end
+ local key = string.format('%s.%s', msg_type, cluster)
+ if not statuses[key] then
+ statuses[key] = {}
+ end
+ previous = statuses[key]
+
+ local text
+ if #alarms == 0 then
+ text = 'no detail'
+ else
+ text = table.concat(afd.alarms_for_human(alarms), concat_string)
+ end
+
+ local title
+ if not previous.status and status == consts.OKAY then
+ -- don't send an email when we detect a new cluster which is OKAY
+ return 0
+ elseif not previous.status then
+ title = string.format('%s status is %s',
+ cluster,
+ consts.status_label(status))
+ elseif status ~= previous.status then
+ title = string.format('%s status %s -> %s',
+ cluster,
+ consts.status_label(previous.status),
+ consts.status_label(status))
+-- TODO(scroiset): avoid spam
+-- This code has the same issue as annotations, see filters/influxdb_annotation.lua
+-- elseif previous.text ~= text then
+-- title = string.format('%s status remains %s',
+-- cluster,
+-- consts.status_label(status))
+ else
+ -- nothing has changed since the last message
+ return 0
+ end
+
+ -- store the last status and text for future messages
+ previous.status = status
+ previous.text = text
+
+ return lma.safe_inject_payload('txt', '', table.concat({title, text}, concat_string))
+end
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
new file mode 100644
index 0000000..bf10322
--- /dev/null
+++ b/heka/files/lua/filters/afd.lua
@@ -0,0 +1,99 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local string = require 'string'
+
+local utils = require 'lma_utils'
+local afd = require 'afd'
+
+-- node or service
+local afd_type = read_config('afd_type') or error('afd_type must be specified!')
+local activate_alerting = read_config('activate_alerting')
+if activate_alerting == nil then
+ activate_alerting = true
+end
+local msg_type
+local msg_field_name
+local afd_entity
+
+if afd_type == 'node' then
+ msg_type = 'afd_node_metric'
+ msg_field_name = 'node_status'
+ afd_entity = 'node_role'
+elseif afd_type == 'service' then
+ msg_type = 'afd_service_metric'
+ msg_field_name = 'service_status'
+ afd_entity = 'service'
+else
+ error('invalid afd_type value')
+end
+
+-- e.g. controller for a node AFD / rabbitmq for a service AFD
+local afd_entity_value = read_config('afd_cluster_name') or error('afd_cluster_name must be specified!')
+
+-- e.g. cpu for a node AFD / queue for a service AFD
+local msg_field_source = read_config('afd_logical_name') or error('afd_logical_name must be specified!')
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+
+-- interval (in seconds) reported in the injected AFD metric; assumed to be
+-- passed in the sandbox configuration like for the GSE filters
+local interval = (read_config('interval') or 0) + 0
+
+local afd_file = read_config('afd_file') or error('afd_file must be specified')
+local all_alarms = require(afd_file)
+local A = require 'afd_alarms'
+A.load_alarms(all_alarms)
+
+function process_message()
+
+ local metric_name = read_message('Fields[name]')
+ local ts = read_message('Timestamp')
+
+ local ok, value = utils.get_values_from_metric()
+ if not ok then
+ return -1, value
+ end
+ -- retrieve field values
+ local fields = {}
+ for _, field in ipairs (A.get_metric_fields(metric_name)) do
+ local field_value = afd.get_entity_name(field)
+ if not field_value then
+ return -1, "Cannot find Fields[" .. field .. "] for the metric " .. metric_name
+ end
+ fields[field] = field_value
+ end
+ A.add_value(ts, metric_name, value, fields)
+ return 0
+end
+
+function timer_event(ns)
+ if A.is_started() then
+ local state, alarms = A.evaluate(ns)
+ if state then -- it was time to evaluate at least one alarm
+ for _, alarm in ipairs(alarms) do
+ afd.add_to_alarms(
+ alarm.state,
+ alarm.alert['function'],
+ alarm.alert.metric,
+ alarm.alert.fields,
+ {}, -- tags
+ alarm.alert.operator,
+ alarm.alert.value,
+ alarm.alert.threshold,
+ alarm.alert.window,
+ alarm.alert.periods,
+ alarm.alert.message)
+ end
+
+ afd.inject_afd_metric(msg_type, afd_entity, afd_entity_value, msg_field_name,
+ state, hostname, interval, msg_field_source, activate_alerting)
+ end
+ else
+ A.set_start_time(ns)
+ end
+end
diff --git a/heka/files/lua/filters/afd_api_backends.lua b/heka/files/lua/filters/afd_api_backends.lua
new file mode 100644
index 0000000..37cccd5
--- /dev/null
+++ b/heka/files/lua/filters/afd_api_backends.lua
@@ -0,0 +1,90 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+
+local haproxy_backend_states = {}
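+-- per-backend cache of the last 'up' and 'down' counts, e.g.
+-- haproxy_backend_states['glance-api'] = {up = 3, down = 0}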
+
+-- emit AFD event metrics based on HAProxy backends
+function process_message()
+ local metric_name = read_message('Fields[name]')
+ local value = read_message('Fields[value]')
+ local service = read_message('Fields[backend]')
+ local state = consts.OKAY
+
+ if not haproxy_backend_states[service] then
+ haproxy_backend_states[service] = {}
+ end
+ haproxy_backend_states[service][read_message('Fields[state]')] = value
+
+ if not (haproxy_backend_states[service].up and haproxy_backend_states[service].down) then
+ -- not enough data for now
+ return 0
+ end
+
+ if haproxy_backend_states[service].up == 0 then
+ state = consts.DOWN
+ afd.add_to_alarms(consts.DOWN,
+ 'last',
+ metric_name,
+ {service=service,state='up'},
+ {},
+ '==',
+ haproxy_backend_states[service].up,
+ 0,
+ nil,
+ nil,
+ string.format("All %s backends are down", service))
+ elseif haproxy_backend_states[service].down >= haproxy_backend_states[service].up then
+ state = consts.CRIT
+ afd.add_to_alarms(consts.CRIT,
+ 'last',
+ metric_name,
+ {service=service,state='down'},
+ {},
+ '>=',
+ haproxy_backend_states[service].down,
+ haproxy_backend_states[service].up,
+ nil,
+ nil,
+ string.format("More backends for %s are down than up", service))
+ elseif haproxy_backend_states[service].down > 0 then
+ state = consts.WARN
+ afd.add_to_alarms(consts.WARN,
+ 'last',
+ metric_name,
+ {service=service,state='down'},
+ {},
+ '>',
+ haproxy_backend_states[service].down,
+ 0,
+ nil,
+ nil,
+ string.format("At least one %s backend is down", service))
+ end
+
+ afd.inject_afd_service_metric(service,
+ state,
+ read_message('Fields[hostname]'),
+ 0,
+ 'backends')
+
+ -- reset the cache for this service
+ haproxy_backend_states[service] = {}
+
+ return 0
+end
diff --git a/heka/files/lua/filters/afd_workers.lua b/heka/files/lua/filters/afd_workers.lua
new file mode 100644
index 0000000..8eeb222
--- /dev/null
+++ b/heka/files/lua/filters/afd_workers.lua
@@ -0,0 +1,94 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+
+local worker_states = {}
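+-- per-service cache of the last 'up' and 'down' counts, e.g.
+-- worker_states['openstack_nova_services.nova-scheduler'] = {up = 1, down = 0}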
+
+-- emit AFD event metrics based on openstack_nova_services, openstack_cinder_services and openstack_neutron_agents metrics
+function process_message()
+ local metric_name = read_message('Fields[name]')
+ local service = string.format('%s-%s',
+ string.match(metric_name, 'openstack_([^_]+)'),
+ read_message('Fields[service]'))
+ local worker_key = string.format('%s.%s', metric_name, service)
+
+ if not worker_states[worker_key] then
+ worker_states[worker_key] = {}
+ end
+
+ local worker = worker_states[worker_key]
+ worker[read_message('Fields[state]')] = read_message('Fields[value]')
+
+ local state = consts.OKAY
+ if not(worker.up and worker.down) then
+ -- not enough data for now
+ return 0
+ end
+
+ if worker.up == 0 then
+ state = consts.DOWN
+ afd.add_to_alarms(consts.DOWN,
+ 'last',
+ metric_name,
+ {service=service,state='up'},
+ {},
+ '==',
+ worker.up,
+ 0,
+ nil,
+ nil,
+ string.format("All instances for the service %s are down or disabled", service))
+ elseif worker.down >= worker.up then
+ state = consts.CRIT
+ afd.add_to_alarms(consts.CRIT,
+ 'last',
+ metric_name,
+ {service=service,state='down'},
+ {},
+ '>=',
+ worker.down,
+ worker.up,
+ nil,
+ nil,
+ string.format("More instances of %s are down than up", service))
+ elseif worker.down > 0 then
+ state = consts.WARN
+ afd.add_to_alarms(consts.WARN,
+ 'last',
+ metric_name,
+ {service=service,state='down'},
+ {},
+ '>',
+ worker.down,
+ 0,
+ nil,
+ nil,
+ string.format("At least one %s instance is down", service))
+ end
+
+ afd.inject_afd_service_metric(service,
+ state,
+ read_message('Fields[hostname]'),
+ 0,
+ 'workers')
+
+ -- reset the cache for this worker
+ worker_states[worker_key] = {}
+
+ return 0
+end
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
new file mode 100644
index 0000000..6c8415f
--- /dev/null
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -0,0 +1,139 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = require 'cjson'
+
+local afd = require 'afd'
+local gse = require 'gse'
+local lma = require 'lma_utils'
+
+local output_message_type = read_config('output_message_type') or error('output_message_type must be specified!')
+local cluster_field = read_config('cluster_field')
+local member_field = read_config('member_field') or error('member_field must be specified!')
+local output_metric_name = read_config('output_metric_name') or error('output_metric_name must be specified!')
+local source = read_config('source') or error('source must be specified!')
+local topology_file = read_config('topology_file') or error('topology_file must be specified!')
+local policies_file = read_config('policies_file') or error('policies_file must be specified!')
+local interval = (read_config('interval') or error('interval must be specified!')) + 0
+local interval_in_ns = interval * 1e9
+local max_inject = (read_config('max_inject') or 10) + 0
+local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
+local activate_alerting = read_config('activate_alerting')
+if activate_alerting == nil then
+ activate_alerting = true
+end
+
+local is_active = false
+local first_tick
+local last_tick = 0
+local last_index = nil
+
+local topology = require(topology_file)
+local policies = require(policies_file)
+
+for cluster_name, attributes in pairs(topology.clusters) do
+ local policy = policies.find(attributes.policy)
+ if not policy then
+ error('Cannot find ' .. attributes.policy .. ' policy!')
+ end
+ gse.add_cluster(cluster_name, attributes.members, attributes.hints, attributes.group_by, policy)
+end
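+-- The topology file is expected to return a table of this shape (illustrative
+-- member and policy names):
+-- {clusters = {nova = {policy = 'highest_severity', group_by = 'member',
+-- members = {'nova-api', 'nova-scheduler'}, hints = {}}}}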
+
+function process_message()
+ local name = read_message('Fields[name]')
+ local hostname = read_message('Fields[hostname]')
+ if name and name == 'pacemaker_local_resource_active' and read_message("Fields[resource]") == 'vip__management' then
+ -- Skip pacemaker_local_resource_active metrics that don't
+ -- concern the local node
+ if read_message('Hostname') == hostname then
+ if read_message('Fields[value]') == 1 then
+ is_active = true
+ else
+ is_active = false
+ end
+ end
+ return 0
+ end
+
+ local member_id = afd.get_entity_name(member_field)
+ if not member_id then
+ return -1, "Cannot find entity's name in the AFD/GSE message"
+ end
+
+ local status = afd.get_status()
+ if not status then
+ return -1, "Cannot find status in the AFD/GSE message"
+ end
+
+ local alarms = afd.extract_alarms()
+ if not alarms then
+ return -1, "Cannot find alarms in the AFD/GSE message"
+ end
+
+ local cluster_ids
+ if cluster_field then
+ local cluster_id = afd.get_entity_name(cluster_field)
+ if not cluster_id then
+ return -1, "Cannot find the cluster's name in the AFD/GSE message"
+ elseif not gse.cluster_exists(cluster_id) then
+ -- Just ignore AFD/GSE messages which aren't part of a cluster's definition
+ return 0
+ end
+ cluster_ids = { cluster_id }
+ else
+ cluster_ids = gse.find_cluster_memberships(member_id)
+ end
+
+ -- update all clusters that depend on this entity
+ for _, cluster_id in ipairs(cluster_ids) do
+ gse.set_member_status(cluster_id, member_id, status, alarms, hostname)
+ end
+ return 0
+end
+
+function timer_event(ns)
+ if not is_active then
+ -- not running as the aggregator
+ return
+ elseif not first_tick then
+ first_tick = ns
+ return
+ elseif ns - first_tick <= warm_up_period then
+ -- not started for a long enough period
+ return
+ elseif last_index == nil and (ns - last_tick) < interval_in_ns then
+ -- the interval between two emissions hasn't elapsed yet, nothing to send
+ return
+ end
+ last_tick = ns
+
+ local injected = 0
+ for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
+ if last_index == nil or i > last_index then
+ gse.inject_cluster_metric(
+ output_message_type,
+ cluster_name,
+ output_metric_name,
+ interval,
+ source,
+ activate_alerting
+ )
+ last_index = i
+ injected = injected + 1
+
+ if injected >= max_inject then
+ return
+ end
+ end
+ end
+
+ last_index = nil
+end
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
new file mode 100644
index 0000000..66980bc
--- /dev/null
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -0,0 +1,92 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'math'
+require 'os'
+require 'string'
+local utils = require 'lma_utils'
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+local patterns_config = read_config('patterns') or error('patterns must be specified')
+local patterns = {}
+for pattern in string.gmatch(patterns_config, "/(%S+)/") do
+ local ok, msg = pcall(string.match, "", pattern)
+ if not ok then
+ error("Invalid pattern detected: '" .. pattern "' (" .. msg .. ")")
+ end
+ patterns[#patterns + 1] = pattern
+end
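+-- For example (illustrative value), patterns could be set to
+-- "/device%s(%S+),/ /XFS%s%((%S+)%):/" to capture the device name from the
+-- kern.log lines shown in process_message(); note that the gmatch capture
+-- above only accepts patterns without literal spaces (use %s instead).
+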
+-- Heka cannot guarantee that logs are processed in real-time, so the
+-- grace_interval parameter makes it possible to take into account log
+-- messages that are received in the current interval but emitted before it.
+local grace_interval = (read_config('grace_interval') or 0) + 0
+
+local error_counters = {}
+local enter_at
+local start_time = os.time()
+
+local function convert_to_sec(ns)
+ return math.floor(ns/1e9)
+end
+
+function process_message ()
+ -- timestamp values should be converted to seconds because log timestamps
+ -- have a precision of one second (or millisecond sometimes)
+ if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
+ -- skip the log message if it doesn't fall into the current interval
+ return 0
+ end
+
+ local payload = read_message('Payload')
+ if not payload then
+ return 0
+ end
+
+ -- example of kern.log lines:
+ -- <3>Jul 24 14:42:21 node-164 kernel: [505801.068621] Buffer I/O error on device sdd2, logical block 51184
+ -- <4>Aug 22 09:37:09 node-164 kernel: [767975.369264] XFS (sda): xfs_log_force: error 5 returned.
+ -- <1>May 17 23:07:11 sd-os1-stor05 kernel: [ 2119.105909] XFS (sdf3): metadata I/O error: block 0x68c2b7d8 ("xfs_trans_read_buf_map") error 121 numblks 8
+ for _, pattern in ipairs(patterns) do
+ local ok, device = pcall(string.match, payload, pattern)
+ if not ok then
+ return -1, device
+ end
+ if device then
+ error_counters[device] = (error_counters[device] or 0) + 1
+ break
+ end
+ end
+ return 0
+end
+
+function timer_event(ns)
+ -- Is error_counters empty?
+ if next(error_counters) == nil then
+ return 0
+ end
+
+ local delta_sec = (ns - (enter_at or 0)) / 1e9
+ for dev, value in pairs(error_counters) do
+ -- Don't send values from the first ticker interval
+ if enter_at ~= nil then
+ utils.add_to_bulk_metric("hdd_errors_rate", value / delta_sec, {device=dev})
+ end
+ error_counters[dev] = 0
+ end
+
+ enter_at = ns
+ utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
+
+ return 0
+end
\ No newline at end of file
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
new file mode 100644
index 0000000..35efcad
--- /dev/null
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -0,0 +1,85 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'string'
+require 'math'
+local utils = require 'lma_utils'
+
+function process_table(typ, array)
+ -- NOTE: It has been written for "filters" and "decoders". If we need to
+ -- use it to collect metrics from other components of the Heka pipeline,
+ -- we need to ensure that JSON provides names and table with
+ -- ProcessMessageCount and ProcessMessageAvgDuration:
+ --
+ -- "decoder": {
+ -- ...
+ -- },
+ -- "Name": "a name",
+ -- "ProcessMessageCount" : {
+ -- "representation": "count",
+ -- "value": 12
+ -- },
+ -- "ProcessMessageAvgDuration" : {
+ -- "representation": "ns",
+ -- "value": 192913
+ -- },
+ -- { ... }}
+ --
+ for _, v in pairs(array) do
+ if type(v) == "table" then
+ -- strip off the '_decoder'/'_filter' suffix
+ local name = v['Name']:gsub("_" .. typ, "")
+
+ local tags = {
+ ['type'] = typ,
+ ['name'] = name,
+ }
+
+ utils.add_to_bulk_metric('hekad_msg_count', v.ProcessMessageCount.value, tags)
+ utils.add_to_bulk_metric('hekad_msg_avg_duration', v.ProcessMessageAvgDuration.value, tags)
+ if v.Memory then
+ utils.add_to_bulk_metric('hekad_memory', v.Memory.value, tags)
+ end
+ if v.TimerEventAvgDuration then
+ utils.add_to_bulk_metric('hekad_timer_event_avg_duration', v.TimerEventAvgDuration.value, tags)
+ end
+ if v.TimerEventSamples then
+ utils.add_to_bulk_metric('hekad_timer_event_count', v.TimerEventSamples.value, tags)
+ end
+ end
+ end
+end
+
+function singularize(str)
+ return str:gsub('s$', '')
+end
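+-- e.g. singularize('decoders') returns 'decoder', which becomes the 'type'
+-- tag of the emitted metrics.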
+
+function process_message ()
+ local ok, data = pcall(cjson.decode, read_message("Payload"))
+ if not ok then
+ return -1
+ end
+
+ local hostname = read_message("Hostname")
+ local ts = read_message("Timestamp")
+
+ for k, v in pairs(data) do
+ if k == "filters" or k == "decoders" then
+ process_table(singularize(k), v)
+ end
+ end
+
+ utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
+ return 0
+end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
new file mode 100644
index 0000000..1756a59
--- /dev/null
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -0,0 +1,185 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'string'
+require 'math'
+require 'os'
+local utils = require 'lma_utils'
+local tab = require 'table_utils'
+local table = require 'table'
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+local interval = (read_config('interval') or error('interval must be specified')) + 0
+-- max_timer_inject is the maximum number of messages injected by timer_event()
+local max_timer_inject = (read_config('max_timer_inject') or 10) + 0
+-- bulk_size is the maximum number of metrics embedded in a bulk_metric Payload.
+-- bulk_size depends on the hekad max_message_size (64 KB by default).
+-- At most, there are 45 metrics/service * 300 B per bucket =~ 13 KB * 4 services
+-- = 52 KB for 225 metrics. With max_message_size set to 256 KB, it's possible
+-- to embed more than 800 metrics.
+local bulk_size = (read_config('bulk_size') or 225) + 0
+local percentile_thresh = (read_config('percentile') or 90) + 0
+-- grace_time is used to compensate for the time precision difference
+-- (seconds or milliseconds for logs versus nanoseconds for the ticker)
+-- and also for the delay introduced by log parsing/decoding, which can make
+-- a message arrive too late for its interval.
+local grace_time = (read_config('grace_time') or 0) + 0
+
+local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
+
+local percentile_field_name = string.format('upper_%s', percentile_thresh)
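+-- with the default percentile of 90, the computed field is named 'upper_90'
+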
+local msg_source = 'http_metric_filter'
+local last_tick = os.time() * 1e9
+local interval_in_ns = interval * 1e9
+
+local http_verbs = {
+ GET = true,
+ POST = true,
+ OPTIONS = true,
+ DELETE = true,
+ PUT = true,
+ HEAD = true,
+ TRACE = true,
+ CONNECT = true,
+ PATCH = true,
+}
+
+local metric_bucket = {
+ min = 0,
+ max = 0,
+ sum = 0,
+ count = 0,
+ times = {},
+ [percentile_field_name] = 0,
+ rate = 0,
+}
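+-- Each bucket accumulates the response times of one (service, http_method,
+-- http_status family) triple; timer_event() derives the rate and the upper
+-- percentile from it before injecting the bulk metric.
+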
+local all_times = {}
+local num_metrics = 0
+
+function process_message ()
+ local severity = read_message("Fields[severity_label]")
+ local logger = read_message("Logger")
+ local timestamp = read_message("Timestamp")
+ local http_method = read_message("Fields[http_method]")
+ local http_status = read_message("Fields[http_status]")
+ local response_time = read_message("Fields[http_response_time]")
+
+ if timestamp < last_tick - grace_time then
+ -- drop silently old logs
+ return 0
+ end
+ if http_method == nil or http_status == nil or response_time == nil then
+ return -1
+ end
+
+ -- keep only the first 2 tokens because some services like Neutron report
+ -- themselves as 'openstack.<service>.server'
+ local service = string.gsub(logger, '(%w+)%.(%w+).*', '%1_%2')
+ if service == nil then
+ return -1, "Cannot match any service from " .. logger
+ end
+
+ -- coerce http_status to integer
+ http_status = http_status + 0
+ local http_status_family
+ if http_status >= 100 and http_status < 200 then
+ http_status_family = '1xx'
+ elseif http_status >= 200 and http_status < 300 then
+ http_status_family = '2xx'
+ elseif http_status >= 300 and http_status < 400 then
+ http_status_family = '3xx'
+ elseif http_status >= 400 and http_status < 500 then
+ http_status_family = '4xx'
+ elseif http_status >= 500 and http_status < 600 then
+ http_status_family = '5xx'
+ else
+ return -1, "Unsupported http_status " .. http_status
+ end
+
+ if not http_verbs[http_method] then
+ return -1, "Unsupported http_method " .. http_method
+ end
+
+ if not all_times[service] then
+ all_times[service] = {}
+ end
+ if not all_times[service][http_method] then
+ all_times[service][http_method] = {}
+ end
+ if not all_times[service][http_method][http_status_family] then
+ -- verify that the sandbox has enough capacity to emit all metrics
+ if num_metrics > (bulk_size * max_timer_inject) then
+ return -1, inject_reached_error
+ end
+ all_times[service][http_method][http_status_family] = tab.deepcopy(metric_bucket)
+ num_metrics = num_metrics + 1
+ end
+
+ local bucket = all_times[service][http_method][http_status_family]
+ bucket.times[#bucket.times + 1] = response_time
+ bucket.count = bucket.count + 1
+ bucket.sum = bucket.sum + response_time
+ if bucket.max < response_time then
+ bucket.max = response_time
+ end
+ if bucket.min == 0 or bucket.min > response_time then
+ bucket.min = response_time
+ end
+
+ return 0
+end
+
+function timer_event(ns)
+
+ last_tick = ns
+
+ local num = 0
+ local msg_injected = 0
+ for service, methods in pairs(all_times) do
+ for method, statuses in pairs(methods) do
+ for status, bucket in pairs(statuses) do
+ local metric_name = service .. '_http_response_times'
+ bucket.rate = bucket.count / interval
+ bucket[percentile_field_name] = bucket.max
+ if bucket.count > 1 then
+ table.sort(bucket.times)
+ local tmp = ((100 - percentile_thresh) / 100) * bucket.count
+ local idx = bucket.count - math.floor(tmp + .5)
+ if idx > 0 and bucket.times[idx] then
+ bucket[percentile_field_name] = bucket.times[idx]
+ end
+ end
+ bucket.times = nil
+ utils.add_to_bulk_metric(metric_name, bucket, {hostname=hostname, http_method=method, http_status=status})
+ all_times[service][method][status] = nil
+ num = num + 1
+ num_metrics = num_metrics - 1
+ if num >= bulk_size then
+ if msg_injected < max_timer_inject then
+ utils.inject_bulk_metric(ns, hostname, msg_source)
+ msg_injected = msg_injected + 1
+ num = 0
+ num_metrics = 0
+ end
+ end
+ end
+ all_times[service][method] = nil
+ end
+ all_times[service] = nil
+ end
+ if num > 0 then
+ utils.inject_bulk_metric(ns, hostname, msg_source)
+ num = 0
+ num_metrics = 0
+ end
+end
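
The percentile computation in ``timer_event`` is compact enough to miss: the filter sorts the collected response times and picks the value below which ``percentile_thresh`` percent of the samples fall, falling back to the maximum. A standalone sketch of that logic, with invented sample values (plain Lua, outside the sandbox):

.. code-block:: lua

    -- Standalone sketch of the upper-percentile computation used in
    -- timer_event above; the sample values are invented for illustration.
    local times = {0.1, 0.2, 0.3, 0.4, 1.5}
    local percentile_thresh = 90

    table.sort(times)
    local count = #times
    -- number of samples that lie above the percentile, rounded half up
    local tmp = ((100 - percentile_thresh) / 100) * count
    local idx = count - math.floor(tmp + .5)
    local percentile = times[count]  -- fall back to the maximum
    if idx > 0 and times[idx] then
        percentile = times[idx]
    end
    print(string.format("%dth percentile: %.2f", percentile_thresh, percentile))
    -- prints "90th percentile: 0.40"
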
diff --git a/heka/files/lua/filters/influxdb_accumulator.lua b/heka/files/lua/filters/influxdb_accumulator.lua
new file mode 100644
index 0000000..470bdef
--- /dev/null
+++ b/heka/files/lua/filters/influxdb_accumulator.lua
@@ -0,0 +1,143 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'os'
+require 'string'
+require 'table'
+local utils = require 'lma_utils'
+local Accumulator = require 'accumulator'
+local Influxdb = require 'influxdb'
+local l = require 'lpeg'
+l.locale(l)
+
+local flush_count = (read_config('flush_count') or 100) + 0
+local flush_interval = (read_config('flush_interval') or 5) + 0
+local time_precision = read_config("time_precision")
+local payload_name = read_config("payload_name") or "influxdb"
+local bulk_metric_type_matcher = read_config("bulk_metric_type_matcher") or "bulk_metric$"
+
+-- the tag_fields parameter is a list of tags separated by spaces
+local tag_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
+local tag_fields = tag_grammar:match(read_config("tag_fields") or "")
+
+function flush_cb(datapoints)
+ if #datapoints > 0 then
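+ -- add a trailing empty element so that concat() terminates the payload with "\n"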
+ datapoints[#datapoints+1] = ''
+ utils.safe_inject_payload("txt", payload_name, table.concat(datapoints, "\n"))
+ end
+end
+local accumulator = Accumulator.new(flush_count, flush_interval, flush_cb)
+local encoder = Influxdb.new(time_precision)
+
+-- return a table containing the common tags from the message
+function get_common_tags()
+ local tags = {}
+ for _, t in ipairs(tag_fields) do
+ tags[t] = read_message(string.format('Fields[%s]', t))
+ end
+ return tags
+end
+
+-- process a single metric
+function process_single_metric()
+ local name = read_message("Fields[name]")
+
+ if not name then
+ return 'Fields[name] is missing'
+ end
+ local ok, value = utils.get_values_from_metric()
+ if not ok then
+ return value
+ end
+
+ -- collect tags from Fields[tag_fields]
+ local tags = get_common_tags()
+ local i = 0
+ while true do
+ local t = read_message("Fields[tag_fields]", 0, i)
+ if not t then
+ break
+ end
+ tags[t] = read_message(string.format('Fields[%s]', t))
+ i = i + 1
+ end
+
+ accumulator:append(
+ encoder:encode_datapoint(
+ read_message('Timestamp'),
+ name,
+ value,
+ tags))
+ return
+end
+
+function process_bulk_metric()
+ -- The payload of the message contains a list of datapoints.
+ --
+ -- Each point is formatted either like this:
+ --
+ -- {name='foo',
+ -- value=1,
+ -- tags={k1=v1,...}}
+ --
+ -- or like this for multi-value points:
+ --
+ -- {name='bar',
+ -- values={k1=v1, ..},
+ -- tags={k1=v1,...}}
+ --
+ local ok, points = pcall(cjson.decode, read_message("Payload"))
+ if not ok then
+ return 'Invalid payload value for bulk metric'
+ end
+
+ local common_tags = get_common_tags()
+ local msg_timestamp = read_message('Timestamp')
+ for _, point in ipairs(points) do
+ point.tags = point.tags or {}
+ for k,v in pairs(common_tags) do
+ if point.tags[k] == nil then
+ point.tags[k] = v
+ end
+ end
+ accumulator:append(
+ encoder:encode_datapoint(
+ msg_timestamp,
+ point.name,
+ point.value or point.values,
+ point.tags))
+ end
+ return
+end
+
+function process_message()
+ local err_msg
+ local msg_type = read_message("Type")
+ if msg_type:match(bulk_metric_type_matcher) then
+ err_msg = process_bulk_metric()
+ else
+ err_msg = process_single_metric()
+ end
+
+ if err_msg then
+ return -1, err_msg
+ else
+ return 0
+ end
+end
+
+function timer_event(ns)
+ accumulator:flush(ns)
+end
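
The ``tag_fields`` grammar above is the one non-obvious piece of configuration parsing: it splits a space-separated string into a table of tag names. A minimal demonstration of its behavior (lpeg ships with Heka's Lua sandbox; the tag names below are invented):

.. code-block:: lua

    -- Minimal demonstration of the tag_fields grammar defined above.
    local l = require 'lpeg'

    -- capture each run of non-space characters and collect them into a table
    local tag_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)

    local tags = tag_grammar:match("hostname region environment_label")
    for i, tag in ipairs(tags) do
        print(i, tag)
    end
    -- 1 hostname
    -- 2 region
    -- 3 environment_label
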
diff --git a/heka/files/lua/filters/influxdb_annotation.lua b/heka/files/lua/filters/influxdb_annotation.lua
new file mode 100644
index 0000000..a1f33ee
--- /dev/null
+++ b/heka/files/lua/filters/influxdb_annotation.lua
@@ -0,0 +1,93 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'string'
+require 'table'
+
+local utils = require 'lma_utils'
+local consts = require 'gse_constants'
+local afd = require 'afd'
+
+local measurement_name = read_config('measurement_name') or 'annotations'
+local html_break_line = '<br />'
+
+local statuses = {}
+
+-- Transform a GSE cluster metric into an annotation stored into InfluxDB
+function process_message ()
+ local previous
+ local text
+ local title
+ local cluster = afd.get_entity_name('cluster_name')
+ local status = afd.get_status()
+ local alarms = afd.extract_alarms()
+
+ if not cluster or not status or not alarms then
+ return -1
+ end
+
+ if not statuses[cluster] then
+ statuses[cluster] = {}
+ end
+ previous = statuses[cluster]
+
+ text = table.concat(afd.alarms_for_human(alarms), html_break_line)
+
+ -- build the title
+ if not previous.status and status == consts.OKAY then
+ -- don't send an annotation when we detect a new cluster which is OKAY
+ return 0
+ elseif not previous.status then
+ title = string.format('General status is %s',
+ consts.status_label(status))
+ elseif previous.status ~= status then
+ title = string.format('General status %s -> %s',
+ consts.status_label(previous.status),
+ consts.status_label(status))
+-- TODO(pasquier-s): generate an annotation when the set of alarms has changed.
+-- the following code generated an annotation whenever at least one value
+-- associated to an alarm was changing. This led to way too many annotations
+-- with alarms monitoring the CPU usage for instance.
+-- elseif previous.text ~= text then
+-- title = string.format('General status remains %s',
+-- consts.status_label(status))
+ else
+ -- nothing has changed since the last message
+ return 0
+ end
+
+ local msg = {
+ Timestamp = read_message('Timestamp'),
+ Type = 'metric',
+ Severity = utils.label_to_severity_map.INFO,
+ Hostname = read_message('Hostname'),
+ Fields = {
+ name = measurement_name,
+ tag_fields = { 'cluster' },
+ value_fields = { 'title', 'tags', 'text' },
+ title = title,
+ tags = cluster,
+ text = text,
+ cluster = cluster,
+ source = 'influxdb_annotation'
+ }
+ }
+ utils.inject_tags(msg)
+
+ -- store the last status and alarm text for future messages
+ previous.status = status
+ previous.text = text
+
+ return utils.safe_inject_message(msg)
+end
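
The filter only emits an annotation when a cluster's status actually changes. The sketch below isolates that transition logic, with plain strings standing in for the ``gse_constants`` status values (names here are illustrative only):

.. code-block:: lua

    -- Sketch of the transition logic: an annotation title is produced only
    -- when a cluster's status changes, or when a new cluster first appears
    -- in a non-OK state. 'OKAY' stands in for consts.OKAY.
    local OKAY = 'OKAY'
    local statuses = {}

    local function annotation_title(cluster, status)
        local previous = statuses[cluster] or {}
        local title
        if not previous.status and status == OKAY then
            title = nil  -- new cluster in a healthy state: stay quiet
        elseif not previous.status then
            title = string.format('General status is %s', status)
        elseif previous.status ~= status then
            title = string.format('General status %s -> %s',
                                  previous.status, status)
        end
        statuses[cluster] = {status = status}
        return title
    end

    print(annotation_title('compute', 'OKAY'))  -- nil (no annotation)
    print(annotation_title('compute', 'WARN'))  -- General status OKAY -> WARN
    print(annotation_title('compute', 'WARN'))  -- nil (unchanged status)
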
diff --git a/heka/files/lua/filters/instance_state.lua b/heka/files/lua/filters/instance_state.lua
new file mode 100644
index 0000000..9354267
--- /dev/null
+++ b/heka/files/lua/filters/instance_state.lua
@@ -0,0 +1,46 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local utils = require 'lma_utils'
+
+local msg = {
+ Type = "metric", -- will be prefixed by "heka.sandbox."
+ Timestamp = nil,
+ Severity = 6,
+}
+
+function process_message ()
+ local state = read_message("Fields[state]")
+ local old_state = read_message("Fields[old_state]")
+ if old_state ~= nil and state == old_state then
+ -- nothing to do
+ return 0
+ end
+ msg.Timestamp = read_message("Timestamp")
+ msg.Fields = {
+ source = read_message('Logger'),
+ name = "openstack_nova_instance_state",
+ -- preserve the original hostname in the Fields attribute because
+ -- sandboxed filters cannot override the Hostname attribute
+ hostname = read_message("Fields[hostname]"),
+ type = utils.metric_type['COUNTER'],
+ value = 1,
+ tenant_id = read_message("Fields[tenant_id]"),
+ user_id = read_message("Fields[user_id]"),
+ state = state,
+ tag_fields = { 'state', 'hostname' },
+ }
+ utils.inject_tags(msg)
+
+ return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
new file mode 100644
index 0000000..7da74e9
--- /dev/null
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -0,0 +1,134 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'math'
+require 'os'
+require 'string'
+local utils = require 'lma_utils'
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+local interval = (read_config('interval') or error('interval must be specified')) + 0
+-- Heka cannot guarantee that logs are processed in real time, so the
+-- grace_interval parameter makes it possible to account for log messages
+-- that are received in the current interval but were emitted before it.
+local grace_interval = (read_config('grace_interval') or 0) + 0
+
+local discovered_services = {}
+local logs_counters = {}
+local last_timer_events = {}
+local current_service = 1
+local enter_at
+local interval_in_ns = interval * 1e9
+local start_time = os.time()
+local msg = {
+ Type = "metric",
+ Timestamp = nil,
+ Severity = 6,
+}
+
+function convert_to_sec(ns)
+ return math.floor(ns/1e9)
+end
+
+function process_message ()
+ local severity = read_message("Fields[severity_label]")
+ local logger = read_message("Logger")
+
+ local service = string.match(logger, "^openstack%.(%a+)$")
+ if service == nil then
+ return -1, "Cannot match any service from " .. logger
+ end
+
+ -- timestamp values must be converted to seconds because log timestamps
+ -- have a precision of one second (sometimes one millisecond)
+ if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
+ -- skip the log message if it doesn't fall into the current interval
+ return 0
+ end
+
+ if not logs_counters[service] then
+ -- a new service has been discovered
+ discovered_services[#discovered_services + 1] = service
+ logs_counters[service] = {}
+ for _, label in pairs(utils.severity_to_label_map) do
+ logs_counters[service][label] = 0
+ end
+ end
+
+ logs_counters[service][severity] = logs_counters[service][severity] + 1
+
+ return 0
+end
+
+function timer_event(ns)
+
+ -- We can only inject a maximum of ten messages per timer_event call,
+ -- so we send all the metrics for one service and proceed with the
+ -- remaining services at the next ticker event.
+
+ if #discovered_services == 0 then
+ return 0
+ end
+
+ -- Initialize enter_at during the first call to timer_event
+ if not enter_at then
+ enter_at = ns
+ end
+
+ -- To be able to send a metric we need to check if we are within the
+ -- interval specified in the configuration and if we haven't already sent
+ -- all metrics.
+ if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
+ local service_name = discovered_services[current_service]
+ local last_timer_event = last_timer_events[service_name] or 0
+ local delta_sec = (ns - last_timer_event) / 1e9
+
+ for level, val in pairs(logs_counters[service_name]) do
+
+ -- Don't send the first value since there is no previous tick to compute a rate from
+ if last_timer_event ~= 0 and delta_sec ~= 0 then
+ msg.Timestamp = ns
+ msg.Fields = {
+ name = 'log_messages',
+ type = utils.metric_type['DERIVE'],
+ value = val / delta_sec,
+ service = service_name,
+ level = string.lower(level),
+ hostname = hostname,
+ tag_fields = {'service', 'level', 'hostname'},
+ }
+
+ utils.inject_tags(msg)
+ local ok, err = utils.safe_inject_message(msg)
+ if ok ~= 0 then
+ return -1, err
+ end
+ end
+
+ -- reset the counter
+ logs_counters[service_name][level] = 0
+
+ end
+
+ last_timer_events[service_name] = ns
+ current_service = current_service + 1
+ end
+
+ if ns - enter_at >= interval_in_ns then
+ enter_at = ns
+ current_service = 1
+ end
+
+ return 0
+end
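
Each ``log_messages`` metric value is a rate rather than a raw counter: the count accumulated for a service is divided by the time elapsed since that service was last reported. A worked example of the arithmetic, with made-up numbers:

.. code-block:: lua

    -- Worked example of the rate computation in timer_event: counters are
    -- divided by the elapsed time (in seconds) since the service was last
    -- reported. All numbers below are invented for illustration.
    local ns_per_sec = 1e9
    local last_timer_event = 100 * ns_per_sec  -- previous report at t=100s
    local now = 160 * ns_per_sec               -- current tick at t=160s
    local counters = {ERROR = 3, WARNING = 12, INFO = 600}

    local delta_sec = (now - last_timer_event) / ns_per_sec  -- 60 seconds
    for level, count in pairs(counters) do
        -- same value/second semantics as the 'log_messages' DERIVE metric
        print(string.format("%s: %.2f msg/s", level, count / delta_sec))
    end
    -- ERROR: 0.05 msg/s, WARNING: 0.20 msg/s, INFO: 10.00 msg/s
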
diff --git a/heka/files/lua/filters/resource_creation_time.lua b/heka/files/lua/filters/resource_creation_time.lua
new file mode 100644
index 0000000..0047ed3
--- /dev/null
+++ b/heka/files/lua/filters/resource_creation_time.lua
@@ -0,0 +1,84 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'math'
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+ Type = "metric", -- will be prefixed by "heka.sandbox."
+ Timestamp = nil,
+ Severity = 6,
+}
+
+local event_type_to_name = {
+ ["compute.instance.create.end"] = "openstack_nova_instance_creation_time",
+ ["volume.create.end"] = "openstack_cinder_volume_creation_time",
+ ["volume.attach.end"] = "openstack_cinder_volume_attachment_time",
+}
+
+function process_message ()
+ local metric_name = event_type_to_name[read_message("Fields[event_type]")]
+ if not metric_name then
+ return -1
+ end
+
+ local started_at, completed_at
+
+ if metric_name == "openstack_cinder_volume_attachment_time" then
+ --[[ To compute the metric we need fields that are not available
+ directly in the Heka message. So we need to decode the message,
+ check if it is a full notification or not and extract the needed
+ values. ]]--
+ local data = read_message("Payload")
+ local ok, notif = pcall(cjson.decode, data)
+ if not ok then
+ return -1
+ end
+
+ notif = notif.payload or notif
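+ -- 'volume_attachment' is a list of attachment records; unpack() yields its elements and only the first is kept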
+ local t = unpack(notif['volume_attachment'])
+ started_at = t.created_at or ''
+ completed_at = t.attach_time or ''
+ else
+ started_at = read_message("Fields[created_at]") or ''
+ completed_at = read_message("Fields[launched_at]") or ''
+ end
+
+ started_at = patt.Timestamp:match(started_at)
+ completed_at = patt.Timestamp:match(completed_at)
+ if started_at == nil or completed_at == nil or started_at == 0 or completed_at == 0 or started_at > completed_at then
+ return -1
+ end
+
+ msg.Timestamp = read_message("Timestamp")
+ msg.Fields = {
+ source = read_message('Logger'),
+ name = metric_name,
+ -- preserve the original hostname in the Fields attribute because
+ -- sandboxed filters cannot override the Hostname attribute
+ hostname = read_message("Fields[hostname]"),
+ type = utils.metric_type['GAUGE'],
+ -- Having a millisecond precision for creation time is good enough given
+ -- that the started_at field has only a 1-second precision.
+ value = {value = math.floor((completed_at - started_at)/1e6 + 0.5) / 1e3, representation = 's'},
+ tenant_id = read_message("Fields[tenant_id]"),
+ user_id = read_message("Fields[user_id]"),
+ tag_fields = {'hostname'},
+ }
+ utils.inject_tags(msg)
+
+ return utils.safe_inject_message(msg)
+end
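
Both timestamps are expressed in nanoseconds, and the computed value is rounded to the nearest millisecond before being reported in seconds. A worked example with a hypothetical timestamp pair:

.. code-block:: lua

    -- Worked example of the creation-time computation: timestamps are in
    -- nanoseconds, and the difference is rounded to the nearest millisecond
    -- before being expressed in seconds. Values are hypothetical.
    require 'math'

    local started_at   = 1443712140 * 1e9          -- hypothetical, in ns
    local completed_at = started_at + 12345678901  -- ~12.35 s later

    local value = math.floor((completed_at - started_at) / 1e6 + 0.5) / 1e3
    print(value)  -- 12.346 (seconds, with millisecond precision)
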
diff --git a/heka/files/lua/filters/watchdog.lua b/heka/files/lua/filters/watchdog.lua
new file mode 100644
index 0000000..0399d5c
--- /dev/null
+++ b/heka/files/lua/filters/watchdog.lua
@@ -0,0 +1,24 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'math'
+
+local payload_name = read_config('payload_name') or error('payload_name is required')
+local payload = read_config('payload')
+
+-- Very simple filter that emits a fixed message or the current timestamp (in
+-- second) every ticker interval. It can be used to check the liveness of the
+-- Heka service.
+function timer_event(ns)
+ inject_payload("txt", payload_name, payload or math.floor(ns / 1e9))
+end
diff --git a/heka/files/lua/outputs/lastfile.lua b/heka/files/lua/outputs/lastfile.lua
new file mode 100644
index 0000000..28335a5
--- /dev/null
+++ b/heka/files/lua/outputs/lastfile.lua
@@ -0,0 +1,27 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "io"
+
+local path = read_config('path') or error('path required')
+local field = read_config('field') or 'Payload'
+
+-- Very simple output sandbox that writes the value of one of the message's
+-- fields ('Payload' by default) to a file.
+function process_message()
+ local fh = io.open(path, "w")
+ io.output(fh)
+ io.write(read_message(field))
+ io.close()
+ return 0
+end
diff --git a/heka/files/output/dashboard.toml b/heka/files/output/dashboard.toml
deleted file mode 100644
index 72c74c1..0000000
--- a/heka/files/output/dashboard.toml
+++ /dev/null
@@ -1,5 +0,0 @@
-[DashboardOutput]
-{%- if values.dashboard_address is defined %}
-address = "{{ values.dashboard_address }}:{{ values.dashboard_port }}"
-{% endif %}
-ticker_interval = {{ values.ticker_interval }}
diff --git a/heka/files/output/logoutput.toml b/heka/files/output/logoutput.toml
deleted file mode 100644
index 0df4f54..0000000
--- a/heka/files/output/logoutput.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[LogOutput]
-encoder = "{{ values.encoder }}"
-message_matcher = "{{ values.message_matcher }}"
diff --git a/heka/files/service_wrapper b/heka/files/service_wrapper
new file mode 100644
index 0000000..f3532f1
--- /dev/null
+++ b/heka/files/service_wrapper
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+HEKAD="/usr/bin/hekad"
+
+exec $HEKAD -config=/etc/{{ service_name }}
diff --git a/heka/files/toml/decoder/multidecoder.toml b/heka/files/toml/decoder/multidecoder.toml
new file mode 100644
index 0000000..3ac53c9
--- /dev/null
+++ b/heka/files/toml/decoder/multidecoder.toml
@@ -0,0 +1,5 @@
+[{{ decoder_name }}_decoder]
+type = "MultiDecoder"
+subs = {{ decoder.subs }}
+cascade_strategy = "{{ decoder.cascade_strategy }}"
+log_sub_errors = {{ decoder.log_sub_errors }}
diff --git a/heka/files/toml/decoder/payloadregex.toml b/heka/files/toml/decoder/payloadregex.toml
new file mode 100644
index 0000000..3e791bd
--- /dev/null
+++ b/heka/files/toml/decoder/payloadregex.toml
@@ -0,0 +1,4 @@
+[{{ decoder_name }}_decoder]
+type = "PayloadRegexDecoder"
+match_regex = "{{ decoder.match_regex }}"
+timestamp_layout = "{{ decoder.timestamp_layout }}"
diff --git a/heka/files/toml/decoder/protobuf.toml b/heka/files/toml/decoder/protobuf.toml
new file mode 100644
index 0000000..9c5ccac
--- /dev/null
+++ b/heka/files/toml/decoder/protobuf.toml
@@ -0,0 +1,2 @@
+[{{ decoder_name }}_decoder]
+type = "ProtobufDecoder"
diff --git a/heka/files/toml/decoder/sandbox.toml b/heka/files/toml/decoder/sandbox.toml
new file mode 100644
index 0000000..897f4be
--- /dev/null
+++ b/heka/files/toml/decoder/sandbox.toml
@@ -0,0 +1,25 @@
+[{{ decoder_name }}_decoder]
+type = "SandboxDecoder"
+filename = "{{ decoder.module_file }}"
+{%- if decoder.module_dir is defined %}
+module_directory = "{{ decoder.module_dir }}"
+{%- endif %}
+{%- if decoder.memory_limit is defined %}
+memory_limit = {{ decoder.memory_limit }}
+{%- endif %}
+{%- if decoder.preserve_data is defined %}
+preserve_data = {{ decoder.preserve_data|lower }}
+{%- endif %}
+{%- if decoder.instruction_limit is defined %}
+instruction_limit = {{ decoder.instruction_limit }}
+{%- endif %}
+{%- if decoder.output_limit is defined %}
+output_limit = {{ decoder.output_limit }}
+{%- endif %}
+
+{%- if decoder.config is defined %}
+[{{ decoder_name }}_decoder.config]
+{%- for config_param, config_value in decoder.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/encoder/elasticsearch.toml b/heka/files/toml/encoder/elasticsearch.toml
new file mode 100644
index 0000000..4765798
--- /dev/null
+++ b/heka/files/toml/encoder/elasticsearch.toml
@@ -0,0 +1,5 @@
+[{{ name }}_encoder]
+type = "ESJsonEncoder"
+index = "{{ values.index }}"
+es_index_from_timestamp = true
+fields = [ "DynamicFields", "Hostname", "Logger", "Payload", "Pid", "Severity", "Timestamp", "Type" ]
diff --git a/heka/files/encoder/es-json.toml b/heka/files/toml/encoder/es-json.toml
similarity index 100%
rename from heka/files/encoder/es-json.toml
rename to heka/files/toml/encoder/es-json.toml
diff --git a/heka/files/encoder/es-payload.toml b/heka/files/toml/encoder/es-payload.toml
similarity index 100%
rename from heka/files/encoder/es-payload.toml
rename to heka/files/toml/encoder/es-payload.toml
diff --git a/heka/files/encoder/protobuf.toml b/heka/files/toml/encoder/protobuf.toml
similarity index 100%
rename from heka/files/encoder/protobuf.toml
rename to heka/files/toml/encoder/protobuf.toml
diff --git a/heka/files/encoder/RstEncoder.toml b/heka/files/toml/encoder/rst.toml
similarity index 100%
rename from heka/files/encoder/RstEncoder.toml
rename to heka/files/toml/encoder/rst.toml
diff --git a/heka/files/toml/filter/sandbox.toml b/heka/files/toml/filter/sandbox.toml
new file mode 100644
index 0000000..76afba2
--- /dev/null
+++ b/heka/files/toml/filter/sandbox.toml
@@ -0,0 +1,25 @@
+[{{ filter_name }}_filter]
+type = "SandboxFilter"
+filename = "{{ filter.module_file }}"
+{%- if filter.module_dir is defined %}
+module_directory = "{{ filter.module_dir }}"
+{%- endif %}
+{%- if filter.message_matcher is defined %}
+message_matcher = "{{ filter.message_matcher }}"
+{%- endif %}
+{%- if filter.preserve_data is defined %}
+preserve_data = {{ filter.preserve_data|lower }}
+{%- endif %}
+{%- if filter.ticker_interval is defined %}
+ticker_interval = {{ filter.ticker_interval }}
+{%- endif %}
+{%- if filter.hostname is defined %}
+hostname = "{{ filter.hostname }}"
+{%- endif %}
+
+{%- if filter.config is defined %}
+[{{ filter_name }}_filter.config]
+{%- for config_param, config_value in filter.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/global.toml b/heka/files/toml/global.toml
new file mode 100644
index 0000000..40aaf3b
--- /dev/null
+++ b/heka/files/toml/global.toml
@@ -0,0 +1,12 @@
+[hekad]
+
+{%- set workers = grains.num_cpus + 1 %}
+maxprocs = {{ workers }}
+base_dir = "/var/cache/{{ service_name }}"
+
+hostname = "{{ grains.fqdn.split('.')[0] }}"
+
+max_message_size = 262144
+max_process_inject = 1
+max_timer_inject = 10
+poolsize = 100
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
new file mode 100644
index 0000000..111ad7d
--- /dev/null
+++ b/heka/files/toml/input/amqp.toml
@@ -0,0 +1,39 @@
+[input_{{ input_name }}]
+type = "AMQPInput"
+url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
+exchange = "{{ input.exchange }}"
+exchange_type = "{{ input.exchange_type }}"
+
+{% if input.prefetch_count is defined -%}
+prefetch_count = {{ input.prefetch_count }}
+{% endif %}
+{%- if input.exchange_durability is defined -%}
+exchange_durability = {{ input.exchange_durability|lower }}
+{% endif %}
+{%- if input.exchange_auto_delete is defined -%}
+exchange_auto_delete = {{ input.exchange_auto_delete|lower }}
+{% endif %}
+{%- if input.queue_auto_delete is defined -%}
+queue_auto_delete = {{ input.queue_auto_delete }}
+{% endif %}
+{%- if input.queue is defined -%}
+queue = "{{ input.queue }}"
+{% endif %}
+{%- if input.routing_key is defined -%}
+routing_key = "{{ input.routing_key }}"
+{% endif %}
+decoder = "{{ input.decoder }}"
+splitter = "{{ input.splitter }}"
+
+{%- if input.ssl is defined and input.ssl.get('enabled', True) %}
+[input_{{ input_name }}.tls]
+cert_file = "{{ input.ssl.cert_file }}"
+key_file = "{{ input.ssl.key_file }}"
+{%- if input.ssl.ca_file is defined %}
+root_cafile = "{{ input.ssl.ca_file }}"
+{%- endif %}
+{%- endif %}
+
+{#-
+vim: syntax=jinja
+-#}
diff --git a/heka/files/toml/input/collectd.toml b/heka/files/toml/input/collectd.toml
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/heka/files/toml/input/collectd.toml
diff --git a/heka/files/toml/input/logstreamer.toml b/heka/files/toml/input/logstreamer.toml
new file mode 100644
index 0000000..791467e
--- /dev/null
+++ b/heka/files/toml/input/logstreamer.toml
@@ -0,0 +1,19 @@
+[{{ input_name }}_input]
+type = "LogstreamerInput"
+log_directory = "{{ input.log_directory }}"
+file_match = '{{ input.file_match }}'
+{%- if input.differentiator is defined %}
+differentiator = {{ input.differentiator }}
+{%- endif %}
+{%- if input.priority is defined %}
+priority = {{ input.priority }}
+{%- endif %}
+{%- if input.decoder is defined %}
+decoder = "{{ input.decoder }}"
+{%- endif %}
+{%- if input.splitter is defined %}
+splitter = '{{ input.splitter }}'
+{%- endif %}
+{%- if input.oldest_duration is defined %}
+oldest_duration = "{{ input.oldest_duration }}"
+{%- endif %}
diff --git a/heka/files/input/process.toml b/heka/files/toml/input/process.toml
similarity index 100%
rename from heka/files/input/process.toml
rename to heka/files/toml/input/process.toml
diff --git a/heka/files/output/amqp.toml b/heka/files/toml/output/amqp.toml
similarity index 90%
rename from heka/files/output/amqp.toml
rename to heka/files/toml/output/amqp.toml
index 5932b71..2c9bd7e 100644
--- a/heka/files/output/amqp.toml
+++ b/heka/files/toml/output/amqp.toml
@@ -1,4 +1,4 @@
-[AMQPOutput_{{ name }}]
+[{{ name }}_output]
type = "AMQPOutput"
url = "amqp{% if values.ssl is defined and values.ssl.get('enabled', True) %}s{% endif %}://{{ values.user }}:{{ values.password }}@{{ values.host }}/{{ values.vhost }}"
exchange = "{{ values.exchange }}"
@@ -7,13 +7,13 @@
use_framing = true
encoder = "{{ values.encoder }}"
-[AMQPOutput_{{ name }}.retries]
+[{{ name }}_output.retries]
max_delay = "{{ values.get('max_delay', '30s') }}"
delay = "{{ values.get('delay', '250ms') }}"
max_retries = {{ values.get('max_retries', '-1') }}
{%- if values.ssl is defined and values.ssl.get('enabled', True) %}
-[AMQPOutput_{{ name }}.tls]
+[{{ name }}_output.tls]
cert_file = "{{ values.ssl.cert_file }}"
key_file = "{{ values.ssl.key_file }}"
{%- if values.ssl.ca_file is defined %}
diff --git a/heka/files/toml/output/dashboard.toml b/heka/files/toml/output/dashboard.toml
new file mode 100644
index 0000000..fb63768
--- /dev/null
+++ b/heka/files/toml/output/dashboard.toml
@@ -0,0 +1,3 @@
+[DashboardOutput]
+address = "{{ values.host }}:{{ values.port }}"
+ticker_interval = {{ values.ticker_interval }}
diff --git a/heka/files/output/elasticsearch.toml b/heka/files/toml/output/elasticsearch.toml
similarity index 66%
rename from heka/files/output/elasticsearch.toml
rename to heka/files/toml/output/elasticsearch.toml
index 7d00888..0a55b52 100644
--- a/heka/files/output/elasticsearch.toml
+++ b/heka/files/toml/output/elasticsearch.toml
@@ -1,13 +1,13 @@
-[output_{{ name }}]
+[{{ name }}_output]
type = "ElasticSearchOutput"
message_matcher = "{{ values.message_matcher }}"
encoder = "{{ values.encoder }}"
server = "http://{{ values.host }}:{{ values.port }}"
flush_interval = {{ values.flush_interval }}
flush_count = {{ values.flush_count }}
+use_buffering = true
-[output_{{ name }}.buffering]
-max_file_size = 16777216 #16M
-max_buffer_size = 67108864 #64M
+[{{ name }}_output.buffering]
+max_buffer_size = 1073741824 # 1GB
+max_file_size = 134217728 # 128MB
full_action = "block"
-cursor_update_count = 100
diff --git a/heka/files/toml/output/log.toml b/heka/files/toml/output/log.toml
new file mode 100644
index 0000000..0d18530
--- /dev/null
+++ b/heka/files/toml/output/log.toml
@@ -0,0 +1,4 @@
+[{{ name }}_output]
+type = "LogOutput"
+encoder = "{{ values.encoder }}"
+message_matcher = "{{ values.message_matcher }}"
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
new file mode 100644
index 0000000..11d5763
--- /dev/null
+++ b/heka/files/toml/output/tcp.toml
@@ -0,0 +1,11 @@
+[output_{{ name }}]
+type="TcpOutput"
+address = "{{ values.host }}:{{ values.port }}"
+encoder = "ProtobufEncoder"
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
+
+use_buffering = true
+[output_{{ name }}.buffering]
+max_buffer_size = 268435456
+max_file_size = 67108864
+full_action = "drop"
diff --git a/heka/files/toml/splitter/regex.toml b/heka/files/toml/splitter/regex.toml
new file mode 100644
index 0000000..1f4a6c9
--- /dev/null
+++ b/heka/files/toml/splitter/regex.toml
@@ -0,0 +1,4 @@
+[{{ splitter_name }}_splitter]
+type = "RegexSplitter"
+delimiter = "{{ splitter.delimiter }}"
+delimiter_eol = {{ splitter.delimiter_eol|lower }}
diff --git a/heka/files/toml/splitter/token.toml b/heka/files/toml/splitter/token.toml
new file mode 100644
index 0000000..bbe5464
--- /dev/null
+++ b/heka/files/toml/splitter/token.toml
@@ -0,0 +1,3 @@
+[{{ splitter_name }}_splitter]
+type = "TokenSplitter"
+delimiter = "{{ splitter.delimiter }}"
diff --git a/heka/init.sls b/heka/init.sls
index 71d650a..0c4f1fa 100644
--- a/heka/init.sls
+++ b/heka/init.sls
@@ -1,5 +1,15 @@
+{%- if pillar.heka is defined %}
include:
-{% if pillar.heka.server is defined %}
-- heka.server
-{% endif %}
-
+{%- if pillar.heka.log_collector is defined %}
+- heka.log_collector
+{%- endif %}
+{%- if pillar.heka.metric_collector is defined %}
+- heka.metric_collector
+{%- endif %}
+{%- if pillar.heka.remote_collector is defined %}
+- heka.remote_collector
+{%- endif %}
+{%- if pillar.heka.aggregator is defined %}
+- heka.aggregator
+{%- endif %}
+{%- endif %}
diff --git a/heka/log_collector.sls b/heka/log_collector.sls
new file mode 100644
index 0000000..a50e6d2
--- /dev/null
+++ b/heka/log_collector.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.log_collector is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "log_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/map.jinja b/heka/map.jinja
index cab337d..c4e3848 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -3,7 +3,6 @@
pkgs:
- acl
- heka
- - heka-lua-scripts
user:
- heka
groups:
@@ -26,4 +25,3 @@
{%- endload %}
{%- set server = salt['grains.filter_by'](server_defaults, merge=salt['pillar.get']('heka:server')) %}
-
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
new file mode 100644
index 0000000..fbb0ba7
--- /dev/null
+++ b/heka/meta/heka.yml
@@ -0,0 +1,83 @@
+log_collector:
+ encoder:
+ elasticsearch:
+ engine: elasticsearch
+ index: "%{Type}-%{%Y.%m.%d}"
+ output:
+ elasticsearch:
+ engine: elasticsearch
+ message_matcher: "Type == 'log' || Type == 'notification'"
+ encoder: elasticsearch_encoder
+ host: mon
+ port: 9200
+ flush_interval: 5000
+ flush_count: 100
+ metric_collector:
+ engine: tcp
+ host: 127.0.0.1
+ port: 5567
+ log_dashboard:
+ engine: dashboard
+ host: 127.0.0.1
+ port: 4352
+metric_collector:
+ decoder:
+ heka_collectd:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/decoders/collectd.lua
+ module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+ config:
+ hostname: '{{ grains.fqdn.split('.')[0] }}'
+ swap_size: 4294967296
+ heka_http_check:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/decoders/noop.lua
+ module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+ config:
+ msg_type: lma.http-check
+ heka_metric:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/decoders/metric.lua
+ module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+ config:
+ deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
+ input: {}
+ filter:
+ heka_metric_collector:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/heka_monitoring.lua
+ module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+ preserve_data: false
+ message_matcher: "Type == 'heka.all-report'"
+ rabbitmq_service_status:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/afd.lua
+ module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+ preserve_data: false
+ message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric') && (Fields[name] == 'rabbitmq_check')"
+ ticker_interval: 10
+ config:
+ hostname: '{{ grains.fqdn.split('.')[0] }}'
+ afd_type: 'service'
+ afd_file: 'lma_alarms_rabbitmq_service_check'
+ afd_cluster_name: 'rabbitmq-service'
+ afd_logical_name: 'check'
+ activate_alerting: true
+ enable_notification: false
+ output: {}
+ splitter: {}
+ encoder: {}
+remote_collector:
+ decoder: {}
+ input: {}
+ filter: {}
+ output: {}
+ splitter: {}
+ encoder: {}
+aggregator:
+ decoder: {}
+ input: {}
+ filter: {}
+ output: {}
+ splitter: {}
+ encoder: {}
diff --git a/heka/metric_collector.sls b/heka/metric_collector.sls
new file mode 100644
index 0000000..9684c1b
--- /dev/null
+++ b/heka/metric_collector.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.metric_collector is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "metric_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/remote_collector.sls b/heka/remote_collector.sls
new file mode 100644
index 0000000..961587d
--- /dev/null
+++ b/heka/remote_collector.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.remote_collector is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "remote_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/server.sls b/heka/server.sls
deleted file mode 100644
index c26c967..0000000
--- a/heka/server.sls
+++ /dev/null
@@ -1,201 +0,0 @@
-{%- from "heka/map.jinja" import server with context %}
-{%- if server.enabled %}
-
-heka_packages:
- pkg.latest:
- - names: {{ server.pkgs }}
-
-purge-heka-conf-dir:
- file.directory:
- - name: /etc/heka/conf.d/
- - clean: True
- - makedirs: True
- - require:
- - pkg: heka_packages
-
-heka_ssl:
- file.directory:
- - name: /etc/heka/ssl
- - user: root
- - group: heka
- - mode: 750
- - require:
- - pkg: heka_packages
- - user: heka_user
-
-/etc/heka/conf.d/00-hekad.toml:
- file.managed:
- - source: salt://heka/files/00-hekad.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - pkg: heka_packages
- - file: purge-heka-conf-dir
- - user: heka_user
-
-{%- if grains.os_family == 'RedHat' %}
-/etc/systemd/system/heka.service:
- file.managed:
- - source: salt://heka/files/heka.service
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
-
-/var/cache/hekad:
- file.directory:
- - user: heka
- - require:
- - user: heka_user
-{%- endif %}
-
-heka_acl_log:
- cmd.run:
- - name: "setfacl -R -m g:adm:rx /var/log; setfacl -R -d -m g:adm:rx /var/log"
- - unless: "getfacl /var/log/|grep default:group:adm"
-
-heka_service:
- service.running:
- - enable: true
- - name: heka
- - watch:
- - file: /etc/heka/conf.d/00-hekad.toml
- - require:
- - user: heka_user
-
-heka_user:
- user.present:
- - name: heka
- - system: true
- - shell: /bin/nologin
- - groups: {{ server.groups }}
- - require:
- - pkg: heka_packages
-
-{%- for name,values in server.input.iteritems() %}
-
-/etc/heka/conf.d/15-input-{{ name }}-{{ values['engine'] }}.toml:
- file.managed:
- - source: salt://heka/files/input/{{ values['engine'] }}.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
- - defaults:
- name: {{ name }}
- values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.output.iteritems() %}
-{%- if values.enabled %}
-/etc/heka/conf.d/60-output-{{ name }}-{{ values['engine'] }}.toml:
- file.managed:
- - source: salt://heka/files/output/{{ values['engine'] }}.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
- - defaults:
- name: {{ name }}
- values: {{ values }}
-
-{%- endif %}
-{%- endfor %}
-
-
-{%- for name,values in server.filter.iteritems() %}
-
-/etc/heka/conf.d/20-filter-{{ name }}-{{ values['engine'] }}.toml:
- file.managed:
- - source: salt://heka/files/filter/{{ values['engine'] }}.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
- - defaults:
- name: {{ name }}
- values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.splitter.iteritems() %}
-
-/etc/heka/conf.d/30-splitter-{{ name }}-{{ values['engine'] }}.toml:
- file.managed:
- - source: salt://heka/files/splitter/{{ values['engine'] }}.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
- - defaults:
- name: {{ name }}
- values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.encoder.iteritems() %}
-
-/etc/heka/conf.d/40-encoder-{{ name }}-{{ values['engine'] }}.toml:
- file.managed:
- - source: salt://heka/files/encoder/{{ values['engine'] }}.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
- - defaults:
- name: {{ name }}
- values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.decoder.iteritems() %}
-
-/etc/heka/conf.d/10-decoder-{{ name }}-{{ values['engine'] }}.toml:
- file.managed:
- - source: salt://heka/files/decoder/{{ values['engine'] }}.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
- - defaults:
- name: {{ name }}
- values: {{ values }}
-
-{%- endfor %}
-
-{%- for service_name, service in pillar.items() %}
-{%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
-
-/etc/heka/conf.d/99-{{ service_name }}.toml:
- file.managed:
- - source: salt://{{ service_name }}/files/heka.toml
- - template: jinja
- - mode: 640
- - group: heka
- - require:
- - file: /etc/heka/conf.d/00-hekad.toml
- - watch_in:
- - service: heka_service
-
-{%- endif %}
-{%- endfor %}
-
-{%- endif %}
diff --git a/metadata/service/aggregator/single.yml b/metadata/service/aggregator/single.yml
new file mode 100644
index 0000000..a44ae20
--- /dev/null
+++ b/metadata/service/aggregator/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+ heka:
+ aggregator:
+ enabled: true
diff --git a/metadata/service/log_collector/single.yml b/metadata/service/log_collector/single.yml
new file mode 100644
index 0000000..0406f0e
--- /dev/null
+++ b/metadata/service/log_collector/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+ heka:
+ log_collector:
+ enabled: true
diff --git a/metadata/service/metric_collector/single.yml b/metadata/service/metric_collector/single.yml
new file mode 100644
index 0000000..2dcd5e0
--- /dev/null
+++ b/metadata/service/metric_collector/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+ heka:
+ metric_collector:
+ enabled: true
diff --git a/metadata/service/remote_collector/single.yml b/metadata/service/remote_collector/single.yml
new file mode 100644
index 0000000..67bdf8d
--- /dev/null
+++ b/metadata/service/remote_collector/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+ heka:
+ remote_collector:
+ enabled: true
diff --git a/metadata/service/support.yml b/metadata/service/support.yml
index 8041310..4919327 100644
--- a/metadata/service/support.yml
+++ b/metadata/service/support.yml
@@ -4,7 +4,7 @@
collectd:
enabled: false
heka:
- enabled: false
+ enabled: true
sensu:
enabled: false
sphinx: