Stacklight integration
diff --git a/README.rst b/README.rst
index 90730cd..75e0ed0 100644
--- a/README.rst
+++ b/README.rst
@@ -3,98 +3,33 @@
 Heka Formula
 ============
 
-Heka is an open source stream processing software system developed by Mozilla. Heka is a Swiss Army Knife type tool for data processing
+Heka is an open source stream processing software system developed by Mozilla. Heka is a Swiss Army Knife type tool for data processing.
 
 Sample pillars
 ==============
 
-Basic log shipper streaming decoded rsyslog's logfiles using amqp broker as transport.
-From every message there is one amqp message and it's also logged to  heka's logfile in RST format.
+Log collector service
 
 .. code-block:: yaml
 
-
     heka:
-      server:
+      log_collector:
         enabled: true
-        input:
-          rsyslog-syslog:
-            engine: logstreamer
-            log_directory: /var/log
-            file_match: syslog\.?(?P<Index>\d+)?(.gz)?
-            decoder: RsyslogDecoder
-            priority: ["^Index"]
-          rsyslog-auth:
-            engine: logstreamer
-            log_directory: /var/log
-            file_match: auth\.log\.?(?P<Index>\d+)?(.gz)?
-            decoder: RsyslogDecoder
-            priority: ["^Index"]
-        decoder:
-          rsyslog:
-            engine: rsyslog
-            template: %TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n
-            hostname_keep: TRUE
-            tz: Europe/Prague
         output:
-          rabbitmq:
-            engine: amqp
+          elasticsearch01:
+            engine: elasticsearch
             host: localhost
-            user: guest
-            password: guest
-            vhost: /logs
-            exchange: logs
-            exchange_type: fanout
-            encoder: ProtobufEncoder
-            use_framing: true
-          heka-logfile:
-            engine: logoutput
-            encoder: RstEncoder
+            port: 9200
+            encoder: es_json
             message_matcher: TRUE
-        encoder:
-          heka-logfile:
-            engine: RstEncoder
 
-
-Heka acting as message router and dashboard.
-Messages are consumed from amqp and sent to elasticsearch server.
-
+Metric collector service
 
 .. code-block:: yaml
 
-
     heka:
-      server:
+      metric_collector:
         enabled: true
-        input:
-          rabbitmq:
-            engine: amqp
-            host: localhost
-            user: guest
-            password: guest
-            vhost: /logs
-            exchange: logs
-            exchange_type: fanout
-            decoder: ProtoBufDecoder
-            splitter: HekaFramingSplitter
-          rsyslog-syslog:
-            engine: logstreamer
-            log_directory: /var/log
-            file_match: syslog\.?(?P<Index>\d+)?(.gz)?
-            decoder: RsyslogDecoder
-            priority: ["^Index"]
-          rsyslog-auth:
-            engine: logstreamer
-            log_directory: /var/log
-            file_match: auth\.log\.?(?P<Index>\d+)?(.gz)?
-            decoder: RsyslogDecoder
-            priority: ["^Index"]
-        decoder:
-          rsyslog:
-            engine: rsyslog
-            template: %TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n
-            hostname_keep: TRUE
-            tz: Europe/Prague
         output:
           elasticsearch01:
             engine: elasticsearch
@@ -105,11 +40,25 @@
           dashboard01:
             engine: dashboard
             ticker_interval: 30
-        encoder:
-          es-json:
-            engine: es-json
+
+Aggregator service
+
+.. code-block:: yaml
+
+    heka:
+      aggregator:
+        enabled: true
+        output:
+          elasticsearch01:
+            engine: elasticsearch
+            host: localhost
+            port: 9200
+            encoder: es_json
             message_matcher: TRUE
-            index: logfile-%{%Y.%m.%d}
+          dashboard01:
+            engine: dashboard
+            ticker_interval: 30
+
 
 Read more
 =========
diff --git a/debian/changelog b/debian/changelog
index f828603..b604c53 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,5 +1,4 @@
-<<<<<<< HEAD
-=======
+
 salt-formula-heka (0.2) trusty; urgency=medium
 
   * First public release
diff --git a/heka/_common.sls b/heka/_common.sls
new file mode 100644
index 0000000..0ffad13
--- /dev/null
+++ b/heka/_common.sls
@@ -0,0 +1,29 @@
+{%- from "heka/map.jinja" import server with context %}
+
+heka_packages:
+  pkg.latest:
+  - names: {{ server.pkgs }}
+
+/usr/share/lma_collector:
+  file.recurse:
+  - source: salt://heka/files/lua
+
+heka_user:
+  user.present:
+  - name: heka
+  - system: true
+  - shell: /bin/false
+  - groups: {{ server.groups }}
+  - require:
+    - pkg: heka_packages
+
+heka_acl_log:
+  cmd.run:
+  - name: "setfacl -R -m g:adm:rx /var/log; setfacl -R -d -m g:adm:rx /var/log"
+  - unless: "getfacl /var/log/|grep default:group:adm"
+
+heka_service:
+  service.dead:
+  - name: heka
+
+
diff --git a/heka/_service.sls b/heka/_service.sls
new file mode 100644
index 0000000..7085abe
--- /dev/null
+++ b/heka/_service.sls
@@ -0,0 +1,286 @@
+
+{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file %}{% endmacro %}
+
+{%- macro service_config(service_name) %}
+
+{%- set server = salt['pillar.get']('heka:'+service_name) %}
+
+{%- if server.enabled %}
+
+heka_{{ service_name }}_log_file:
+  file.managed:
+  - name: /var/log/{{ service_name }}.log
+  - user: heka
+  - mode: 644
+  - replace: False
+
+heka_{{ service_name }}_conf_dir:
+  file.directory:
+  - name: /etc/{{ service_name }}
+  - user: heka
+  - mode: 750
+  - makedirs: true
+
+heka_{{ service_name }}_conf_dir_clean:
+  file.directory:
+  - name: /etc/{{ service_name }}
+  - clean: true
+
+{%- if grains.get('init', None) == 'systemd' %}
+
+heka_{{ service_name }}_service_file:
+  file.managed:
+  - name: /etc/systemd/system/{{ service_name }}.service
+  - source: salt://heka/files/heka.service
+  - user: root
+  - mode: 644
+  - template: jinja
+
+{%- else %}
+
+heka_{{ service_name }}_service_file:
+  file.managed:
+  - name: /etc/init/{{ service_name }}.conf
+  - source: salt://heka/files/heka.service
+  - user: root
+  - mode: 644
+  - template: jinja
+
+heka_{{ service_name }}_service_wrapper:
+  file.managed:
+  - name: /usr/local/bin/{{ service_name }}
+  - source: salt://heka/files/service_wrapper
+  - user: root
+  - mode: 755
+  - template: jinja
+
+{%- endif %}
+
+
+{# Setup basic structure for all roles so updates can apply #}
+
+{%- set service_grains = {
+  'heka': {
+    'log_collector': {
+      'decoder': {},
+      'input': {},
+      'filter': {},
+      'splitter': {},
+      'encoder': {},
+      'output': {}
+    },
+    'metric_collector': {
+      'decoder': {},
+      'input': {},
+      'filter': {},
+      'splitter': {},
+      'encoder': {},
+      'output': {}
+    },
+    'remote_collector': {
+      'decoder': {},
+      'input': {},
+      'filter': {},
+      'splitter': {},
+      'encoder': {},
+      'output': {}
+    },
+    'aggregator': {
+      'decoder': {},
+      'input': {},
+      'filter': {},
+      'splitter': {},
+      'encoder': {},
+      'output': {}
+    }
+  }
+} %}
+
+
+{# Loading the other services' support metadata for local roles #}
+
+{%- if service_name in ['log_collector', 'metric_collector'] %}
+
+{%- for service_name, service in pillar.iteritems() %}
+{%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
+
+{%- set grains_fragment_file = service_name+'/meta/heka.yml' %}
+{%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
+{%- do service_grains.heka.update(grains_yaml) %}
+
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+
+
+{# Loading the other services' support metadata from salt-mine #}
+
+{%- if service_name in ['remote_collector', 'aggregator'] %}
+
+{%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').iteritems() %}
+{%- if node_grains.heka is defined %}
+
+{%- do service_grains.heka.update(node_grains.heka) %}
+
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+
+
+{# Overriding aggregated metadata from user-space pillar data #}
+
+{%- for service_grain_name, service_grain in service_grains.heka.iteritems() %}
+{% if salt['pillar.get']('heka:'+service_grain_name) %}
+
+{%- for service_action_name, service_action in service_grain.iteritems() %}
+{%- if salt['pillar.get']('heka:'+service_grain_name).get(service_action_name, False) is mapping %}
+{%- do service_grains.heka[service_grain_name][service_action_name].update(salt['pillar.get']('heka:'+service_grain_name+':'+service_action_name)) %}
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+{%- endfor %}
+
+
+/etc/{{ service_name }}/global.toml:
+  file.managed:
+  - source: salt://heka/files/toml/global.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - defaults:
+    service_name: {{ service_name }}
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+
+{%- for decoder_name, decoder in service_grains.heka[service_name].decoder.iteritems() %}
+
+/etc/{{ service_name }}/10-decoder-{{ decoder_name }}-{{ decoder.engine }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/decoder/{{ decoder.engine }}.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      decoder_name: {{ decoder_name }}
+      decoder: {{ decoder|yaml }}
+
+{%- endfor %}
+
+{%- for input_name, input in service_grains.heka[service_name].input.iteritems() %}
+
+/etc/{{ service_name }}/15-input-{{ input_name }}-{{ input.engine }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/input/{{ input.engine }}.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      input_name: {{ input_name }}
+      input: {{ input|yaml }}
+
+{%- endfor %}
+
+{%- for filter_name, filter in service_grains.heka[service_name].filter.iteritems() %}
+
+/etc/{{ service_name }}/20-filter-{{ filter_name }}-{{ filter.engine }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/filter/{{ filter.engine }}.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      filter_name: {{ filter_name }}
+      filter: {{ filter|yaml }}
+
+{%- endfor %}
+
+{%- for splitter_name, splitter in service_grains.heka[service_name].splitter.iteritems() %}
+
+/etc/{{ service_name }}/30-splitter-{{ splitter_name }}-{{ splitter.engine }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/splitter/{{ splitter.engine }}.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      splitter_name: {{ splitter_name }}
+      splitter: {{ splitter|yaml }}
+
+{%- endfor %}
+
+{%- for encoder_name, encoder in service_grains.heka[service_name].encoder.iteritems() %}
+
+/etc/{{ service_name }}/40-encoder-{{ encoder_name }}-{{ encoder.engine }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/encoder/{{ encoder.engine }}.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      encoder_name: {{ encoder_name }}
+      encoder: {{ encoder|yaml }}
+
+{%- endfor %}
+
+{%- for output_name, output in service_grains.heka[service_name].output.iteritems() %}
+
+/etc/{{ service_name }}/60-output-{{ output_name }}-{{ output.engine }}.toml:
+  file.managed:
+  - source: salt://heka/files/toml/output/{{ output.engine }}.toml
+  - template: jinja
+  - mode: 640
+  - group: heka
+  - require:
+    - file: heka_{{ service_name }}_conf_dir
+  - require_in:
+    - file: heka_{{ service_name }}_conf_dir_clean
+  - watch_in:
+    - service: heka_{{ service_name }}_service
+  - defaults:
+      output_name: {{ output_name }}
+      output: {{ output|yaml }}
+
+{%- endfor %}
+
+{%- endif %}
+
+{%- endmacro %}
+
+{{ service_config(service_name) }}
diff --git a/heka/aggregator.sls b/heka/aggregator.sls
new file mode 100644
index 0000000..c53d8e7
--- /dev/null
+++ b/heka/aggregator.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.aggregator is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "aggregator" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/files/00-hekad.toml b/heka/files/00-hekad.toml
deleted file mode 100644
index 99cad61..0000000
--- a/heka/files/00-hekad.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[hekad]
-{%- set workers = grains.num_cpus + 1 %}
-maxprocs = {{ workers }}
diff --git a/heka/files/decoder/multidecoder.toml b/heka/files/decoder/multidecoder.toml
deleted file mode 100644
index fe73218..0000000
--- a/heka/files/decoder/multidecoder.toml
+++ /dev/null
@@ -1,6 +0,0 @@
-[multidecoder_{{ name }}]
-type = "MultiDecoder"
-subs = {{ values.subs }}
-cascade_strategy = "{{ values.cascade_strategy }}"
-log_sub_errors = {{ values.log_sub_errors }}
-
diff --git a/heka/files/decoder/payloadregex.toml b/heka/files/decoder/payloadregex.toml
deleted file mode 100644
index 0af80f7..0000000
--- a/heka/files/decoder/payloadregex.toml
+++ /dev/null
@@ -1,5 +0,0 @@
-[Payloadregex_{{ name }}]
-type = "PayloadRegexDecoder"
-match_regex = "{{ values.match_regex }}"
-timestamp_layout = "{{ values.timestamp_layout }}"
-
diff --git a/heka/files/decoder/protobuf.toml b/heka/files/decoder/protobuf.toml
deleted file mode 100644
index c856239..0000000
--- a/heka/files/decoder/protobuf.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[ProtoBufDecoder]
-type = "ProtobufDecoder"
-
diff --git a/heka/files/decoder/rsyslog.toml b/heka/files/decoder/rsyslog.toml
deleted file mode 100644
index 6abd40e..0000000
--- a/heka/files/decoder/rsyslog.toml
+++ /dev/null
@@ -1,9 +0,0 @@
-{%- from "heka/map.jinja" import server with context -%}
-[RsyslogDecoder]
-type = "SandboxDecoder"
-filename = "lua_decoders/rsyslog.lua"
-
-[RsyslogDecoder.config]
-type = "{{ values.type }}"
-template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
-tz = "{{ server.decoder.rsyslog.tz }}"
diff --git a/heka/files/decoder/sandbox.toml b/heka/files/decoder/sandbox.toml
deleted file mode 100644
index 053b475..0000000
--- a/heka/files/decoder/sandbox.toml
+++ /dev/null
@@ -1,30 +0,0 @@
-[Sandbox_{{ name }}]
-type = "SandboxDecoder"
-filename = "{{ values.file_name }}"
-
-{% if values.module_directory is defined %}
-module_directory =  "{{ values.module_directory }}"
-{%- endif %}
-
-{% if values.memory_limit is defined %}
-memory_limit = "{{ values.memory_limit }}"
-{%- endif %}
-
-{% if values.preserve_data is defined %}
-preserve_data = "{{ values.preserve_data }}"
-{%- endif %}
-
-{% if values.instruction_limit is defined %}
-instruction_limit = "{{ values.instruction_limit }}"
-{%- endif %}
-
-{% if values.output_limit is defined %}
-output_limit = "{{ values.output_limit }}"
-{%- endif %}
-
-{% if values.config is defined %}
-config = [
-{% for k,v in values.config.iteritems() %}
-{{ k }} = {{ v }} ]
-{%- endfor %}
-{%- endif %}
diff --git a/heka/files/heka.service b/heka/files/heka.service
index b95fab6..b32bf8e 100644
--- a/heka/files/heka.service
+++ b/heka/files/heka.service
@@ -1,14 +1,17 @@
+{%- if grains.get('init', None) == 'systemd' %}
+
 [Unit]
-Description=heka - data collector and processor daemon
+Description=heka {{ service_name }} - data collector and processor daemon
 After=network.target auditd.service
-ConditionPathExists=!/etc/heka/hekad_not_to_be_run
+ConditionPathExists=!/etc/{{ service_name }}_not_to_be_run
 
 [Service]
-EnvironmentFile=-/etc/default/heka
+EnvironmentFile=-/etc/default/{{ service_name }}
 User=heka
 Group=heka
-ExecStart=/usr/bin/hekad -config=/etc/heka/conf.d/
-ExecReload=/bin/kill -HUP $MAINPID
+ExecStart=/usr/bin/hekad -config=/etc/{{ service_name }}
+# Note: heka does not support reloading its configuration via signal,
+# so no ExecReload (e.g. /bin/kill -HUP $MAINPID) is configured.
 KillMode=process
 Restart=on-failure
 StandardError=inherit
@@ -16,3 +19,26 @@
 [Install]
 WantedBy=multi-user.target
 
+{%- else %}
+
+# heka {{ service_name }}
+
+description     "{{ service_name }}"
+
+start on runlevel [2345]
+stop on runlevel [!2345]
+
+respawn
+
+pre-start script
+    touch /var/log/{{ service_name }}.log
+    chown heka:heka /var/log/{{ service_name }}.log
+end script
+
+script
+    # https://bugs.launchpad.net/lma-toolchain/+bug/1543289
+    ulimit -n 102400
+    exec start-stop-daemon --start  --chuid heka --exec /usr/local/bin/{{ service_name }}_wrapper 2>>/var/log/{{ service_name }}.log
+end script
+
+{%- endif %}
\ No newline at end of file
diff --git a/heka/files/input/amqp.toml b/heka/files/input/amqp.toml
deleted file mode 100644
index c3dbd70..0000000
--- a/heka/files/input/amqp.toml
+++ /dev/null
@@ -1,39 +0,0 @@
-[input_{{ name }}]
-type = "AMQPInput"
-url = "amqp{% if values.ssl is defined and values.ssl.get('enabled', True) %}s{% endif %}://{{ values.user }}:{{ values.password }}@{{ values.host }}/{{ values.vhost }}"
-exchange = "{{ values.exchange }}"
-exchange_type = "{{ values.exchange_type }}"
-
-{% if values.prefetch_count is defined -%}
-prefetch_count = {{ values.prefetch_count }}
-{% endif %}
-{%- if values.exchange_durability is defined -%}
-exchange_durability = "{{ values.exchange_durability }}"
-{% endif %}
-{%- if values.exchange_auto_delete is defined -%}
-exchange_auto_delete = "{{ values.exchange_auto_delete }}"
-{% endif %}
-{%- if values.queue_auto_delete is defined -%}
-queue_auto_delete = {{ values.queue_auto_delete }}
-{% endif %}
-{%- if values.queue is defined -%}
-queue = "{{ values.queue }}"
-{% endif %}
-{%- if values.routing_key is defined -%}
-routing_key = "{{ values.routing_key }}"
-{% endif %}
-decoder = "{{ values.decoder }}"
-splitter = "{{ values.splitter }}"
-
-{%- if values.ssl is defined and values.ssl.get('enabled', True) %}
-[input_{{ name }}.tls]
-cert_file = "{{ values.ssl.cert_file }}"
-key_file = "{{ values.ssl.key_file }}"
-{%- if values.ssl.ca_file is defined %}
-root_cafile = "{{ values.ssl.ca_file }}"
-{%- endif %}
-{%- endif %}
-
-{#-
-vim: syntax=jinja
--#}
diff --git a/heka/files/input/logstreamer.toml b/heka/files/input/logstreamer.toml
deleted file mode 100644
index 8e3ac11..0000000
--- a/heka/files/input/logstreamer.toml
+++ /dev/null
@@ -1,19 +0,0 @@
-[logstreamer_{{ name }}]
-type = "LogstreamerInput"
-log_directory = "{{ values.log_directory }}"
-file_match = '{{ values.file_match }}'
-{% if values.priority is defined %}
-priority = {{ values.priority }}
-{% endif %}
-{% if values.decoder is defined %}
-decoder = "{{ values.decoder }}"
-{% endif %}
-{% if values.splitter is defined %}
-splitter = '{{ values.splitter }}'
-{% endif %}
-{% if values.differentiator is defined %}
-differentiator = {{ values.differentiator }}
-{% endif %}
-{% if values.oldest_duration is defined %}
-oldest_duration = "{{ values.oldest_duration }}"
-{% endif %}
diff --git a/heka/files/lua/common/accumulator.lua b/heka/files/lua/common/accumulator.lua
new file mode 100644
index 0000000..9b01a8f
--- /dev/null
+++ b/heka/files/lua/common/accumulator.lua
@@ -0,0 +1,63 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local time = os.time
+local string = string
+local table = table
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local tostring = tostring
+local type = type
+
+local Accumulator = {}
+Accumulator.__index = Accumulator
+
+setfenv(1, Accumulator) -- Remove external access to contain everything in the module
+
+-- Create a new Accumulator
+--
+-- flush_count: the maximum number of items to accumulate before flushing
+-- flush_interval: the maximum number of seconds to wait before flushing
+-- callback: the function to call back when flushing the accumulator, it will
+-- receive the table of accumulated items as parameter.
+function Accumulator.new(flush_count, flush_interval, callback)
+    local a = {}
+    setmetatable(a, Accumulator)
+    a.flush_count = flush_count
+    a.flush_interval = flush_interval
+    a.flush_cb = callback
+    a.last_flush = time() * 1e9
+    a.buffer = {}
+    return a
+end
+
+-- Flush the buffer if flush_count or flush_interval are met
+--
+-- ns: the current timestamp in nanosecond (optional)
+function Accumulator:flush(ns)
+    local now = ns or time() * 1e9
+    if #self.buffer > self.flush_count or now - self.last_flush > self.flush_interval then
+        self.flush_cb(self.buffer)
+        self.buffer = {}
+        self.last_flush = now
+    end
+end
+
+-- Append an item to the buffer and flush the buffer if needed
+function Accumulator:append(item)
+    self.buffer[#self.buffer+1] = item
+    self:flush()
+end
+
+return Accumulator
diff --git a/heka/files/lua/common/afd.lua b/heka/files/lua/common/afd.lua
new file mode 100644
index 0000000..9e864df
--- /dev/null
+++ b/heka/files/lua/common/afd.lua
@@ -0,0 +1,185 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = require 'cjson'
+local string = require 'string'
+
+local lma = require 'lma_utils'
+local consts = require 'gse_constants'
+
+local read_message = read_message
+local assert = assert
+local ipairs = ipairs
+local pairs = pairs
+local pcall = pcall
+local table = table
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+function get_entity_name(field)
+    return read_message(string.format('Fields[%s]', field))
+end
+
+function get_status()
+    return read_message('Fields[value]')
+end
+
+function extract_alarms()
+    local ok, payload = pcall(cjson.decode, read_message('Payload'))
+    if not ok or not payload.alarms then
+        return nil
+    end
+    return payload.alarms
+end
+
+-- return a human-readable message from an alarm table
+-- for instance: "CPU load too high (WARNING, rule='last(load_midterm)>=5', current=7)"
+function get_alarm_for_human(alarm)
+    local metric
+    local fields = {}
+    for name, value in pairs(alarm.fields) do
+        fields[#fields+1] = name .. '="' .. value .. '"'
+    end
+    if #fields > 0 then
+        metric = string.format('%s[%s]', alarm.metric, table.concat(fields, ','))
+    else
+        metric = alarm.metric
+    end
+
+    local host = ''
+    if alarm.hostname then
+        host = string.format(', host=%s', alarm.hostname)
+    end
+
+    return string.format(
+        "%s (%s, rule='%s(%s)%s%s', current=%.2f%s)",
+        alarm.message,
+        alarm.severity,
+        alarm['function'],
+        metric,
+        alarm.operator,
+        alarm.threshold,
+        alarm.value,
+        host
+    )
+end
+
+function alarms_for_human(alarms)
+    local alarm_messages = {}
+    local hint_messages = {}
+
+    for _, v in ipairs(alarms) do
+        if v.tags and v.tags.dependency_level and v.tags.dependency_level == 'hint' then
+            hint_messages[#hint_messages+1] = get_alarm_for_human(v)
+        else
+            alarm_messages[#alarm_messages+1] = get_alarm_for_human(v)
+        end
+    end
+
+    if #hint_messages > 0 then
+        alarm_messages[#alarm_messages+1] = "Other related alarms:"
+    end
+    for _, v in ipairs(hint_messages) do
+        alarm_messages[#alarm_messages+1] = v
+    end
+
+    return alarm_messages
+end
+
+local alarms = {}
+
+-- append an alarm to the list of pending alarms
+-- the list is sent when inject_afd_metric is called
+function add_to_alarms(status, fn, metric, fields, tags, operator, value, threshold, window, periods, message)
+    local severity = consts.status_label(status)
+    assert(severity)
+    alarms[#alarms+1] = {
+        severity=severity,
+        ['function']=fn,
+        metric=metric,
+        fields=fields or {},
+        tags=tags or {},
+        operator=operator,
+        value=value,
+        threshold=threshold,
+        window=window or 0,
+        periods=periods or 0,
+        message=message
+    }
+end
+
+function get_alarms()
+    return alarms
+end
+
+function reset_alarms()
+    alarms = {}
+end
+
+-- inject an AFD event into the Heka pipeline
+function inject_afd_metric(msg_type, msg_tag_name, msg_tag_value, metric_name,
+                           value, hostname, interval, source, to_alerting)
+    local payload
+
+    if #alarms > 0 then
+        payload = lma.safe_json_encode({alarms=alarms})
+        reset_alarms()
+        if not payload then
+            return
+        end
+    else
+        -- because cjson encodes empty tables as objects instead of arrays
+        payload = '{"alarms":[]}'
+    end
+
+    local no_alerting
+    if to_alerting ~= nil and to_alerting == false then
+        no_alerting = true
+    end
+
+    local msg = {
+        Type = msg_type,
+        Payload = payload,
+        Fields = {
+            name=metric_name,
+            value=value,
+            hostname=hostname,
+            interval=interval,
+            source=source,
+            tag_fields={msg_tag_name, 'source', 'hostname'},
+            no_alerting = no_alerting,
+        }
+    }
+    msg.Fields[msg_tag_name] = msg_tag_value
+    lma.inject_tags(msg)
+    lma.safe_inject_message(msg)
+end
+
+-- inject an AFD service event into the Heka pipeline
+function inject_afd_service_metric(service, value, hostname, interval, source)
+    inject_afd_metric('afd_service_metric',
+                      'service',
+                      service,
+                      'service_status',
+                      value, hostname, interval, source)
+end
+
+MATCH = 1
+NO_MATCH = 2
+NO_DATA = 3
+MISSING_DATA = 4
+
+
+return M
diff --git a/heka/files/lua/common/afd_alarm.lua b/heka/files/lua/common/afd_alarm.lua
new file mode 100644
index 0000000..4fd660f
--- /dev/null
+++ b/heka/files/lua/common/afd_alarm.lua
@@ -0,0 +1,216 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local assert = assert
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local setmetatable = setmetatable
+
+-- LMA libs
+local utils = require 'lma_utils'
+local table_utils = require 'table_utils'
+local consts = require 'gse_constants'
+local afd = require 'afd'
+local Rule = require 'afd_rule'
+
+local SEVERITIES = {
+    warning = consts.WARN,
+    critical = consts.CRIT,
+    down = consts.DOWN,
+    unknown = consts.UNKW,
+    okay = consts.OKAY,
+}
+
+local Alarm = {}
+Alarm.__index = Alarm
+
+setfenv(1, Alarm) -- Remove external access to contain everything in the module
+
+function Alarm.new(alarm)
+    local a = {}
+    setmetatable(a, Alarm)
+    a._metrics_list = nil
+    a.name = alarm.name
+    a.description = alarm.description
+    if alarm.trigger.logical_operator then
+        a.logical_operator = string.lower(alarm.trigger.logical_operator)
+    else
+        a.logical_operator = 'or'
+    end
+    a.severity_str = string.upper(alarm.severity)
+    a.severity = SEVERITIES[string.lower(alarm.severity)]
+    assert(a.severity ~= nil)
+
+    a.skip_when_no_data = false
+    if alarm.no_data_policy then
+        if string.lower(alarm.no_data_policy) == 'skip' then
+            a.skip_when_no_data = true
+        else
+            a.no_data_severity = SEVERITIES[string.lower(alarm.no_data_policy)]
+        end
+    else
+        a.no_data_severity = consts.UNKW
+    end
+    assert(a.skip_when_no_data or a.no_data_severity ~= nil)
+
+    a.rules = {}
+    a.initial_wait = 0
+    for _, rule in ipairs(alarm.trigger.rules) do
+        local r = Rule.new(rule)
+        a.rules[#a.rules+1] = r
+        local wait = r.window * r.periods
+        if wait > a.initial_wait then
+            a.initial_wait = wait * 1e9
+        end
+    end
+    a.start_time_ns = 0
+
+    return a
+end
+
+-- return the Set of metrics used by the alarm
+function Alarm:get_metrics()
+    if not self._metrics_list then
+        self._metrics_list = {}
+        for _, rule in ipairs(self.rules) do
+            if not table_utils.item_find(rule.metric, self._metrics_list) then
+                self._metrics_list[#self._metrics_list+1] = rule.metric
+            end
+        end
+    end
+    return self._metrics_list
+end
+
+-- return a list of field names used for the metric
+-- (can have duplicate names)
+function Alarm:get_metric_fields(metric_name)
+    local fields = {}
+    for _, rule in ipairs(self.rules) do
+        if rule.metric == metric_name then
+            for k, _ in pairs(rule.fields) do
+                fields[#fields+1] = k
+            end
+            for _, g in ipairs(rule.group_by) do
+                fields[#fields+1] = g
+            end
+        end
+    end
+    return fields
+end
+
+function Alarm:has_metric(metric)
+    return table_utils.item_find(metric, self:get_metrics())
+end
+
+-- dispatch datapoint in datastores
+function Alarm:add_value(ts, metric, value, fields)
+    local data
+    for id, rule in pairs(self.rules) do
+        if rule.metric == metric then
+            rule:add_value(ts, value, fields)
+        end
+    end
+end
+
+-- return: state of alarm and a list of alarm details.
+--
+-- with alarm list when state != OKAY:
+-- {
+--  {
+--    value = <current value>,
+--    fields = <metric fields table>,
+--    message = <string>,
+--  },
+-- }
+function Alarm:evaluate(ns)
+    local state = consts.OKAY
+    local matches = 0
+    local all_alerts = {}
+    local function add_alarm(rule, value, message, fields)
+        all_alerts[#all_alerts+1] = {
+            severity = self.severity_str,
+            ['function'] = rule.fct,
+            metric = rule.metric,
+            operator = rule.relational_operator,
+            threshold = rule.threshold,
+            window = rule.window,
+            periods = rule.periods,
+            value = value,
+            fields = fields,
+            message = message
+        }
+    end
+    local one_unknown = false
+    local msg
+
+    for _, rule in ipairs(self.rules) do
+        local eval, context_list = rule:evaluate(ns)
+        if eval == afd.MATCH then
+            matches = matches + 1
+            msg = self.description
+        elseif eval == afd.MISSING_DATA then
+            msg = 'No datapoint have been received over the last ' .. rule.observation_window .. ' seconds'
+            one_unknown = true
+        elseif eval == afd.NO_DATA then
+            msg = 'No datapoint have been received ever'
+            one_unknown = true
+        end
+        for _, context in ipairs(context_list) do
+            add_alarm(rule, context.value, msg,
+                      context.fields)
+        end
+    end
+
+    if self.logical_operator == 'and' then
+        if one_unknown then
+            if self.skip_when_no_data then
+                state = nil
+            else
+                state = self.no_data_severity
+            end
+        elseif #self.rules == matches then
+            state = self.severity
+        end
+    elseif self.logical_operator == 'or' then
+        if matches > 0 then
+            state = self.severity
+        elseif one_unknown then
+            if self.skip_when_no_data then
+                state = nil
+            else
+                state = self.no_data_severity
+            end
+        end
+    end
+
+    if state == nil or state == consts.OKAY then
+        all_alerts = {}
+    end
+    return state, all_alerts
+end
+
+function Alarm:set_start_time(ns)
+    self.start_time_ns = ns
+end
+
+function Alarm:is_evaluation_time(ns)
+    local delta = ns - self.start_time_ns
+    if delta >= self.initial_wait then
+        return true
+    end
+    return false
+end
+
+return Alarm
diff --git a/heka/files/lua/common/afd_alarms.lua b/heka/files/lua/common/afd_alarms.lua
new file mode 100644
index 0000000..74dc328
--- /dev/null
+++ b/heka/files/lua/common/afd_alarms.lua
@@ -0,0 +1,120 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local pairs = pairs
+local ipairs = ipairs
+local lma = require 'lma_utils'
+local table_utils = require 'table_utils'
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+local Alarm = require 'afd_alarm'
+
+local all_alarms = {}
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Return the list of field names required for a metric, aggregated
+-- across all loaded alarms and de-duplicated (discovery order kept).
+function get_metric_fields(metric_name)
+    local fields = {}
+    for name, alarm in pairs(all_alarms) do
+        local mf = alarm:get_metric_fields(metric_name)
+        if mf then
+            for _, field in pairs(mf) do
+                -- skip fields already collected from another alarm
+                if not table_utils.item_find(field, fields) then
+                    fields[#fields+1] = field
+                end
+            end
+        end
+    end
+    return fields
+end
+
+-- Return the list of loaded alarms which watch the given metric.
+function get_interested_alarms(metric)
+    local result = {}
+    for _, candidate in pairs(all_alarms) do
+        if candidate:has_metric(metric) then
+            result[#result + 1] = candidate
+        end
+    end
+    return result
+end
+
+-- Dispatch one datapoint to every alarm interested in this metric.
+function add_value(ts, metric, value, fields)
+    for _, alarm in ipairs(get_interested_alarms(metric)) do
+        alarm:add_value(ts, metric, value, fields)
+    end
+end
+
+-- Drop every loaded alarm from the module registry.
+function reset_alarms()
+    all_alarms = {}
+end
+
+-- Evaluate all alarms whose initial wait has elapsed.
+-- Returns the most severe state across the evaluated alarms (nil when
+-- nothing was evaluated) and a list of {state=..., alert=...} entries.
+function evaluate(ns)
+    local global_state
+    local all_alerts = {}
+    for _, alarm in pairs(all_alarms) do
+        if alarm:is_evaluation_time(ns) then
+            local state, alerts = alarm:evaluate(ns)
+            global_state = gse_utils.max_status(state, global_state)
+            for _, a in ipairs(alerts) do
+                all_alerts[#all_alerts+1] = { state=state, alert=a }
+            end
+            -- raise the first triggered alarm except for OKAY/UNKW states
+            if global_state ~= consts.UNKW and global_state ~= consts.OKAY then
+                break
+            end
+        end
+    end
+    return global_state, all_alerts
+end
+
+-- Return the full list of loaded alarms.
+function get_alarms()
+    return all_alarms
+end
+-- Look up a loaded alarm by its name; returns nil when not found.
+function get_alarm(alarm_name)
+    for _, candidate in ipairs(all_alarms) do
+        if candidate.name == alarm_name then
+            return candidate
+        end
+    end
+end
+
+-- Instantiate an Alarm from its definition table and append it to the
+-- module registry.
+function load_alarm(alarm)
+    all_alarms[#all_alarms + 1] = Alarm.new(alarm)
+end
+
+-- Load a list of alarm definitions, one Alarm instance per entry.
+function load_alarms(alarms)
+    for _, alarm in ipairs(alarms) do
+        load_alarm(alarm)
+    end
+end
+
+-- Tracks whether set_start_time() has been called at least once.
+local started = false
+-- Propagate the start timestamp (ns) to every loaded alarm and mark
+-- the module as started.
+function set_start_time(ns)
+    for _, alarm in ipairs(all_alarms) do
+        alarm:set_start_time(ns)
+    end
+    started = true
+end
+
+-- Return true once set_start_time() has been called.
+function is_started()
+    return started
+end
+
+return M
diff --git a/heka/files/lua/common/afd_rule.lua b/heka/files/lua/common/afd_rule.lua
new file mode 100644
index 0000000..680ea61
--- /dev/null
+++ b/heka/files/lua/common/afd_rule.lua
@@ -0,0 +1,324 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local anomaly = require('anomaly')
+local circular_buffer = require('circular_buffer')
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local math = require 'math'
+local string = string
+local table = table
+local assert = assert
+local type = type
+
+-- LMA libs
+local utils = require 'lma_utils'
+local table_utils = require 'table_utils'
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+local afd = require 'afd'
+local matching = require 'value_matching'
+
+local MIN_WINDOW = 10
+local MIN_PERIOD = 1
+local SECONDS_PER_ROW = 5
+
+local Rule = {}
+Rule.__index = Rule
+
+setfenv(1, Rule) -- Remove external access to contain everything in the module
+
+-- Build a Rule object from its definition table.
+-- Normalizes window/periods to minimum values, compiles the field
+-- matchers, derives a unique rule id and pre-computes the circular
+-- buffer size needed by the configured function.
+function Rule.new(rule)
+    local r = {}
+    setmetatable(r, Rule)
+
+    -- clamp the observation window to at least MIN_WINDOW seconds
+    -- ('+ 0' coerces string values from the configuration to numbers)
+    local win = MIN_WINDOW
+    if rule.window and rule.window + 0 > 0 then
+        win = rule.window + 0
+    end
+    r.window = win
+    -- number of consecutive windows to evaluate (at least MIN_PERIOD)
+    local periods = MIN_PERIOD
+    if rule.periods and rule.periods + 0 > 0 then
+        periods = rule.periods + 0
+    end
+    r.periods = periods
+    r.relational_operator = rule.relational_operator
+    r.metric = rule.metric
+    r.fields = rule.fields or {}
+
+    -- build field matching
+    r.field_matchers = {}
+    for f, expression in pairs(r.fields) do
+        r.field_matchers[f] = matching.new(expression)
+    end
+
+    -- 'function' is a Lua keyword, hence the bracketed access
+    r.fct = rule['function']
+    r.threshold = rule.threshold + 0
+    r.value_index = rule.value or nil -- Can be nil
+
+    -- build unique rule id
+    local arr = {r.metric, r.fct, r.window, r.periods}
+    for f, v in table_utils.orderedPairs(r.fields or {}) do
+        arr[#arr+1] = string.format('(%s=%s)', f, v)
+    end
+    r.rule_id = table.concat(arr, '/')
+
+    r.group_by = rule.group_by or {}
+
+    if r.fct == 'roc' then
+        -- We use the name of the metric as the payload_name.
+        --
+        -- The ROC algorithm needs the following parameters:
+        --   - the number of intervals in the analysis window
+        --   - the number of intervals in the historical analysis window
+        --   - the threshold
+        --
+        -- r.window is an interval in seconds. So to get the number of
+        -- intervals we divide r.window by the number of seconds per row.
+        --
+        -- r.periods represents the number of windows that we want to use for
+        -- the historical analysis. As we tell the ROC algorithm to use all
+        -- remaining buffer for the historical window we need to allocate
+        -- r.periods * (r.window / SECONDS_PER_ROW) for the historical
+        -- analysis and 2 additional periods for the previous and current
+        -- analysis windows.
+        --
+        local cfg_str = string.format('roc("%s",1,%s,0,%s,false,false)',
+                                       r.metric,
+                                       math.ceil(r.window/SECONDS_PER_ROW),
+                                       r.threshold)
+        r.roc_cfg = anomaly.parse_config(cfg_str)
+        r.cbuf_size = math.ceil(r.window / SECONDS_PER_ROW) * (r.periods + 2)
+    else
+        r.roc_cfg = nil
+        r.cbuf_size = math.ceil(r.window * r.periods / SECONDS_PER_ROW)
+    end
+    r.ids_datastore = {}
+    r.datastore = {}
+    -- total observation span in seconds (window * periods)
+    r.observation_window = math.ceil(r.window * r.periods)
+
+    return r
+end
+
+-- Return the datastore key for a set of fields.
+-- Without group_by (or without fields) every datapoint shares one
+-- store; otherwise the group-by field values extend the rule id.
+function Rule:get_datastore_id(fields)
+    if fields == nil or #self.group_by == 0 then
+        return self.rule_id
+    end
+
+    local parts = {self.rule_id}
+    for _, key in ipairs(self.group_by) do
+        parts[#parts + 1] = fields[key]
+    end
+    return table.concat(parts, '/')
+end
+
+-- Decide whether a datapoint's fields satisfy the rule's matchers.
+-- Returns true when the rule has no matchers at all, or when at least
+-- one matcher found its field and every found field matched; returns
+-- false as soon as a present field fails its matcher.
+function Rule:fields_accepted(fields)
+    fields = fields or {}
+    local matcher_count = 0
+    local matched = 0
+    for name, expression in pairs(self.field_matchers) do
+        matcher_count = matcher_count + 1
+        local value = fields[name]
+        if value ~= nil then
+            if not expression:matches(value) then
+                return false
+            end
+            matched = matched + 1
+        end
+    end
+    return matcher_count == 0 or matched > 0
+end
+
+-- Allocate the circular buffer backing this rule's datapoints.
+-- 'avg' needs two sum-aggregated columns (sum and count); 'min'/'max'
+-- aggregate with the same-named function; every other function stores
+-- raw values in a single column.
+function Rule:get_circular_buffer()
+    local cbuf
+    if self.fct == 'avg' then
+        cbuf = circular_buffer.new(self.cbuf_size, 2, SECONDS_PER_ROW)
+        cbuf:set_header(1, self.metric, 'sum', 'sum')
+        cbuf:set_header(2, self.metric, 'count', 'sum')
+    elseif self.fct == 'min' or self.fct == 'max' then
+        cbuf = circular_buffer.new(self.cbuf_size, 1, SECONDS_PER_ROW)
+        cbuf:set_header(1, self.metric, self.fct)
+    else
+        cbuf = circular_buffer.new(self.cbuf_size, 1, SECONDS_PER_ROW)
+        cbuf:set_header(1, self.metric)
+    end
+    return cbuf
+end
+
+-- Store a datapoint in the rule's circular buffer, creating the buffer
+-- lazily on first use. 'value' may be a table, in which case the entry
+-- referenced by self.value_index is used.
+function Rule:add_value(ts, value, fields)
+    -- discard datapoints whose fields don't satisfy the matchers
+    if not self:fields_accepted(fields) then
+        return
+    end
+    if type(value) == 'table' then
+        value = value[self.value_index]
+    end
+    if value == nil then
+        return
+    end
+
+    local data
+    local uniq_field_id = self:get_datastore_id(fields)
+    if not self.datastore[uniq_field_id] then
+        self.datastore[uniq_field_id] = {
+            fields = self.fields,
+            cbuf = self:get_circular_buffer()
+        }
+        -- with group_by, remember the concrete field values of the group
+        if #self.group_by > 0 then
+            self.datastore[uniq_field_id].fields = fields
+        end
+
+        self:add_datastore(uniq_field_id)
+    end
+    data = self.datastore[uniq_field_id]
+
+    if self.fct == 'avg' then
+        -- column 1 accumulates the sum, column 2 counts the datapoints
+        data.cbuf:add(ts, 1, value)
+        data.cbuf:add(ts, 2, 1)
+    else
+        data.cbuf:set(ts, 1, value)
+    end
+end
+
+-- Register a datastore id exactly once (duplicates are ignored).
+function Rule:add_datastore(id)
+    if table_utils.item_find(id, self.ids_datastore) then
+        return
+    end
+    table.insert(self.ids_datastore, id)
+end
+
+-- Compare a computed value against the rule's threshold using its
+-- relational operator (delegates to gse_utils.compare_threshold).
+function Rule:compare_threshold(value)
+    return gse_utils.compare_threshold(value, self.relational_operator, self.threshold)
+end
+
+-- true when value is non-nil and not NaN
+-- (NaN is the only value for which value ~= value holds)
+local function isnumber(value)
+    return value ~= nil and not (value ~= value)
+end
+
+-- aggregation functions supported by Rule:evaluate()
+local available_functions = {last=true, avg=true, max=true, min=true, sum=true,
+                             variance=true, sd=true, diff=true, roc=true}
+
+-- Evaluate the rule against the collected datapoints.
+-- Returns a state and a context list ({value=v, fields=field table}):
+--
+-- examples:
+--   MATCH, { {value=100, fields={{queue='nova'}, {queue='neutron'}}, ..}
+--   NO_MATCH, {}
+-- with 2 special cases:
+--   - no datapoint was ever received:
+--      NO_DATA, {{value=-1, fields=self.fields}}
+--   - datapoints stopped arriving for a metric:
+--      MISSING_DATA, {value=-1, fields=..}
+-- Note: the MISSING_DATA state can lead to false positives when the
+-- monitored entity was legitimately renamed or deleted (e.g. a
+-- filesystem) and datapoints are not expected anymore.
+function Rule:evaluate(ns)
+    local fields = {}
+    local one_match, one_no_match, one_missing_data = false, false, false
+    for _, id in ipairs(self.ids_datastore) do
+        local data = self.datastore[id]
+        if data then
+            local cbuf_time = data.cbuf:current_time()
+            -- if we didn't receive datapoint within the observation window this means
+            -- we don't receive anymore data and cannot compute the rule.
+            if ns - cbuf_time > self.observation_window * 1e9 then
+                one_missing_data = true
+                fields[#fields+1] = {value = -1, fields = data.fields}
+            else
+                assert(available_functions[self.fct])
+                local result
+
+                if self.fct == 'roc' then
+                    -- rate-of-change anomaly detection; only match/no-match,
+                    -- no scalar result to compare against the threshold
+                    local anomaly_detected, _ = anomaly.detect(ns, self.metric, data.cbuf, self.roc_cfg)
+                    if anomaly_detected then
+                        one_match = true
+                        fields[#fields+1] = {value=-1, fields=data.fields}
+                    else
+                        one_no_match = true
+                    end
+                elseif self.fct == 'avg' then
+                    -- average = sum column / count column
+                    local total
+                    total = data.cbuf:compute('sum', 1)
+                    local count = data.cbuf:compute('sum', 2)
+                    result = total/count
+                elseif self.fct == 'last' then
+                    -- walk backwards row by row until a real number is found
+                    local last
+                    local t = ns
+                    while (not isnumber(last)) and t >= ns - self.observation_window * 1e9 do
+                        last = data.cbuf:get(t, 1)
+                        t = t - SECONDS_PER_ROW * 1e9
+                    end
+                    if isnumber(last) then
+                        result = last
+                    else
+                        one_missing_data = true
+                        fields[#fields+1] = {value = -1, fields = data.fields}
+                    end
+                elseif self.fct == 'diff' then
+                    -- difference between the newest and oldest valid values
+                    local first, last
+
+                    local t = ns
+                    while (not isnumber(last)) and t >= ns - self.observation_window * 1e9 do
+                        last = data.cbuf:get(t, 1)
+                        t = t - SECONDS_PER_ROW * 1e9
+                    end
+
+                    if isnumber(last) then
+                        t = ns - self.observation_window * 1e9
+                        while (not isnumber(first)) and t <= ns do
+                            first = data.cbuf:get(t, 1)
+                            t = t + SECONDS_PER_ROW * 1e9
+                        end
+                    end
+
+                    if not isnumber(last) or not isnumber(first) then
+                        one_missing_data = true
+                        fields[#fields+1] = {value = -1, fields = data.fields}
+                    else
+                        result = last - first
+                    end
+                else
+                    -- min/max/sum/variance/sd are delegated to the buffer
+                    result = data.cbuf:compute(self.fct, 1)
+                end
+
+                if result then
+                    local m = self:compare_threshold(result)
+                    if m then
+                        one_match = true
+                        fields[#fields+1] = {value=result, fields=data.fields}
+                    else
+                        one_no_match = true
+                    end
+                end
+            end
+        end
+    end
+    -- MATCH wins over MISSING_DATA, which wins over NO_MATCH
+    if one_match then
+        return afd.MATCH, fields
+    elseif one_missing_data then
+        return afd.MISSING_DATA, fields
+    elseif one_no_match then
+        return afd.NO_MATCH, {}
+    else
+        return afd.NO_DATA, {{value=-1, fields=self.fields}}
+    end
+end
+
+return Rule
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
new file mode 100644
index 0000000..16bb950
--- /dev/null
+++ b/heka/files/lua/common/gse.lua
@@ -0,0 +1,164 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local consts = require 'gse_constants'
+local string = require 'string'
+local table = require 'table'
+local GseCluster = require 'gse_cluster'
+local lma = require 'lma_utils'
+local table_utils = require 'table_utils'
+
+local pairs = pairs
+local ipairs = ipairs
+local assert = assert
+local type = type
+local read_message = read_message
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Hash of GseCluster instances organized by name
+local clusters = {}
+-- Reverse index table to map cluster's members to clusters
+local reverse_cluster_index = {}
+-- Array of cluster names ordered by dependency
+local ordered_clusters = {}
+
+-- Register a cluster definition.
+--
+-- cluster_id: unique name of the cluster
+-- members: array of member identifiers
+-- hints: array of related cluster ids (reported as hints in alarms)
+-- group_by: 'hostname', 'member' or anything else for no grouping
+-- policy_rules: array of policy objects deriving the cluster status
+--
+-- Maintains the member -> clusters reverse index and keeps
+-- ordered_clusters sorted so a cluster comes after the clusters it
+-- references through hints.
+function add_cluster(cluster_id, members, hints, group_by, policy_rules)
+    assert(type(members) == 'table')
+    assert(type(hints) == 'table')
+    assert(type(policy_rules) == 'table')
+
+    local cluster = GseCluster.new(members, hints, group_by, policy_rules)
+    clusters[cluster_id] = cluster
+
+    -- update the reverse index
+    for _, member in ipairs(members) do
+        if not reverse_cluster_index[member] then
+            reverse_cluster_index[member] = {}
+        end
+        local reverse_table = reverse_cluster_index[member]
+        if not table_utils.item_find(cluster_id, reverse_table) then
+            reverse_table[#reverse_table+1] = cluster_id
+        end
+    end
+
+    if not table_utils.item_find(cluster_id, ordered_clusters) then
+        -- place the new cluster right after the last cluster it hints at
+        local after_index = 1
+        for current_pos, id in ipairs(ordered_clusters) do
+            if table_utils.item_find(id, cluster.hints) then
+                after_index = current_pos + 1
+            end
+        end
+
+        -- ... but no later than any existing cluster that hints at it
+        local index = after_index
+        for id, item in pairs(clusters) do
+            for _, hint in pairs(item.hints) do
+                if hint == cluster_id then
+                    -- FIX: look up the depending cluster's id in
+                    -- ordered_clusters. The previous code read the
+                    -- undefined name 'cluster_orderings' (always nil
+                    -- under setfenv) and discarded the table key, so
+                    -- this branch could never work.
+                    local pos = table_utils.item_pos(id, ordered_clusters)
+                    if pos and pos <= index then
+                        index = pos
+                    elseif index > after_index then
+                        error('circular dependency between clusters!')
+                    end
+                end
+            end
+        end
+        table.insert(ordered_clusters, index, cluster_id)
+    end
+end
+
+-- Return the cluster ids ordered so that hinted clusters come first.
+function get_ordered_clusters()
+    return ordered_clusters
+end
+
+-- Return true when a cluster with this id has been registered.
+function cluster_exists(cluster_id)
+    return clusters[cluster_id] ~= nil
+end
+
+-- Return the list of clusters which depend on a given member
+-- (empty list when the member belongs to no cluster).
+function find_cluster_memberships(member_id)
+    return reverse_cluster_index[member_id] or {}
+end
+
+-- Store the status of a cluster's member and its current alarms.
+-- Unknown cluster ids are silently ignored.
+function set_member_status(cluster_id, member, value, alarms, hostname)
+    local cluster = clusters[cluster_id]
+    if cluster then
+        cluster:update_fact(member, hostname, value, alarms)
+    end
+end
+
+-- The cluster status depends on the status of its members.
+-- The status of the related clusters (defined by cluster.hints) doesn't modify
+-- the overall status but their alarms are returned.
+-- Returns (status, alarms); raises if the cluster id is unknown.
+function resolve_status(cluster_id)
+    local cluster = clusters[cluster_id]
+    assert(cluster)
+
+    cluster:refresh_status()
+    -- deep copy so callers can annotate without mutating cluster state
+    local alarms = table_utils.deepcopy(cluster.alarms)
+
+    if cluster.status ~= consts.OKAY then
+        -- add hints if the cluster isn't healthy
+        for _, other_id in ipairs(cluster.hints or {}) do
+            -- only alarms not already attributed to our own members
+            for _, v in pairs(cluster:subtract_alarms(clusters[other_id])) do
+                alarms[#alarms+1] = table_utils.deepcopy(v)
+                alarms[#alarms].tags['dependency_name'] = other_id
+                alarms[#alarms].tags['dependency_level'] = 'hint'
+            end
+        end
+    end
+
+    return cluster.status, alarms
+end
+
+-- Compute the cluster metric and inject it into the Heka pipeline.
+-- The metric's value is the status derived from the cluster's members;
+-- the associated alarms are carried as a JSON payload. When JSON
+-- encoding fails the message is silently dropped.
+function inject_cluster_metric(msg_type, cluster_name, metric_name, interval, source, to_alerting)
+    local payload
+    local status, alarms = resolve_status(cluster_name)
+
+    if #alarms > 0 then
+        payload = lma.safe_json_encode({alarms=alarms})
+        if not payload then
+            return
+        end
+    else
+        -- because cjson encodes empty tables as objects instead of arrays
+        payload = '{"alarms":[]}'
+    end
+
+    -- only set the field when alerting is explicitly disabled; otherwise
+    -- it stays nil and is omitted from the message entirely
+    local no_alerting
+    if to_alerting ~= nil and to_alerting == false then
+        no_alerting = true
+    end
+
+    local msg = {
+        Type = msg_type,
+        Payload = payload,
+        Fields = {
+            name=metric_name,
+            value=status,
+            cluster_name=cluster_name,
+            tag_fields={'cluster_name'},
+            interval=interval,
+            source=source,
+            no_alerting=no_alerting,
+        }
+    }
+    lma.inject_tags(msg)
+    lma.safe_inject_message(msg)
+end
+
+return M
diff --git a/heka/files/lua/common/gse_cluster.lua b/heka/files/lua/common/gse_cluster.lua
new file mode 100644
index 0000000..cce94f4
--- /dev/null
+++ b/heka/files/lua/common/gse_cluster.lua
@@ -0,0 +1,154 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+local table_utils = require 'table_utils'
+
+local ipairs = ipairs
+local pairs = pairs
+local setmetatable = setmetatable
+local assert = assert
+local type = type
+
+local GseCluster = {}
+GseCluster.__index = GseCluster
+
+setfenv(1, GseCluster) -- Remove external access to contain everything in the module
+
+local VALID_STATUSES = {
+    [consts.OKAY]=true,
+    [consts.WARN]=true,
+    [consts.CRIT]=true,
+    [consts.DOWN]=true,
+    [consts.UNKW]=true
+}
+
+-- Build a GseCluster from its member list, hint list, grouping mode
+-- and policy rules. The initial status is UNKW until facts arrive.
+function GseCluster.new(members, hints, group_by, policy_rules)
+    assert(type(members) == 'table')
+    assert(type(hints) == 'table')
+    assert(type(policy_rules) == 'table')
+
+    local cluster = {}
+    setmetatable(cluster, GseCluster)
+
+    cluster.members = members
+    cluster.hints = hints
+    cluster.policy_rules = policy_rules
+    -- when group_by is 'hostname', facts are stored by hostname then member
+    -- when group_by is 'member', facts are stored by member only
+    -- otherwise facts are stored by member then hostname
+    if group_by == 'hostname' or group_by == 'member' then
+        cluster.group_by = group_by
+    else
+        cluster.group_by = 'none'
+    end
+    cluster.status = consts.UNKW
+    cluster.facts = {}
+    cluster.alarms = {}
+    -- set-like index for O(1) membership checks in has_member()
+    cluster.member_index = {}
+    for _, v in ipairs(members) do
+        cluster.member_index[v] = true
+    end
+
+    return cluster
+end
+
+-- Return true when 'member' belongs to this cluster (nil otherwise).
+function GseCluster:has_member(member)
+    return self.member_index[member]
+end
+
+-- Update the facts table for a cluster's member.
+-- 'value' must be a valid status constant and 'alarms' a table; the
+-- two-level key layout depends on the cluster's group_by mode.
+function GseCluster:update_fact(member, hostname, value, alarms)
+    assert(VALID_STATUSES[value])
+    assert(type(alarms) == 'table')
+
+    local key1, key2 = member, hostname
+    if self.group_by == 'hostname' then
+        key1 = hostname
+        key2 = member
+    elseif self.group_by == 'member' then
+        -- one fact per member regardless of the emitting host
+        key2 = '__anyhost__'
+    end
+
+    if not self.facts[key1] then
+        self.facts[key1] = {}
+    end
+    -- deep copy so later mutations of the input don't leak into facts
+    self.facts[key1][key2] = {
+        status=value,
+        alarms=table_utils.deepcopy(alarms),
+        member=member
+    }
+    if self.group_by == 'hostname' then
+        -- store the hostname for later reference in the alarms
+        self.facts[key1][key2].hostname = hostname
+    end
+end
+
+-- Compute the status and alarms of the cluster according to the current facts
+-- and the cluster's policy. Each fact group contributes its worst member
+-- status to a breakdown table which the policy rules are evaluated against.
+-- Returns the new status (also stored in self.status).
+function GseCluster:refresh_status()
+    local alarms = {}
+    local status_breakdown = {}
+
+    self.status = consts.UNKW
+    self.alarms = {}
+
+    for group_key, _ in table_utils.orderedPairs(self.facts) do
+        local group_status = consts.UNKW
+        for sub_key, fact in table_utils.orderedPairs(self.facts[group_key]) do
+            group_status = gse_utils.max_status(group_status, fact.status)
+            if fact.status ~= consts.OKAY then
+                -- collect and annotate the alarms of unhealthy members
+                for _, v in ipairs(fact.alarms) do
+                    alarms[#alarms+1] = table_utils.deepcopy(v)
+                    if not alarms[#alarms]['tags'] then
+                        alarms[#alarms]['tags'] = {}
+                    end
+                    alarms[#alarms].tags['dependency_name'] = fact.member
+                    alarms[#alarms].tags['dependency_level'] = 'direct'
+                    if fact.hostname then
+                        alarms[#alarms].hostname = fact.hostname
+                    end
+                end
+            end
+        end
+        -- count how many groups ended up in each status
+        status_breakdown[group_status] = (status_breakdown[group_status] or 0) + 1
+    end
+    -- first matching policy rule wins
+    for _, policy_rule in ipairs(self.policy_rules) do
+        if policy_rule:evaluate(status_breakdown) then
+            self.status = policy_rule.status
+            break
+        end
+    end
+    if self.status ~= consts.OKAY then
+        self.alarms = alarms
+    end
+
+    return self.status
+end
+
+-- Return the alarms of another cluster whose 'dependency_name' tag
+-- does not refer to one of this cluster's own members (i.e. alarms
+-- not already known here). Accepts nil and returns an empty list.
+function GseCluster:subtract_alarms(cluster)
+    local result = {}
+    if cluster ~= nil then
+        for _, alarm in ipairs(cluster.alarms) do
+            local dep = alarm.tags and alarm.tags['dependency_name']
+            if dep and not self:has_member(dep) then
+                result[#result + 1] = alarm
+            end
+        end
+    end
+    return result
+end
+
+return GseCluster
diff --git a/heka/files/lua/common/gse_constants.lua b/heka/files/lua/common/gse_constants.lua
new file mode 100644
index 0000000..2a328e6
--- /dev/null
+++ b/heka/files/lua/common/gse_constants.lua
@@ -0,0 +1,39 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- The status values were chosen to match with the Grafana constraints:
+-- OKAY => green
+-- WARN & UNKW => orange
+-- CRIT & DOWN => red
+OKAY=0
+WARN=1
+UNKW=2
+CRIT=3
+DOWN=4
+
+-- Human-readable names for the status constants above.
+local STATUS_LABELS = {
+    [OKAY]='OKAY',
+    [WARN]='WARN',
+    [UNKW]='UNKNOWN',
+    [CRIT]='CRITICAL',
+    [DOWN]='DOWN'
+}
+
+-- Return the label for a status value (nil for unknown values).
+function status_label(v)
+    return STATUS_LABELS[v]
+end
+
+return M
diff --git a/heka/files/lua/common/gse_policy.lua b/heka/files/lua/common/gse_policy.lua
new file mode 100644
index 0000000..6c7e52c
--- /dev/null
+++ b/heka/files/lua/common/gse_policy.lua
@@ -0,0 +1,109 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local consts = require 'gse_constants'
+local gse_utils = require 'gse_utils'
+
+local assert = assert
+local ipairs = ipairs
+local pairs = pairs
+local setmetatable = setmetatable
+local string = string
+local tonumber = tonumber
+
+local GsePolicy = {}
+GsePolicy.__index = GsePolicy
+
+setfenv(1, GsePolicy) -- Remove external access to contain everything in the module
+
+local SEVERITIES = {
+    okay=consts.OKAY,
+    warning=consts.WARN,
+    unknown=consts.UNKW,
+    critical=consts.CRIT,
+    down=consts.DOWN
+}
+
+-- Build a GsePolicy from its definition table.
+-- 'policy.status' (okay/warning/unknown/critical/down, any case) is
+-- the status this policy yields when it matches; 'policy.trigger'
+-- optionally holds a logical operator and a list of count/percent
+-- rules over status arguments. Asserts on invalid input.
+function GsePolicy.new(policy)
+    local p = {}
+    setmetatable(p, GsePolicy)
+
+    p.status = SEVERITIES[string.lower(policy.status)]
+    assert(p.status)
+
+    -- remember whether any rule needs the total count (for percentages)
+    p.require_percent = false
+    p.rules = {}
+    if policy.trigger then
+        p.logical_op = string.lower(policy.trigger.logical_operator or 'or')
+        for _, r in ipairs(policy.trigger.rules or {}) do
+            assert(r['function'] == 'count' or r['function'] == 'percent')
+            if r['function'] == 'percent' then
+                p.require_percent = true
+            end
+            local rule = {
+                ['function']=r['function'],
+                relational_op=r.relational_operator,
+                threshold=tonumber(r.threshold),
+                arguments={}
+            }
+            -- translate status names to their numeric constants
+            for _, v in ipairs(r.arguments) do
+                assert(SEVERITIES[v])
+                rule.arguments[#rule.arguments+1] = SEVERITIES[v]
+            end
+            p.rules[#p.rules+1] = rule
+        end
+    end
+
+    return p
+end
+
+-- Return true or false depending on whether the facts match the policy.
+-- 'facts' maps status constants to group counts; a policy without rules
+-- always matches. 'or' policies match on the first satisfied rule,
+-- 'and' policies fail on the first unsatisfied one.
+function GsePolicy:evaluate(facts)
+    local total = 0
+
+    if #self.rules == 0 then
+        return true
+    end
+
+    -- percent rules need the overall count as denominator
+    if self.require_percent then
+        for _, v in pairs(facts) do
+            total = total + v
+        end
+    end
+
+    local one_match = false
+    for _, r in ipairs(self.rules) do
+        -- sum the counts of the statuses this rule cares about
+        local value = 0
+        for _, status in ipairs(r.arguments) do
+            if facts[status] then
+                value = value + facts[status]
+            end
+        end
+        if r['function'] == 'percent' then
+            value = value * 100 / total
+        end
+
+        if gse_utils.compare_threshold(value, r.relational_op, r.threshold) then
+            one_match = true
+            if self.logical_op == 'or' then
+                return true
+            end
+        elseif self.logical_op == 'and' then
+            return false
+        end
+    end
+
+    return one_match
+end
+
+return GsePolicy
diff --git a/heka/files/lua/common/gse_utils.lua b/heka/files/lua/common/gse_utils.lua
new file mode 100644
index 0000000..dd82aa2
--- /dev/null
+++ b/heka/files/lua/common/gse_utils.lua
@@ -0,0 +1,58 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local consts = require 'gse_constants'
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local STATUS_WEIGHTS = {
+    [consts.UNKW]=0,
+    [consts.OKAY]=1,
+    [consts.WARN]=2,
+    [consts.CRIT]=3,
+    [consts.DOWN]=4
+}
+
+-- Return the most severe of two status values according to
+-- STATUS_WEIGHTS; a missing (nil) argument loses to the other one.
+function max_status(val1, val2)
+    if not val1 then
+        return val2
+    end
+    if not val2 then
+        return val1
+    end
+    if STATUS_WEIGHTS[val1] > STATUS_WEIGHTS[val2] then
+        return val1
+    end
+    return val2
+end
+
+-- Compare 'value' to 'threshold' with the relational operator 'op'.
+-- Both symbolic ('==', '!=', '>=', '>', '<=', '<') and mnemonic
+-- ('eq', 'ne', 'gte', 'gt', 'lte', 'lt') forms are accepted; any
+-- other operator yields false.
+function compare_threshold(value, op, threshold)
+    if op == '==' or op == 'eq' then return value == threshold end
+    if op == '!=' or op == 'ne' then return value ~= threshold end
+    if op == '>=' or op == 'gte' then return value >= threshold end
+    if op == '>' or op == 'gt' then return value > threshold end
+    if op == '<=' or op == 'lte' then return value <= threshold end
+    if op == '<' or op == 'lt' then return value < threshold end
+    return false
+end
+
+return M
diff --git a/heka/files/lua/common/influxdb.lua b/heka/files/lua/common/influxdb.lua
new file mode 100644
index 0000000..d9c359d
--- /dev/null
+++ b/heka/files/lua/common/influxdb.lua
@@ -0,0 +1,114 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local string = string
+local table = table
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local tostring = tostring
+local type = type
+
+local utils = require 'lma_utils'
+
+local InfluxEncoder = {}
+InfluxEncoder.__index = InfluxEncoder
+
+setfenv(1, InfluxEncoder) -- Remove external access to contain everything in the module
+
+-- Escape spaces and commas for the InfluxDB line protocol.
+-- The wrapping parentheses truncate gsub's (string, count) multiple
+-- return values to just the escaped string, so that a call to
+-- escape_string() in tail/last-argument position cannot leak the
+-- substitution count to the caller.
+local function escape_string(str)
+    return (tostring(str):gsub("([ ,])", "\\%1"))
+end
+
+-- Encode a scalar as an InfluxDB field value.
+-- Returns nil for unsupported types (tables, nil, ...).
+local function encode_scalar_value(value)
+    local value_type = type(value)
+    if value_type == "number" then
+        -- Always send numbers as formatted floats, so InfluxDB will accept
+        -- them if they happen to change from ints to floats between
+        -- points in time.  Forcing them to always be floats avoids this.
+        return string.format("%.6f", value)
+    end
+    if value_type == "string" then
+        -- string values need to be double quoted
+        return '"' .. value:gsub('"', '\\"') .. '"'
+    end
+    if value_type == "boolean" then
+        -- booleans are sent as the quoted strings "true"/"false"
+        return '"' .. tostring(value) .. '"'
+    end
+end
+
+-- Encode a metric value as InfluxDB line-protocol fields.
+-- A table becomes "k1=v1,k2=v2,..."; a scalar becomes "value=<v>".
+local function encode_value(value)
+    if type(value) ~= "table" then
+        return "value=" .. encode_scalar_value(value)
+    end
+    local fields = {}
+    for field, field_value in pairs(value) do
+        fields[#fields + 1] = string.format(
+            "%s=%s", escape_string(field), encode_scalar_value(field_value))
+    end
+    return table.concat(fields, ',')
+end
+
+-- Create a new InfluxDB line-protocol encoder.
+--
+-- time_precision: "s", "m", "ms", "us" or "ns" (default: "ns")
+function InfluxEncoder.new(time_precision)
+    local encoder = setmetatable({}, InfluxEncoder)
+    encoder.time_precision = time_precision or 'ns'
+    return encoder
+end
+
+-- Encode a single datapoint using the InfluxDB line protocol.
+--
+-- timestamp: the timestamp in nanosecond
+-- name:  the measurement's name
+-- value: a scalar value or a list of key-value pairs
+-- tags:  a list of key-value pairs encoded as InfluxDB tags
+--
+-- Returns the encoded line, or "" when any parameter is invalid.
+function InfluxEncoder:encode_datapoint(timestamp, name, value, tags)
+    local invalid = timestamp == nil or type(name) ~= 'string'
+        or value == nil or type(tags or {}) ~= 'table'
+    if invalid then
+        -- fail silently if any input parameter is invalid
+        return ""
+    end
+
+    local ts = timestamp
+    if self.time_precision ~= 'ns' then
+        ts = utils.message_timestamp(self.time_precision, ts)
+    end
+
+    local encoded_tags = {}
+    for tag_name, tag_value in pairs(tags or {}) do
+        -- empty tag name and value aren't allowed by InfluxDB
+        if tag_name ~= '' and tag_value ~= '' then
+            encoded_tags[#encoded_tags + 1] =
+                escape_string(tag_name) .. '=' .. escape_string(tag_value)
+        end
+    end
+
+    if #encoded_tags == 0 then
+        return string.format("%s %s %d",
+            escape_string(name), encode_value(value), ts)
+    end
+
+    -- for performance reasons, it is recommended to always send the
+    -- tags in the same order, hence the sort
+    table.sort(encoded_tags)
+    return string.format("%s,%s %s %d",
+        escape_string(name), table.concat(encoded_tags, ','),
+        encode_value(value), ts)
+end
+
+return InfluxEncoder
diff --git a/heka/files/lua/common/lma_utils.lua b/heka/files/lua/common/lma_utils.lua
new file mode 100644
index 0000000..7b063d1
--- /dev/null
+++ b/heka/files/lua/common/lma_utils.lua
@@ -0,0 +1,307 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local cjson = require 'cjson'
+local string = require 'string'
+local extra = require 'extra_fields'
+local patt  = require 'patterns'
+local math = require 'math'
+
+local pairs = pairs
+local inject_message = inject_message
+local inject_payload = inject_payload
+local read_message = read_message
+local pcall = pcall
+local type = type
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Map numeric syslog severity levels (0-7) to their labels.
+severity_to_label_map = {
+    [0] = 'EMERGENCY',
+    [1] = 'ALERT',
+    [2] = 'CRITICAL',
+    [3] = 'ERROR',
+    [4] = 'WARNING',
+    [5] = 'NOTICE',
+    [6] = 'INFO',
+    [7] = 'DEBUG',
+}
+
+-- Reverse mapping: severity label to its numeric level.
+label_to_severity_map = {
+    EMERGENCY = 0,
+    ALERT = 1,
+    CRITICAL = 2,
+    ERROR = 3,
+    WARNING = 4,
+    NOTICE = 5,
+    INFO= 6,
+    DEBUG = 7,
+}
+
+-- Metric type identifiers used in outgoing metric messages.
+metric_type = {
+    COUNTER = "counter",
+    GAUGE = "gauge",
+    DERIVE = "derive",
+}
+
+-- Severity applied when a parsed message carries none (DEBUG).
+local default_severity = 7
+
+-- Datapoints accumulated by add_to_bulk_metric() until
+-- inject_bulk_metric() flushes and resets this list.
+local bulk_datapoints = {}
+
+-- Append a datapoint to the pending bulk metric message.
+-- 'value' may be a table of key/value pairs (multi-value metric),
+-- stored under 'values', or a scalar, stored under 'value'.
+function add_to_bulk_metric(name, value, tags)
+    local datapoint = {
+        name = name,
+        tags = tags or {},
+    }
+    if type(value) == 'table' then
+        datapoint.values = value
+    else
+        datapoint.value = value
+    end
+    bulk_datapoints[#bulk_datapoints + 1] = datapoint
+end
+
+-- Flush all pending datapoints as a single 'bulk_metric' message into
+-- the Heka pipeline, then reset the pending list.
+function inject_bulk_metric(ts, hostname, source)
+    if #bulk_datapoints == 0 then
+        return
+    end
+
+    local payload = safe_json_encode(bulk_datapoints)
+    -- Always reset the table, even when the encoding failed, otherwise
+    -- it may grow infinitely and the sandbox will eventually be killed
+    -- by Heka. See https://bugs.launchpad.net/lma-toolchain/+bug/1545743
+    bulk_datapoints = {}
+    if not payload then
+        return
+    end
+
+    local msg = {
+        Hostname = hostname,
+        Timestamp = ts,
+        Payload = payload,
+        Type = 'bulk_metric', -- prepended with 'heka.sandbox'
+        Severity = label_to_severity_map.INFO,
+        Fields = {
+            hostname = hostname,
+            source = source
+        }
+    }
+
+    inject_tags(msg)
+    safe_inject_message(msg)
+end
+
+-- Encode a Lua value as JSON, returning nil instead of raising when
+-- the encoding fails for some reason (for instance, the encoded
+-- buffer exceeds the sandbox limit).
+function safe_json_encode(v)
+    local ok, data = pcall(cjson.encode, v)
+    if ok then
+        return data
+    end
+end
+
+-- Call inject_payload() wrapped by pcall().
+-- Returns 0 on success, or -1 plus the error message on failure.
+function safe_inject_payload(payload_type, payload_name, data)
+    local ok, err_msg = pcall(inject_payload, payload_type, payload_name, data)
+    if ok then
+        return 0
+    end
+    return -1, err_msg
+end
+
+-- Call inject_message() wrapped by pcall().
+-- Returns 0 on success, or -1 plus the error message on failure.
+function safe_inject_message(msg)
+    local ok, err_msg = pcall(inject_message, msg)
+    if ok then
+        return 0
+    end
+    return -1, err_msg
+end
+
+-- Parse a Syslog-based payload and update the Heka message in place
+-- (Timestamp, Hostname, Pid, Severity, Payload and Fields).
+-- Return true if successful, false otherwise
+function parse_syslog_message(grammar, payload, msg)
+    -- capture everything after the first newline: the syslog grammar
+    -- only matches the first line and would drop the rest
+    local extra_msg = string.match(payload, '^[^\n]+\n(.+)\n$')
+
+    local fields = grammar:match(payload)
+    if not fields then
+        return false
+    end
+
+    -- promote well-known syslog fields to message headers and remove
+    -- them from Fields to avoid duplication
+    msg.Timestamp = fields.timestamp
+    fields.timestamp = nil
+
+    msg.Hostname = fields.hostname
+    fields.hostname = nil
+
+    msg.Pid = fields.syslogtag.pid or 0
+    fields.programname = fields.syslogtag.programname
+    fields.syslogtag = nil
+
+    if fields.pri then
+        msg.Severity = fields.pri.severity
+        fields.syslogfacility = fields.pri.facility
+        fields.pri = nil
+    else
+        -- fall back on the various RSyslog property names, defaulting
+        -- to DEBUG (7) when no severity information is present
+        msg.Severity = fields.syslogseverity or fields["syslogseverity-text"]
+            or fields.syslogpriority or fields["syslogpriority-text"]
+            or default_severity
+        fields.syslogseverity = nil
+        fields["syslogseverity-text"] = nil
+        fields.syslogpriority = nil
+        fields["syslogpriority-text"] = nil
+    end
+    fields.severity_label = severity_to_label_map[msg.Severity]
+
+    -- re-attach the additional lines dropped by the grammar so that
+    -- multi-line payloads are preserved
+    if extra_msg ~= nil then
+        msg.Payload = fields.msg .. "\n" .. extra_msg
+    else
+        msg.Payload = fields.msg
+    end
+    fields.msg = nil
+
+    msg.Fields = fields
+
+    inject_tags(msg)
+
+    return true
+end
+
+-- Merge the static tags from the 'extra_fields' module into the
+-- message Fields, without overriding fields that are already set.
+function inject_tags(msg)
+    for tag, tag_value in pairs(extra.tags) do
+        if msg.Fields[tag] == nil then
+            msg.Fields[tag] = tag_value
+        end
+    end
+end
+
+-- Convert a datetime string to the RFC3339 format
+-- ('YYYY-MM-DDTHH:MM:SS.ssssss(+|-)HH:MM'); it supports a variety of
+-- datetime formats.
+-- Return the string unmodified if the datetime couldn't be parsed.
+-- (The previous implementation returned nil in that case, which
+-- contradicted this documented contract.)
+function format_datetime (raw_datetime)
+    local t = patt.TimestampTable:match(raw_datetime)
+    if not t then
+        -- couldn't parse: hand back the input untouched
+        return raw_datetime
+    end
+    local frac = t.sec_frac or 0
+    local offset_sign = t.offset_sign or '+'
+    local offset_hour = t.offset_hour or 0
+    local offset_min = t.offset_min or 0
+    return string.format("%04d-%02d-%02dT%02d:%02d:%02d.%06d%s%02d:%02d",
+        t.year, t.month, t.day, t.hour, t.min, t.sec, frac*1e6, offset_sign,
+        offset_hour, offset_min)
+end
+
+-- Strip a single trailing newline from s.
+-- The wrapping parentheses truncate string.gsub's (string, count)
+-- multiple return values, so a chomp(s) call in tail/last-argument
+-- position cannot leak the substitution count to the caller.
+function chomp(s)
+    return (string.gsub(s, "\n$", ""))
+end
+
+-- Truncate str to at most max_length characters, preferring to cut
+-- just before a delimiter occurrence rather than mid-token.
+-- NOTE(review): 'delimiter' is passed to string.find as a Lua
+-- pattern, so magic characters in it are interpreted — confirm
+-- callers only pass plain strings.
+function truncate(str, max_length, delimiter)
+    if string.len(str) <= max_length then
+        return str
+    end
+
+    -- scan delimiter occurrences left to right; 'pos' tracks the
+    -- position just after the last delimiter that still fits
+    local pos = 1
+    while true do
+        local next_pos1, next_pos2 = string.find(str, delimiter, pos)
+        if not next_pos1 or next_pos1 - 1 > max_length then
+            -- no further delimiter fits: back up so the trailing
+            -- delimiter is not included in the result, falling back
+            -- to a hard cut at max_length when no delimiter matched
+            pos = pos - string.len(delimiter) - 1
+            if pos < 1 then
+                pos = max_length
+            end
+            break
+        end
+        pos = next_pos2 + 1
+    end
+
+    return string.sub(str, 1, pos)
+end
+
+-- Convert a nanosecond timestamp to a lower precision timestamp.
+-- Arguments:
+--   timestamp_precision: one of 'us', 'ms', 's', 'm' or 'h'
+--     (any other value falls back to 'ms').
+--   timestamp: a timestamp in nanosecond, if not provided the message
+--     Timestamp is used.
+function message_timestamp(timestamp_precision, timestamp)
+    local divisors = {
+        s = 1e9,
+        us = 1e3,
+        m = 1e9 * 60,
+        h = 1e9 * 60 * 60,
+    }
+    -- default is to convert ns to ms
+    local timestamp_divisor = divisors[timestamp_precision] or 1e6
+    if timestamp == nil then
+        timestamp = read_message("Timestamp")
+    end
+    return math.floor(timestamp / timestamp_divisor)
+end
+
+-- Extract the metric value(s) from the message.
+-- The value can be either a scalar value or a table for multivalue
+-- metrics (when Fields[value_fields] lists the value field names).
+-- Returns true plus the value or if it fails, returns false plus the error message.
+function get_values_from_metric()
+    -- 'value' is now a proper local: the previous version leaked it
+    -- into the module's environment
+    local value
+    if read_message('Fields[value_fields]') then
+        value = {}
+        local found = 0
+        local i = 0
+        while true do
+            local f = read_message("Fields[value_fields]", 0, i)
+            if not f then
+                break
+            end
+            local val = read_message(string.format('Fields[%s]', f))
+            if val ~= nil then
+                value[f] = val
+                found = found + 1
+            end
+            -- always advance: the previous version only incremented on
+            -- a valid field, so a listed-but-missing field made the
+            -- loop spin forever on the same index
+            i = i + 1
+        end
+        if found == 0 then
+           return false, 'Fields[value_fields] does not list any valid field'
+        end
+    else
+        value = read_message("Fields[value]")
+        if not value then
+            return false, 'Fields[value] is missing'
+        end
+    end
+
+    return true, value
+end
+
+return M
diff --git a/heka/files/lua/common/patterns.lua b/heka/files/lua/common/patterns.lua
new file mode 100644
index 0000000..e9acc18
--- /dev/null
+++ b/heka/files/lua/common/patterns.lua
@@ -0,0 +1,147 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local table  = require 'table'
+local dt     = require "date_time"
+local l      = require 'lpeg'
+l.locale(l)
+
+local tonumber = tonumber
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Join the UUID fragments captured by the grammar with dashes,
+-- producing the canonical UUID representation.
+function format_uuid(parts)
+    return table.concat(parts, '-')
+end
+
+-- Build a pattern that searches for patt anywhere in the subject, by
+-- wrapping it in a recursive grammar that skips one character at a
+-- time until patt matches (or the subject is exhausted).
+function anywhere (patt)
+  return l.P {
+    patt + 1 * l.V(1)
+  }
+end
+
+sp = l.space
+colon = l.P":"
+dash = l.P"-"
+dot = l.P'.'
+quote = l.P'"'
+
+local x4digit = l.xdigit * l.xdigit * l.xdigit * l.xdigit
+local uuid_dash = l.C(x4digit * x4digit * dash * x4digit * dash * x4digit * dash * x4digit * dash * x4digit * x4digit * x4digit)
+local uuid_nodash = l.Ct(l.C(x4digit * x4digit) * l.C(x4digit) * l.C(x4digit) * l.C(x4digit) * l.C(x4digit * x4digit * x4digit)) / format_uuid
+
+-- Return a UUID string in canonical format (eg with dashes)
+Uuid = uuid_nodash + uuid_dash
+
+-- Parse a datetime string and return a table with the following keys
+--   year (string)
+--   month (string)
+--   day (string)
+--   hour (string)
+--   min (string)
+--   sec (string)
+--   sec_frac (number less than 1, can be nil)
+--   offset_sign ('-' or '+', can be nil)
+--   offset_hour (number, can be nil)
+--   offset_min (number, can be nil)
+--
+-- The datetime string can be formatted as
+-- 'YYYY-MM-DD( |T)HH:MM:SS(.ssssss)?(offset indicator)?'
+TimestampTable = l.Ct(dt.rfc3339_full_date * (sp + l.P"T") * dt.rfc3339_partial_time * (dt.rfc3339_time_offset + dt.timezone_offset)^-1)
+
+-- Returns the parsed datetime converted to nanosec
+Timestamp = TimestampTable / dt.time_to_ns
+
+programname   = (l.R("az", "AZ", "09") + l.P"." + dash + l.P"_")^1
+Pid           = l.digit^1
+SeverityLabel = l.P"CRITICAL" + l.P"ERROR" + l.P"WARNING" + l.P"INFO" + l.P"AUDIT" + l.P"DEBUG" + l.P"TRACE"
+Message       = l.P(1)^0
+
+-- Capture for OpenStack logs producing four values: Timestamp, Pid,
+-- SeverityLabel, PythonModule and Message.
+--
+-- OpenStack log messages are of this form:
+-- 2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla...
+--
+-- [-] is the "request" part, it can take multiple forms. See below.
+openstack = l.Ct(l.Cg(Timestamp, "Timestamp")* sp * l.Cg(Pid, "Pid") * sp *
+    l.Cg(SeverityLabel, "SeverityLabel") * sp * l.Cg(programname, "PythonModule") *
+    sp * l.Cg(Message, "Message"))
+
+-- Capture for OpenStack request context producing three values: RequestId,
+-- UserId and TenantId.
+--
+-- Notes:
+--
+-- OpenStack logs include a request context, enclosed between square brackets.
+-- It takes one of these forms:
+--
+-- [-]
+-- [req-0fd2a9ba-448d-40f5-995e-33e32ac5a6ba - - - - -]
+-- [req-4db318af-54c9-466d-b365-fe17fe4adeed 8206d40abcc3452d8a9c1ea629b4a8d0 112245730b1f4858ab62e3673e1ee9e2 - - -]
+--
+-- In the 1st case the capture produces nil.
+-- In the 2nd case the capture produces one value: RequestId.
+-- In the 3rd case the capture produces three values: RequestId, UserId, TenantId.
+--
+-- The request id  may be formatted as 'req-xxx' or 'xxx' depending on the project.
+-- The user id and tenant id may not be present depending on the OpenStack release.
+openstack_request_context = (l.P(1) - "[" )^0 * "[" * l.P"req-"^-1 *
+    l.Ct(l.Cg(Uuid, "RequestId") * sp * ((l.Cg(Uuid, "UserId") * sp *
+    l.Cg(Uuid, "TenantId")) + l.P(1)^0)) - "]"
+
+local http_method = l.Cg(l.R"AZ"^3, "http_method")
+local url = l.Cg( (1 - sp)^1, "http_url")
+local http_version = l.Cg(l.digit * dot * l.digit, "http_version")
+
+-- Pattern for the "<http_method> <http_url> HTTP/<http_version>" format found
+-- found in both OpenStack and Apache log files.
+-- Example : OPTIONS /example.com HTTP/1.0
+http_request = http_method * sp * url * sp * l.P'HTTP/' * http_version
+
+-- Patterns for HTTP status, HTTP response size and HTTP response time in
+-- OpenLayers logs.
+--
+-- Notes:
+-- Nova changes the default log format of eventlet.wsgi (see nova/wsgi.py) and
+-- prefixes the HTTP status, response size and response time values with
+-- respectively "status: ", "len: " and "time: ".
+-- Other OpenStack services just rely on the default log format.
+-- TODO(pasquier-s): build the LPEG grammar based on the log_format parameter
+-- passed to eventlet.wsgi.server similar to what the build_rsyslog_grammar
+-- function does for RSyslog.
+local openstack_http_status = l.P"status: "^-1 * l.Cg(l.digit^3, "http_status")
+local openstack_response_size = l.P"len: "^-1 * l.Cg(l.digit^1 / tonumber, "http_response_size")
+local openstack_response_time = l.P"time: "^-1 * l.Cg(l.digit^1 * dot^0 * l.digit^0 / tonumber, "http_response_time")
+
+-- Capture for OpenStack HTTP producing six values: http_method, http_url,
+-- http_version, http_status, http_response_size and http_response_time.
+openstack_http = anywhere(l.Ct(
+    quote * http_request * quote * sp^1 *
+    openstack_http_status * sp^1 * openstack_response_size * sp^1 *
+    openstack_response_time
+))
+
+-- Capture for IP addresses producing one value: ip_address.
+-- NOTE(review): digit^-3 matches zero to three digits, so octets may
+-- be empty and values above 255 are not rejected — confirm this
+-- looseness is acceptable for the consumers of ip_address.
+ip_address = anywhere(l.Ct(
+    l.Cg(l.digit^-3 * dot * l.digit^-3 * dot * l.digit^-3 * dot * l.digit^-3, "ip_address")
+))
+
+-- Pattern used to match the beginning of a Python Traceback.
+traceback = l.P'Traceback (most recent call last):'
+
+-- Pattern used to match a number.
+-- NOTE(review): xdigit also accepts the letters a-f, so hex-looking
+-- tokens match the pattern but then fail tonumber (capture becomes
+-- nil) — confirm decimal digits were not intended here.
+Number = l.P"-"^-1 * l.xdigit^1 * (l.S(".,") * l.xdigit^1 )^-1 / tonumber
+
+return M
diff --git a/heka/files/lua/common/table_utils.lua b/heka/files/lua/common/table_utils.lua
new file mode 100644
index 0000000..ba5fb93
--- /dev/null
+++ b/heka/files/lua/common/table_utils.lua
@@ -0,0 +1,109 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local table = require 'table'
+local pairs = pairs
+local ipairs = ipairs
+local type = type
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+-- Return a deep clone of t; non-table values are returned as-is.
+-- Metatables are not copied and cyclic tables are not handled
+-- (the recursion would never terminate).
+function deepcopy(t)
+    if type(t) ~= 'table' then
+        return t
+    end
+    local copy = {}
+    for key, value in pairs(t) do
+        copy[key] = deepcopy(value)
+    end
+    return copy
+end
+
+-- Return the position (index) of item in list, or nil when the item
+-- is absent or list is not a table.
+function item_pos(item, list)
+    if type(list) == 'table' then
+        for index, candidate in ipairs(list) do
+            if candidate == item then
+                return index
+            end
+        end
+    end
+end
+
+-- Return true when item is present in the list, false otherwise.
+function item_find(item, list)
+    local pos = item_pos(item, list)
+    return pos ~= nil
+end
+
+-- Build a sorted array of t's keys (helper for orderedNext).
+-- From http://lua-users.org/wiki/SortedIteration
+function __genOrderedIndex( t )
+    local sorted_keys = {}
+    for key in pairs(t) do
+        sorted_keys[#sorted_keys + 1] = key
+    end
+    table.sort(sorted_keys)
+    return sorted_keys
+end
+
+-- Equivalent of the next() function, but returns the keys in the
+-- alphabetic order. A temporary sorted key index is stored on the
+-- table being iterated (t.__orderedIndex) and removed once the
+-- iteration completes.
+function orderedNext(t, state)
+    -- 'key' is now a proper local: the previous version leaked it
+    -- into the module's environment
+    local key = nil
+    if state == nil then
+        -- the first time, generate the index
+        t.__orderedIndex = __genOrderedIndex( t )
+        key = t.__orderedIndex[1]
+    else
+        -- fetch the key following 'state'; '#' replaces the
+        -- deprecated table.getn and the loop stops at the first hit
+        for i = 1, #t.__orderedIndex do
+            if t.__orderedIndex[i] == state then
+                key = t.__orderedIndex[i + 1]
+                break
+            end
+        end
+    end
+
+    if key then
+        return key, t[key]
+    end
+
+    -- no more value to return, cleanup
+    t.__orderedIndex = nil
+    return
+end
+
+-- Equivalent of the pairs() function on tables, but iterates the
+-- keys in alphabetic order.
+function orderedPairs(t)
+    return orderedNext, t, nil
+end
+
+-- Shallow comparison between two tables.
+-- Return true if the two tables have the same keys with identical
+-- values, otherwise false.
+function table_equal(t1, t2)
+    -- every key of t1 must map to the same value in t2
+    for key, value in pairs(t1) do
+        if t2[key] ~= value then
+            return false
+        end
+    end
+    -- and t2 must not contain any extra key
+    for key in pairs(t2) do
+        if t1[key] == nil then
+            return false
+        end
+    end
+    return true
+end
+
+return M
diff --git a/heka/files/lua/common/value_matching.lua b/heka/files/lua/common/value_matching.lua
new file mode 100644
index 0000000..152425e
--- /dev/null
+++ b/heka/files/lua/common/value_matching.lua
@@ -0,0 +1,171 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local l = require "lpeg"
+l.locale(l)
+local pcall = pcall
+local string = require 'string'
+
+local patterns = require 'patterns'
+local error = error
+local setmetatable = setmetatable
+local tonumber = tonumber
+
+local C = l.C
+local P = l.P
+local S = l.S
+local V = l.V
+local Ct = l.Ct
+local Cc = l.Cc
+
+local Optional_space = patterns.sp^0
+local Only_spaces = patterns.sp^1 * -1
+
+-- Wrap pat so that it tolerates optional whitespace on either side.
+local function space(pat)
+    return Optional_space * pat * Optional_space
+end
+
+local EQ = P'=='
+local NEQ = P'!='
+local GT = P'>'
+local LT = P'<'
+local GTE = P'>='
+local LTE = P'<='
+local MATCH = P'=~'
+local NO_MATCH = P'!~'
+
+local OR = P'||'
+local AND = P'&&'
+
+-- Normalize the captured operator: an absent operator (empty
+-- capture) defaults to equality.
+local function get_operator(op)
+    if op ~= '' then
+        return op
+    end
+    return '=='
+end
+
+local numerical_operator = (EQ + NEQ + LTE + GTE + GT + LT )^-1 / get_operator
+local sub_numerical_expression = space(numerical_operator) * patterns.Number * Optional_space
+local is_plain_numeric = (sub_numerical_expression * ((OR^1 + AND^1) * sub_numerical_expression)^0) * -1
+
+local quoted_string = (P'"' * C((P(1) - (P'"'))^1) * P'"' + C((P(1) - patterns.sp)^1))
+local string_operator = (EQ + NEQ + MATCH + NO_MATCH)^-1 / get_operator
+local sub_string_expression = space(string_operator) * quoted_string * Optional_space
+local is_plain_string = (sub_string_expression * ((OR^1 + AND^1) * sub_string_expression)^0) * -1
+
+local numerical_expression = P {
+    'OR';
+    AND = Ct(Cc('and') * V'SUB' * space(AND) * V'AND' + V'SUB'),
+    OR = Ct(Cc('or') * V'AND' * space(OR) * V'OR' + V'AND'),
+    SUB = Ct(sub_numerical_expression)
+} * -1
+
+local string_expression = P {
+    'OR';
+    AND = Ct(Cc('and') * V'SUB' * space(AND) * V'AND' + V'SUB'),
+    OR = Ct(Cc('or') * V'AND' * space(OR) * V'OR' + V'AND'),
+    SUB = Ct(sub_string_expression)
+} * -1
+
+local is_complex = patterns.anywhere(EQ + NEQ + LTE + GTE + GT + LT + MATCH + NO_MATCH + OR + AND)
+
+-- Recursively evaluate a parsed expression tree against a value.
+--
+-- A tree node is one of:
+--   * a table whose first element is itself a table: a single wrapped
+--     sub-expression,
+--   * {'and'|'or', node, node, ...}: a boolean combination of
+--     sub-expressions,
+--   * {operator, operand}: a leaf comparison of 'value' against the
+--     operand.
+-- Returns true when the value matches the (sub-)expression.
+local function eval_tree(tree, value)
+    local match = false
+
+    if type(tree[1]) == 'table' then
+        match = eval_tree(tree[1], value)
+    else
+        local operator = tree[1]
+        if operator == 'and' or operator == 'or' then
+            -- fold all sub-expressions with the boolean operator
+            match = eval_tree(tree[2], value)
+            for i=3, #tree, 1 do
+                local m = eval_tree(tree[i], value)
+                if operator == 'or' then
+                    match = match or m
+                else
+                    match = match and m
+                end
+            end
+        else
+            local matcher = tree[2]
+            if operator == '==' then
+                return value == matcher
+            elseif operator == '!=' then
+                return value ~= matcher
+            elseif operator == '>' then
+                return value > matcher
+            elseif operator == '<' then
+                return value < matcher
+            elseif operator == '>=' then
+                return value >= matcher
+            elseif operator == '<=' then
+                return value <= matcher
+            elseif operator == '=~' then
+                -- pcall guards against invalid patterns or values
+                -- string.find cannot operate on
+                local ok, m = pcall(string.find, value, matcher)
+                return ok and m ~= nil
+            elseif operator == '!~' then
+                -- note: a string.find failure counts as "no match
+                -- proven", so this returns false on error
+                local ok, m = pcall(string.find, value, matcher)
+                return ok and m == nil
+            end
+        end
+    end
+    return match
+end
+
+local MatchExpression = {}
+MatchExpression.__index = MatchExpression
+
+setfenv(1, MatchExpression) -- Remove external access to contain everything in the module
+
+-- Compile a matching expression.
+--
+-- An expression containing operators ('==', '!=', '<', '>', '=~',
+-- '&&', '||', ...) is parsed with the numeric grammar when it looks
+-- fully numeric, otherwise with the string grammar. An expression
+-- with no operator at all is kept as a simple equality match.
+-- Raises an error when the expression is empty or cannot be parsed.
+function MatchExpression.new(expression)
+    local r = {}
+    setmetatable(r, MatchExpression)
+    if is_complex:match(expression) then
+        -- numeric takes precedence: only fall back to the string
+        -- grammar when the expression isn't fully numeric
+        r.is_plain_numeric_exp = is_plain_numeric:match(expression) ~= nil
+
+        if r.is_plain_numeric_exp then
+            r.tree = numerical_expression:match(expression)
+        elseif is_plain_string:match(expression) ~= nil then
+            r.tree = string_expression:match(expression)
+        end
+        if r.tree == nil then
+            error('Invalid expression: ' .. expression)
+        end
+    else
+        -- no operator at all: treat as a plain equality comparison
+        if expression == '' or Only_spaces:match(expression) then
+            error('Expression is empty')
+        end
+        r.is_simple_equality_matching = true
+    end
+    r.expression = expression
+
+    return r
+end
+
+-- Return true when value matches the compiled expression.
+function MatchExpression:matches(value)
+    if self.is_simple_equality_matching then
+        -- compare as-is, then with number coercion on either side so
+        -- that "10" matches 10 and vice versa
+        if self.expression == value then
+            return true
+        end
+        return tonumber(self.expression) == value or
+               tonumber(value) == self.expression
+    end
+    if self.is_plain_numeric_exp then
+        -- numeric expressions can only ever match numeric values
+        value = tonumber(value)
+        if value == nil then
+            return false
+        end
+    end
+    return eval_tree(self.tree, value)
+end
+
+return MatchExpression
diff --git a/heka/files/lua/decoders/ceilometer.lua b/heka/files/lua/decoders/ceilometer.lua
new file mode 100644
index 0000000..01564af
--- /dev/null
+++ b/heka/files/lua/decoders/ceilometer.lua
@@ -0,0 +1,135 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+require 'table'
+require 'math'
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+-- Normalize a UUID string to its canonical dashed form using the
+-- shared UUID grammar (lpeg match returns nil when uuid doesn't
+-- parse as a UUID).
+function normalize_uuid(uuid)
+    return patt.Uuid:match(uuid)
+end
+
+-- the metadata_fields parameter is a list of words separated by space
+local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
+local metadata_fields = fields_grammar:match(
+    read_config("metadata_fields") or ""
+)
+
+local decode_resources = read_config('decode_resources') or false
+
+local sample_msg = {
+    Timestamp = nil,
+    -- This message type has the same structure than 'bulk_metric'.
+    Type = "ceilometer_samples",
+    Payload = nil
+}
+
+local resource_msg = {
+    Timestamp = nil,
+    Type = "ceilometer_resource",
+    Fields = nil,
+}
+
+-- Copy the configured scalar metadata fields into tags, prefixed
+-- with "metadata.". Table-valued and absent metadata are skipped.
+function inject_metadata(metadata, tags)
+    for _, field in ipairs(metadata_fields) do
+        local field_value = metadata[field]
+        if field_value ~= nil and type(field_value) ~= 'table' then
+            tags["metadata." .. field] = field_value
+        end
+    end
+end
+
+-- Record the resource described by a ceilometer sample into payload,
+-- keyed by resource_id (later samples for the same resource win).
+function add_resource_to_payload(sample, payload)
+    payload[sample.resource_id] = {
+        timestamp = sample.timestamp,
+        resource_id = sample.resource_id,
+        source = sample.source or "",
+        metadata = sample.resource_metadata,
+        user_id = sample.user_id,
+        project_id = sample.project_id,
+        meter = {
+            [sample.counter_name] = {
+                type = sample.counter_type,
+                unit = sample.counter_unit
+            }
+        }
+    }
+end
+
+
+-- Convert a ceilometer sample into a datapoint (same structure as
+-- 'bulk_metric') and append it to payload.
+function add_sample_to_payload(sample, payload)
+    local tags = {
+        meter = sample.counter_name,
+        resource_id = sample.resource_id,
+        project_id = sample.project_id,
+        user_id = sample.user_id,
+        source = sample.source
+    }
+    inject_metadata(sample.resource_metadata or {}, tags)
+
+    payload[#payload + 1] = {
+        name = 'sample',
+        timestamp = patt.Timestamp:match(sample.timestamp),
+        values = {
+            value = sample.counter_volume,
+            message_id = sample.message_id,
+            recorded_at = sample.recorded_at,
+            timestamp = sample.timestamp,
+            message_signature = sample.signature,
+            type = sample.counter_type,
+            unit = sample.counter_unit
+        },
+        tags = tags
+    }
+end
+
+-- Heka entry point: decode a ceilometer notification, inject one
+-- 'ceilometer_samples' message and, when decode_resources is set,
+-- one 'ceilometer_resource' message.
+-- Returns 0 on success, -1 plus an error string on decoding failure.
+function process_message ()
+    local ok, message = pcall(cjson.decode, read_message("Payload"))
+    if not ok then
+        return -1, "Cannot decode Payload"
+    end
+    local ok_body, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not ok_body then
+        return -1, "Cannot decode Payload[oslo.message]"
+    end
+
+    local sample_payload = {}
+    local resource_payload = {}
+    for _, sample in ipairs(message_body["payload"]) do
+        add_sample_to_payload(sample, sample_payload)
+        if decode_resources then
+            add_resource_to_payload(sample, resource_payload)
+        end
+    end
+
+    local ts = patt.Timestamp:match(message_body.timestamp)
+    sample_msg.Payload = cjson.encode(sample_payload)
+    sample_msg.Timestamp = ts
+    utils.safe_inject_message(sample_msg)
+
+    if decode_resources then
+        resource_msg.Payload = cjson.encode(resource_payload)
+        resource_msg.Timestamp = ts
+        utils.safe_inject_message(resource_msg)
+    end
+
+    return 0
+end
diff --git a/heka/files/lua/decoders/collectd.lua b/heka/files/lua/decoders/collectd.lua
new file mode 100644
index 0000000..f94a0e8
--- /dev/null
+++ b/heka/files/lua/decoders/collectd.lua
@@ -0,0 +1,447 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+
+local utils = require 'lma_utils'
+
+local sep = '_'
+
+local processes_map = {
+    ps_code = 'memory_code',
+    ps_count = '',
+    ps_cputime = 'cputime',
+    ps_data = 'memory_data',
+    ps_disk_octets = 'disk_bytes',
+    ps_disk_ops = 'disk_ops',
+    ps_pagefaults = 'pagefaults',
+    ps_rss = 'memory_rss',
+    ps_stacksize = 'stacksize',
+    ps_vm = 'memory_virtual',
+}
+
+-- The following table keeps a list of metrics from plugin where the
+-- hostname is not relevant.
+local hostname_free = {
+    -- Add "metric_source = true" to skip the hostname for all metrics
+    -- from the metric_source
+    -- Add "metric_source = { list of metrics } to skip hostname for a
+    -- subset of metrics. The list of metrics is referenced through the
+    -- field 'type_instance'.
+    hypervisor_stats = {
+        total_free_disk_GB = true,
+        total_free_ram_MB = true,
+        total_free_vcpus = true,
+        total_used_disk_GB = true,
+        total_used_ram_MB = true,
+        total_used_vcpus = true,
+        total_running_instances = true,
+        total_running_tasks = true,
+    },
+    check_openstack_api = true,
+    http_check = true,
+}
+
+-- this is needed for the libvirt metrics because in that case, collectd sends
+-- the instance's ID instead of the hostname in the 'host' attribute
+local hostname = read_config('hostname') or error('hostname must be specified')
+local swap_size = (read_config('swap_size') or 0) + 0
+
+-- Return 'str' with every '.' replaced by the metric separator.
+-- Like string.gsub, this also returns the substitution count as a
+-- second value.
+function replace_dot_by_sep (str)
+    return str:gsub('%.', sep)
+end
+
+-- Decode a collectd JSON payload (a batch of samples) into individual Heka
+-- 'metric' messages, normalizing metric names and attaching dimension
+-- fields ('tag_fields') depending on the originating collectd plugin.
+function process_message ()
+    local ok, samples = pcall(cjson.decode, read_message("Payload"))
+    if not ok then
+        -- on failure, pcall returns the cjson error message in 'samples';
+        -- surface it instead of failing silently
+        return -1, "Cannot decode Payload: " .. samples
+    end
+
+    for _, sample in ipairs(samples) do
+        local metric_prefix = sample['type']
+        if sample['type_instance'] ~= "" then
+            metric_prefix = metric_prefix .. sep .. sample['type_instance']
+        end
+
+        local metric_source = sample['plugin']
+
+        for i, value in ipairs(sample['values']) do
+            local skip_it = false
+            local metric_name = metric_prefix
+            if sample['dsnames'][i] ~= "value" then
+                metric_name = metric_name .. sep .. sample['dsnames'][i]
+            end
+
+            local msg = {
+                Timestamp = sample['time'] * 1e9, -- Heka expects nanoseconds
+                Hostname = sample['host'],
+                Logger = "collectd",
+                Payload = utils.safe_json_encode(sample) or '',
+                Severity = 6,
+                Type = "metric",
+                Fields = {
+                    interval = sample['interval'],
+                    source = metric_source,
+                    type = sample['dstypes'][i],
+                    value = value,
+                    tag_fields = {},
+                }
+            }
+
+            -- Normalize metric name, unfortunately collectd plugins aren't
+            -- always consistent on metric namespaces so we need a few if/else
+            -- statements to cover all cases.
+
+            -- Check if hostname is needed or not
+            local add_hostname = true
+            if hostname_free[metric_source] == true then
+                add_hostname = false
+            elseif hostname_free[metric_source] and
+                hostname_free[metric_source][sample['type_instance']] then
+                add_hostname = false
+            end
+
+            if add_hostname then
+                msg['Fields']['hostname'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'hostname')
+            end
+
+            if sample['meta'] and sample['meta']['service_check'] then
+                msg['Fields']['name'] = sample['meta']['service_check'] .. sep .. 'check'
+                msg['Fields']['details'] = sample['meta']['failure']
+            elseif metric_source == 'df' then
+                local entity
+                if sample['type'] == 'df_inodes' then
+                    entity = 'inodes'
+                elseif sample['type'] == 'percent_inodes' then
+                    entity = 'inodes_percent'
+                elseif sample['type'] == 'percent_bytes' then
+                    entity = 'space_percent'
+                else -- sample['type'] == 'df_complex'
+                    entity = 'space'
+                end
+
+                -- collectd substitutes '/' with '-' in mount point names
+                local mount = sample['plugin_instance']
+                if mount == 'root' then
+                    mount = '/'
+                else
+                    mount = '/' .. mount:gsub('-', '/')
+                end
+
+                msg['Fields']['name'] = 'fs' .. sep .. entity .. sep .. sample['type_instance']
+                msg['Fields']['fs'] = mount
+                table.insert(msg['Fields']['tag_fields'], 'fs')
+            elseif metric_source == 'disk' then
+                msg['Fields']['name'] = metric_name
+                msg['Fields']['device'] = sample['plugin_instance']
+                table.insert(msg['Fields']['tag_fields'], 'device')
+            elseif metric_source == 'cpu' then
+                msg['Fields']['name'] = 'cpu' .. sep .. sample['type_instance']
+                msg['Fields']['cpu_number'] = sample['plugin_instance']
+                table.insert(msg['Fields']['tag_fields'], 'cpu_number')
+            elseif metric_source == 'netlink' then
+                local netlink_metric = sample['type']
+                if netlink_metric == 'if_rx_errors' then
+                    netlink_metric = 'if_errors_rx'
+                elseif netlink_metric == 'if_tx_errors' then
+                    netlink_metric = 'if_errors_tx'
+                end
+
+                -- Netlink plugin can send one or two values. Use dsnames only when needed.
+                if sample['dsnames'][i] ~= 'value' then
+                    netlink_metric = netlink_metric .. sep .. sample['dsnames'][i]
+                end
+                -- and type of errors is set in type_instance
+                if sample['type_instance'] ~= '' then
+                    netlink_metric = netlink_metric .. sep .. sample['type_instance']
+                end
+                msg['Fields']['name'] = netlink_metric
+                msg['Fields']['interface'] = sample['plugin_instance']
+                table.insert(msg['Fields']['tag_fields'], 'interface')
+            elseif metric_source == 'processes' then
+                if processes_map[sample['type']] then
+                    -- metrics related to a specific process
+                    msg['Fields']['service'] = sample['plugin_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    msg['Fields']['name'] = 'lma_components'
+                    if processes_map[sample['type']] ~= '' then
+                        msg['Fields']['name'] = msg['Fields']['name'] .. sep .. processes_map[sample['type']]
+                    end
+                    if sample['dsnames'][i] ~= 'value' then
+                        msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+                    end
+
+                    -- For ps_cputime, convert it to a percentage: collectd is
+                    -- sending us the number of microseconds allocated to the
+                    -- process as a rate so within 1 second.
+                    if sample['type'] == 'ps_cputime' then
+                        msg['Fields']['value'] = 100 * value / 1e6
+                    end
+                else
+                    -- metrics related to all processes
+                    msg['Fields']['name'] = 'processes'
+                    if sample['type'] == 'ps_state' then
+                        msg['Fields']['name'] = msg['Fields']['name'] .. sep .. 'count'
+                        msg['Fields']['state'] = sample['type_instance']
+                        table.insert(msg['Fields']['tag_fields'], 'state')
+                    else
+                        msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['type']
+                    end
+                end
+            elseif metric_source == 'dbi' and sample['plugin_instance'] == 'mysql_status' then
+                msg['Fields']['name'] = 'mysql' .. sep .. replace_dot_by_sep(sample['type_instance'])
+            elseif metric_source == 'mysql' then
+                if sample['type'] == 'threads' then
+                    msg['Fields']['name'] = 'mysql_' .. metric_name
+                elseif sample['type'] == 'mysql_commands' then
+                    msg['Fields']['name'] = sample['type']
+                    msg['Fields']['statement'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'statement')
+                elseif sample['type'] == 'mysql_handler' then
+                    msg['Fields']['name'] = sample['type']
+                    msg['Fields']['handler'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'handler')
+                else
+                    msg['Fields']['name'] = metric_name
+                end
+            elseif metric_source == 'check_openstack_api' then
+                -- For OpenStack API metrics, plugin_instance = <service name>
+                msg['Fields']['name'] = 'openstack_check_api'
+                msg['Fields']['service'] = sample['plugin_instance']
+                table.insert(msg['Fields']['tag_fields'], 'service')
+                if sample['meta'] then
+                    msg['Fields']['os_region'] = sample['meta']['region']
+                end
+            elseif metric_source == 'hypervisor_stats' then
+                -- Metrics from the OpenStack hypervisor metrics where
+                -- type_instance = <metric name> which can end by _MB or _GB
+                msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep
+                local name, unit
+                name, unit = string.match(sample['type_instance'], '^(.+)_(.B)$')
+                if name then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. name
+                    msg.Fields['value'] = {value = msg.Fields['value'], representation = unit}
+                else
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sample['type_instance']
+                end
+            elseif metric_source == 'rabbitmq_info' then
+                msg['Fields']['name'] = 'rabbitmq' .. sep .. sample['type_instance']
+                if sample['meta'] and sample['meta']['queue'] then
+                    msg['Fields']['queue'] = sample['meta']['queue']
+                    table.insert(msg['Fields']['tag_fields'], 'queue')
+                end
+            elseif metric_source == 'nova' then
+                if sample['plugin_instance'] == 'nova_services' or
+                   sample['plugin_instance'] == 'nova_service' then
+                    msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
+                    msg['Fields']['service'] = sample['meta']['service']
+                    msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                    if sample['plugin_instance'] == 'nova_service' then
+                        msg['Fields']['hostname'] = sample['meta']['host']
+                    end
+                else
+                    msg['Fields']['name'] = 'openstack' .. sep .. 'nova' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
+                    msg['Fields']['state'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                end
+            elseif metric_source == 'cinder' then
+                if sample['plugin_instance'] == 'cinder_services' or
+                   sample['plugin_instance'] == 'cinder_service' then
+                    msg['Fields']['name'] = 'openstack_' .. sample['plugin_instance']
+                    msg['Fields']['service'] = sample['meta']['service']
+                    msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                    if sample['plugin_instance'] == 'cinder_service' then
+                        msg['Fields']['hostname'] = sample['meta']['host']
+                    end
+                else
+                    msg['Fields']['name'] = 'openstack' .. sep .. 'cinder' .. sep .. replace_dot_by_sep(sample['plugin_instance'])
+                    msg['Fields']['state'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                end
+            elseif metric_source == 'glance' then
+                msg['Fields']['name'] = 'openstack' .. sep .. 'glance' .. sep .. sample['type_instance']
+                msg['Fields']['state'] = sample['meta']['status']
+                msg['Fields']['visibility'] = sample['meta']['visibility']
+                table.insert(msg['Fields']['tag_fields'], 'state')
+                table.insert(msg['Fields']['tag_fields'], 'visibility')
+            elseif metric_source == 'keystone' then
+                msg['Fields']['name'] = 'openstack' .. sep .. 'keystone' .. sep .. sample['type_instance']
+                if sample['meta']['state'] then
+                    msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                end
+            elseif metric_source == 'neutron' then
+                if sample['type_instance'] == 'networks' or sample['type_instance'] == 'ports' or sample['type_instance'] == 'routers' or sample['type_instance'] == 'floatingips' then
+                    skip_it = true
+                elseif sample['type_instance'] == 'subnets' then
+                    msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. 'subnets'
+                elseif sample['type_instance'] == 'neutron_agents' or
+                       sample['type_instance'] == 'neutron_agent' then
+                    msg['Fields']['name'] = 'openstack_' .. sample['type_instance']
+                    msg['Fields']['service'] = sample['meta']['service']
+                    msg['Fields']['state'] = sample['meta']['state']
+                    table.insert(msg['Fields']['tag_fields'], 'service')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                    if sample['type_instance'] == 'neutron_agent' then
+                        msg['Fields']['hostname'] = sample['meta']['host']
+                    end
+                elseif string.match(sample['type_instance'], '^ports') then
+                    local resource, owner, state = string.match(sample['type_instance'], '^([^.]+)%.([^.]+)%.(.+)$')
+                    msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
+                    msg['Fields']['owner'] = owner
+                    msg['Fields']['state'] = state
+                    table.insert(msg['Fields']['tag_fields'], 'owner')
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                else
+                    local resource, state = string.match(sample['type_instance'], '^([^.]+)%.(.+)$')
+                    msg['Fields']['name'] = 'openstack' .. sep .. 'neutron' .. sep .. replace_dot_by_sep(resource)
+                    msg['Fields']['state'] = state
+                    table.insert(msg['Fields']['tag_fields'], 'state')
+                end
+            elseif metric_source == 'memcached' then
+                msg['Fields']['name'] = 'memcached' .. sep .. string.gsub(metric_name, 'memcached_', '')
+            elseif metric_source == 'haproxy' then
+                msg['Fields']['name'] = 'haproxy' .. sep .. sample['type_instance']
+                if sample['meta'] then
+                    if sample['meta']['backend'] then
+                        msg['Fields']['backend'] = sample['meta']['backend']
+                        table.insert(msg['Fields']['tag_fields'], 'backend')
+                        if sample['meta']['state'] then
+                            msg['Fields']['state'] = sample['meta']['state']
+                            table.insert(msg['Fields']['tag_fields'], 'state')
+                        end
+                        if sample['meta']['server'] then
+                            msg['Fields']['server'] = sample['meta']['server']
+                            table.insert(msg['Fields']['tag_fields'], 'server')
+                        end
+                    elseif sample['meta']['frontend'] then
+                        msg['Fields']['frontend'] = sample['meta']['frontend']
+                        table.insert(msg['Fields']['tag_fields'], 'frontend')
+                    end
+                end
+            elseif metric_source == 'apache' then
+                metric_name = string.gsub(metric_name, 'apache_', '')
+                msg['Fields']['name'] = 'apache' .. sep .. string.gsub(metric_name, 'scoreboard', 'workers')
+            elseif metric_source == 'ceph_osd_perf' then
+                msg['Fields']['name'] = 'ceph_perf' .. sep .. sample['type']
+
+                msg['Fields']['cluster'] = sample['plugin_instance']
+                msg['Fields']['osd'] = sample['type_instance']
+                table.insert(msg['Fields']['tag_fields'], 'cluster')
+                table.insert(msg['Fields']['tag_fields'], 'osd')
+            elseif metric_source:match('^ceph') then
+                msg['Fields']['name'] = 'ceph' .. sep .. sample['type']
+                if sample['dsnames'][i] ~= 'value' then
+                    msg['Fields']['name'] = msg['Fields']['name'] .. sep .. sample['dsnames'][i]
+                end
+
+                msg['Fields']['cluster'] = sample['plugin_instance']
+                table.insert(msg['Fields']['tag_fields'], 'cluster')
+
+                if sample['type_instance'] ~= '' then
+                    local additional_tag
+                    if string.match(sample['type'], '^pool_') then
+                        additional_tag = 'pool'
+                    elseif string.match(sample['type'], '^pg_state') then
+                        additional_tag = 'state'
+                    elseif string.match(sample['type'], '^osd_') then
+                        additional_tag = 'osd'
+                    end
+                    if additional_tag then
+                        msg['Fields'][additional_tag] = sample['type_instance']
+                        table.insert(msg['Fields']['tag_fields'], additional_tag)
+                    end
+                end
+            elseif metric_source == 'pacemaker' then
+                if sample['meta'] and sample['meta']['host'] then
+                    msg['Fields']['hostname'] = sample['meta']['host']
+                end
+
+                msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+
+                -- add dimension fields
+                for _, v in ipairs({'status', 'resource'}) do
+                    if sample['meta'] and sample['meta'][v] then
+                        msg['Fields'][v] = sample['meta'][v]
+                        table.insert(msg['Fields']['tag_fields'], v)
+                    end
+                end
+            elseif metric_source == 'users' then
+                -- 'users' is a reserved name for InfluxDB v0.9
+                msg['Fields']['name'] = 'logged_users'
+            elseif metric_source == 'libvirt' then
+                -- collectd sends the instance's ID in the 'host' field
+                msg['Fields']['instance_id'] = sample['host']
+                table.insert(msg['Fields']['tag_fields'], 'instance_id')
+                msg['Fields']['hostname'] = hostname
+                msg['Hostname'] = hostname
+
+                if string.match(sample['type'], '^disk_') then
+                    msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
+                    msg['Fields']['device'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'device')
+                elseif string.match(sample['type'], '^if_') then
+                    msg['Fields']['name'] = 'virt' .. sep .. sample['type'] .. sep .. sample['dsnames'][i]
+                    msg['Fields']['interface'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'interface')
+                elseif sample['type'] == 'virt_cpu_total' then
+                    msg['Fields']['name'] = 'virt_cpu_time'
+                elseif sample['type'] == 'virt_vcpu' then
+                    msg['Fields']['name'] = 'virt_vcpu_time'
+                    msg['Fields']['vcpu_number'] = sample['type_instance']
+                    table.insert(msg['Fields']['tag_fields'], 'vcpu_number')
+                else
+                    msg['Fields']['name'] = 'virt' .. sep .. metric_name
+                end
+            elseif metric_source == 'elasticsearch_cluster' or metric_source == 'influxdb' then
+                msg['Fields']['name'] = metric_source .. sep .. sample['type_instance']
+            elseif metric_source == 'http_check' then
+                msg['Fields']['name'] = metric_source
+                msg['Fields']['service'] = sample['type_instance']
+                table.insert(msg['Fields']['tag_fields'], 'service')
+            elseif metric_source == 'check_local_endpoint' then
+                msg['Fields']['name'] = 'openstack_check_local_api'
+                msg['Fields']['service'] = sample['type_instance']
+                table.insert(msg['Fields']['tag_fields'], 'service')
+            else
+                msg['Fields']['name'] = replace_dot_by_sep(metric_name)
+            end
+
+            if not skip_it then
+                utils.inject_tags(msg)
+                -- Before injecting the message we need to check that tag_fields is not an
+                -- empty table otherwise the protobuf encoder fails to encode the table.
+                if #msg['Fields']['tag_fields'] == 0 then
+                    msg['Fields']['tag_fields'] = nil
+                end
+                utils.safe_inject_message(msg)
+                if metric_source == 'swap' and metric_name == 'swap_used' and swap_size > 0 then
+                    -- collectd 5.4.0 doesn't report the used swap in
+                    -- percentage, this is why the metric is computed and
+                    -- injected by this plugin.
+                    msg['Fields']['name'] = 'swap_percent_used'
+                    msg['Fields']['value'] = value / swap_size
+                    utils.safe_inject_message(msg)
+                end
+            end
+        end
+    end
+
+    return 0
+end
diff --git a/heka/files/lua/decoders/generic_syslog.lua b/heka/files/lua/decoders/generic_syslog.lua
new file mode 100644
index 0000000..48e5262
--- /dev/null
+++ b/heka/files/lua/decoders/generic_syslog.lua
@@ -0,0 +1,50 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+
+local syslog = require "syslog"
+local utils  = require 'lma_utils'
+
+-- Message template reused for every injected log message; the mutable
+-- fields are filled in by utils.parse_syslog_message() on each call.
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+-- RSYSLOG template string describing the log line layout (mandatory).
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+-- This grammar is intended for log messages that are generated before RSYSLOG
+-- is fully configured
+local fallback_syslog_pattern = read_config("fallback_syslog_pattern")
+local fallback_syslog_grammar
+if fallback_syslog_pattern then
+    fallback_syslog_grammar = syslog.build_rsyslog_grammar(fallback_syslog_pattern)
+end
+
+-- Parse a syslog line with the primary grammar, falling back to the
+-- pre-RSYSLOG grammar when configured. The Logger name is derived from
+-- the input logger with any trailing '.log' suffix stripped.
+function process_message ()
+    local payload = read_message("Payload")
+
+    local parsed = utils.parse_syslog_message(syslog_grammar, payload, msg)
+    if not parsed and fallback_syslog_grammar then
+        parsed = utils.parse_syslog_message(fallback_syslog_grammar, payload, msg)
+    end
+    if not parsed then
+        return -1
+    end
+
+    msg.Logger = string.gsub(read_message('Logger'), '%.log$', '')
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/keystone_wsgi_log.lua b/heka/files/lua/decoders/keystone_wsgi_log.lua
new file mode 100644
index 0000000..a3c970b
--- /dev/null
+++ b/heka/files/lua/decoders/keystone_wsgi_log.lua
@@ -0,0 +1,73 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local l      = require 'lpeg'
+l.locale(l)
+
+local common_log_format = require 'common_log_format'
+local patt = require 'patterns'
+local utils  = require 'lma_utils'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = 6,
+}
+
+-- All Apache access log records are reported with severity INFO (6).
+local severity_label = utils.severity_to_label_map[msg.Severity]
+
+-- Apache LogFormat string describing the access log layout (mandatory).
+local apache_log_pattern = read_config("apache_log_pattern") or error(
+    "apache_log_pattern configuration must be specified")
+local apache_grammar = common_log_format.build_apache_grammar(apache_log_pattern)
+local request_grammar = l.Ct(patt.http_request)
+
+-- Parse one Apache access-log line produced by the Keystone WSGI services
+-- and inject it as an 'openstack.keystone' log message.
+function process_message ()
+    -- logger is either "keystone-wsgi-main" or "keystone-wsgi-admin"
+    local logger = read_message("Logger")
+    local log = read_message("Payload")
+
+    local parsed = apache_grammar:match(log)
+    if not parsed then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+
+    msg.Logger = 'openstack.keystone'
+    msg.Payload = log
+    msg.Timestamp = parsed.time
+    msg.Fields = {
+        http_status = parsed.status,
+        http_response_time = parsed.request_time.value / 1e6, -- us to sec
+        programname = logger,
+        severity_label = severity_label,
+    }
+
+    -- Break the request line down into method/url/version when possible
+    local request = request_grammar:match(parsed.request)
+    if request then
+        msg.Fields.http_method = request.http_method
+        msg.Fields.http_url = request.http_url
+        msg.Fields.http_version = request.http_version
+    end
+
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/libvirt_log.lua b/heka/files/lua/decoders/libvirt_log.lua
new file mode 100644
index 0000000..a191b8f
--- /dev/null
+++ b/heka/files/lua/decoders/libvirt_log.lua
@@ -0,0 +1,63 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l      = require 'lpeg'
+l.locale(l)
+
+local string = require 'string'
+local patt   = require 'patterns'
+local utils  = require 'lma_utils'
+
+-- Message skeleton reused across calls; the per-message fields are filled
+-- in by process_message().
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+-- libvirt message logs are formatted like this:
+--
+-- 2015-03-26 17:24:52.126+0000: <PID>: <SEV> : Message
+
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local pid  = l.Cg(patt.Pid, "Pid")
+-- severity is one of libvirt's four lowercase log levels
+local severity = l.Cg(l.P"debug" + "info" + "warning" + "error", "Severity")
+local message = l.Cg(patt.Message, "Message")
+
+local grammar = l.Ct(timestamp * ": " * pid * ": " * severity * " : " * message)
+
+-- Parse one libvirt log line and inject it as a Heka log message with the
+-- severity mapped from libvirt's level name.
+function process_message ()
+    local payload = read_message("Payload")
+
+    local fields = grammar:match(payload)
+    if not fields then
+        return -1
+    end
+
+    -- map e.g. 'warning' -> 'WARNING' before looking up the numeric severity
+    local severity_label = string.upper(fields.Severity)
+
+    msg.Timestamp = fields.Timestamp
+    msg.Pid = fields.Pid
+    msg.Payload = fields.Message
+    msg.Severity = utils.label_to_severity_map[severity_label]
+    msg.Fields = {
+        severity_label = severity_label,
+        programname = 'libvirt',
+    }
+    utils.inject_tags(msg)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/metric.lua b/heka/files/lua/decoders/metric.lua
new file mode 100644
index 0000000..0994a00
--- /dev/null
+++ b/heka/files/lua/decoders/metric.lua
@@ -0,0 +1,90 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require "cjson"
+require "string"
+
+local l = require 'lpeg'
+l.locale(l)
+
+-- Split the whitespace-separated config value into a list of logger names.
+local loggers_pattern = l.Ct( (l.C((l.P(1) - l.space)^1) * l.space^0)^1 * -1)
+local loggers_list = loggers_pattern:match(read_config('deserialize_bulk_metric_for_loggers') or '')
+
+-- Set (name -> true) of loggers whose bulk_metric payloads get deserialized.
+local loggers = {}
+for _, logger in ipairs(loggers_list) do
+    loggers[logger] = true
+end
+
+local utils = require 'lma_utils'
+
+-- Decode metric messages. A message whose Type ends with 'bulk_metric' and
+-- whose Logger is in the configured set carries a JSON list of metrics in its
+-- payload; each entry is re-injected as an individual 'metric' or
+-- 'multivalue_metric' message. Any other message is passed through untouched.
+function process_message ()
+    local msg = decode_message(read_message("raw"))
+    if string.match(msg.Type, 'bulk_metric$') and loggers[msg.Logger] ~= nil then
+
+        local ok, metrics = pcall(cjson.decode, msg.Payload)
+        if not ok then
+            -- 'metrics' holds the JSON decoding error message here.
+            return -1, metrics
+        end
+
+        -- Common envelope reused for every metric of the bulk.
+        local new_msg = {
+            Timestamp = msg.Timestamp,
+            Hostname = msg.Hostname,
+            Severity = msg.Severity,
+            Logger = msg.Logger,
+            Type = nil,
+            Payload = '',
+            Fields = {},
+        }
+        for _, metric in ipairs(metrics) do
+            local fields = {}
+            local metric_type
+            if metric.value then
+                -- Single-value metric.
+                metric_type = 'metric'
+                fields['value'] = metric.value
+            else
+                -- Multi-value metric: each value becomes its own field and
+                -- 'value_fields' lists the field names.
+                metric_type = 'multivalue_metric'
+                local value_fields = {}
+                for k, v in pairs(metric.values) do
+                    fields[k] = v
+                    table.insert(value_fields, k)
+                end
+                fields['value_fields'] = value_fields
+            end
+            local tag_fields = {}
+            for t, v in pairs(metric.tags or {}) do
+                fields[t] = v
+                table.insert(tag_fields, t)
+            end
+            fields['tag_fields'] = tag_fields
+            fields['name'] = metric.name
+            fields['hostname'] = msg.Hostname
+
+            new_msg.Type = metric_type
+            new_msg.Fields = fields
+
+            utils.inject_tags(new_msg)
+            -- Declared local: the original assigned 'ok'/'err' here, leaking
+            -- 'err' as a global and clobbering the pcall result above.
+            local ok, err = utils.safe_inject_message(new_msg)
+            if ok ~= 0 then
+                return -1, err
+            end
+        end
+    else -- simple metric
+        utils.inject_tags(msg)
+        local ok, err = utils.safe_inject_message(msg)
+        if ok ~= 0 then
+            return -1, err
+        end
+    end
+    return 0
+end
diff --git a/heka/files/lua/decoders/mysql_log.lua b/heka/files/lua/decoders/mysql_log.lua
new file mode 100644
index 0000000..689aa16
--- /dev/null
+++ b/heka/files/lua/decoders/mysql_log.lua
@@ -0,0 +1,57 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l      = require 'lpeg'
+l.locale(l)
+
+local syslog = require "syslog"
+local patt   = require 'patterns'
+local utils  = require 'lma_utils'
+
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+
+local sp    = l.space
+local colon = l.P":"
+
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+-- mysqld logs are cranky: the date is YYMMDD, the hours have no leading zero
+-- and the "real" severity level is enclosed by square brackets...
+local mysql_grammar = l.Ct(l.digit^-6 * sp^1 *  l.digit^-2 * colon * l.digit^-2 * colon * l.digit^-2 * sp^1 * l.P"[" * l.Cg(l.R("az", "AZ")^0 / string.upper, "SeverityLabel") * l.P"]" * sp^1 * l.Cg(patt.Message, "Message"))
+
+
+-- Decode one MySQL log line: the outer envelope is parsed with the Syslog
+-- grammar, then the remaining payload is matched against the mysqld-specific
+-- "[Severity] message" form to refine the severity label.
+function process_message ()
+    local line = read_message("Payload")
+
+    if not utils.parse_syslog_message(syslog_grammar, line, msg) then
+        return -1
+    end
+
+    local parsed = mysql_grammar:match(msg.Payload)
+    if parsed then
+        msg.Fields.severity_label = parsed.SeverityLabel
+        msg.Payload = parsed.Message
+    end
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/noop.lua b/heka/files/lua/decoders/noop.lua
new file mode 100644
index 0000000..be9a9dd
--- /dev/null
+++ b/heka/files/lua/decoders/noop.lua
@@ -0,0 +1,28 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- The Type of the injected messages; a required configuration setting.
+local msg_type = read_config('msg_type') or error('msg_type must be defined')
+
+-- Static message template; Payload and Fields are intentionally left empty.
+local msg = {
+    Type = msg_type,
+    Severity = 7, -- debug
+    Payload = nil,
+    Fields = nil,
+}
+
+-- Inject the predefined message as-is; this decoder performs no parsing.
+function process_message ()
+    inject_message(msg)
+
+    return 0
+end
diff --git a/heka/files/lua/decoders/notification.lua b/heka/files/lua/decoders/notification.lua
new file mode 100644
index 0000000..d4073d9
--- /dev/null
+++ b/heka/files/lua/decoders/notification.lua
@@ -0,0 +1,152 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+-- Template for the injected 'notification' message.
+local msg = {
+    Timestamp = nil,
+    Type = "notification",
+    Payload = nil,
+    Fields = nil
+}
+
+-- Mapping table from event_type prefixes to notification loggers
+local logger_map = {
+    --cinder
+    volume = 'cinder',
+    snapshot = 'cinder',
+    -- glance
+    image = 'glance',
+    -- heat
+    orchestration = 'heat',
+    -- keystone
+    identity = 'keystone',
+    -- nova
+    compute = 'nova',
+    compute_task = 'nova',
+    scheduler = 'nova',
+    keypair = 'nova',
+    -- neutron
+    floatingip = 'neutron',
+    security_group = 'neutron',
+    security_group_rule = 'neutron',
+    network = 'neutron',
+    port = 'neutron',
+    router = 'neutron',
+    subnet = 'neutron',
+    -- sahara
+    sahara = 'sahara',
+}
+
+-- Mapping table between the attributes in the notification's payload and the
+-- fields in the Heka message
+local payload_fields = {
+    -- all
+    tenant_id = 'tenant_id',
+    user_id = 'user_id',
+    display_name = 'display_name',
+    -- nova
+    vcpus = 'vcpus',
+    availability_zone = 'availability_zone',
+    instance_id = 'instance_id',
+    instance_type = 'instance_type',
+    image_name = 'image_name',
+    memory_mb = 'memory_mb',
+    disk_gb = 'disk_gb',
+    state = 'state',
+    old_state = 'old_state',
+    old_task_state = 'old_task_state',
+    new_task_state = 'new_task_state',
+    created_at = 'created_at',
+    launched_at = 'launched_at',
+    deleted_at = 'deleted_at',
+    terminated_at = 'terminated_at',
+    -- neutron
+    network_id = 'network_id',
+    subnet_id = 'subnet_id',
+    port_id = 'port_id',
+    -- cinder
+    volume_id = 'volume_id',
+    size = 'size',
+    -- note: the cinder 'status' attribute is stored in the 'state' field
+    status = 'state',
+    replication_status = 'replication_status',
+}
+
+-- Normalize a UUID to the canonical dashed form via the shared Uuid pattern.
+function normalize_uuid(uuid)
+    return patt.Uuid:match(uuid)
+end
+
+-- Mapping table defining transformation functions to be applied, keys are the
+-- attributes in the notification's payload and values are Lua functions
+local transform_functions = {
+    created_at = utils.format_datetime,
+    launched_at = utils.format_datetime,
+    deleted_at = utils.format_datetime,
+    terminated_at = utils.format_datetime,
+    user_id = normalize_uuid,
+    tenant_id = normalize_uuid,
+    instance_id = normalize_uuid,
+    network_id = normalize_uuid,
+    subnet_id = normalize_uuid,
+    port_id = normalize_uuid,
+    volume_id = normalize_uuid,
+}
+
+-- When true the whole notification (not just its payload) is kept as Payload.
+local include_full_notification = read_config("include_full_notification") or false
+
+-- Decode an OpenStack notification (JSON) into a Heka 'notification' message.
+-- Returns -1 when the data is not valid JSON or carries no payload table.
+function process_message ()
+    local data = read_message("Payload")
+    local ok, notif = pcall(cjson.decode, data)
+    if not ok then
+        return -1
+    end
+    -- The attribute extraction below requires a payload table; without this
+    -- guard a malformed notification would raise on 'notif.payload.host'.
+    if type(notif.payload) ~= 'table' then
+        return -1
+    end
+
+    if include_full_notification then
+        msg.Payload = data
+    else
+        msg.Payload = utils.safe_json_encode(notif.payload) or '{}'
+    end
+
+    msg.Fields = {}
+    -- The first dotted component of event_type selects the logger.
+    msg.Logger = logger_map[string.match(notif.event_type, '([^.]+)')]
+    msg.Severity = utils.label_to_severity_map[notif.priority]
+    msg.Timestamp = patt.Timestamp:match(notif.timestamp)
+    -- publisher_id is of the form "<publisher>.<hostname>".
+    msg.Fields.publisher, msg.Hostname = string.match(notif.publisher_id, '([^.]+)%.([%w_-]+)')
+    if notif.payload.host ~= nil then
+        -- Prefer the host advertised in the payload when present.
+        msg.Hostname = string.match(notif.payload.host, '([%w_-]+)')
+    end
+
+    msg.Fields.event_type = notif.event_type
+    msg.Fields.severity_label = notif.priority
+    msg.Fields.hostname = msg.Hostname
+
+    -- Copy the whitelisted payload attributes into the message fields,
+    -- applying the per-attribute transformation function when one exists.
+    for attr, field_name in pairs(payload_fields) do
+        local val = notif.payload[attr]
+        if val ~= nil then
+            local transform = transform_functions[attr]
+            if transform ~= nil then
+                msg.Fields[field_name] = transform(val)
+            else
+                msg.Fields[field_name] = val
+            end
+        end
+    end
+    utils.inject_tags(msg)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/openstack_log.lua b/heka/files/lua/decoders/openstack_log.lua
new file mode 100644
index 0000000..9199bd4
--- /dev/null
+++ b/heka/files/lua/decoders/openstack_log.lua
@@ -0,0 +1,143 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "table"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt = require 'patterns'
+local utils  = require 'lma_utils'
+local table_utils = require 'table_utils'
+
+-- Template for the injected 'log' message.
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+-- traceback_lines is a reference to a table used to accumulate lines of
+-- a Traceback. traceback_key represents the key of the Traceback lines
+-- being accumulated in traceback_lines. This is used to know when to
+-- stop accumulating and inject the Heka message.
+local traceback_key = nil
+local traceback_lines = nil
+
+-- Fill the shared 'msg' table from the parsed log attributes. The message is
+-- not injected here; callers call utils.inject_tags() and inject it themselves.
+function prepare_message (service, timestamp, pid, severity_label,
+        python_module, programname, payload)
+    msg.Logger = 'openstack.' .. service
+    msg.Timestamp = timestamp
+    -- Unknown severity labels fall back to 7 (DEBUG).
+    msg.Severity = utils.label_to_severity_map[severity_label] or 7
+    msg.Pid = pid
+    -- Note: the original assigned msg.Payload twice; once is enough.
+    msg.Payload = payload
+    msg.Fields = {}
+    msg.Fields.severity_label = severity_label
+    msg.Fields.python_module = python_module
+    msg.Fields.programname = programname
+end
+
+-- OpenStack log messages are of this form:
+-- 2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla...
+--
+-- [-] is the "request" part, it can take multiple forms.
+
+-- Decode one OpenStack service log line. Multi-line Python tracebacks are
+-- accumulated across calls (module-level state) and injected as one message.
+-- Returns -1 with an error string when the line cannot be parsed.
+function process_message ()
+
+    -- Logger is of form "<service>_<program>" (e.g. "nova_nova-api",
+    -- "neutron_l3-agent").
+    local logger = read_message("Logger")
+    local service, program = string.match(logger, '([^_]+)_(.+)')
+
+    local log = read_message("Payload")
+    local m
+
+    m = patt.openstack:match(log)
+    if not m then
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+
+    -- Identity of the current line; two lines with equal keys are considered
+    -- part of the same (traceback) event.
+    local key = {
+        Timestamp     = m.Timestamp,
+        Pid           = m.Pid,
+        SeverityLabel = m.SeverityLabel,
+        PythonModule  = m.PythonModule,
+        service       = service,
+        program       = program,
+    }
+
+    if traceback_key ~= nil then
+        -- If traceback_key is not nil then it means we've started accumulating
+        -- lines of a Python traceback. We keep accumulating the traceback
+        -- lines until we get a different log key.
+        if table_utils.table_equal(traceback_key, key) then
+            table.insert(traceback_lines, m.Message)
+            return 0
+        else
+            prepare_message(traceback_key.service, traceback_key.Timestamp,
+                traceback_key.Pid, traceback_key.SeverityLabel,
+                traceback_key.PythonModule, traceback_key.program,
+                table.concat(traceback_lines, ''))
+            traceback_key = nil
+            traceback_lines = nil
+            utils.inject_tags(msg)
+            -- Ignore safe_inject_message status code here to still get a
+            -- chance to inject the current log message.
+            utils.safe_inject_message(msg)
+        end
+    end
+
+    if patt.traceback:match(m.Message) then
+        -- Python traceback detected, begin accumulating the lines making
+        -- up the traceback.
+        traceback_key = key
+        traceback_lines = {}
+        table.insert(traceback_lines, m.Message)
+        return 0
+    end
+
+    prepare_message(service, m.Timestamp, m.Pid, m.SeverityLabel, m.PythonModule,
+        program, m.Message)
+
+    -- Enrich with request context (request/user/tenant ids) when present.
+    m = patt.openstack_request_context:match(msg.Payload)
+    if m then
+        msg.Fields.request_id = m.RequestId
+        if m.UserId then
+          msg.Fields.user_id = m.UserId
+        end
+        if m.TenantId then
+          msg.Fields.tenant_id = m.TenantId
+        end
+    end
+
+    -- Enrich with HTTP request attributes when the line is an access log.
+    m = patt.openstack_http:match(msg.Payload)
+    if m then
+        msg.Fields.http_method = m.http_method
+        msg.Fields.http_status = m.http_status
+        msg.Fields.http_url = m.http_url
+        msg.Fields.http_version = m.http_version
+        msg.Fields.http_response_size = m.http_response_size
+        msg.Fields.http_response_time = m.http_response_time
+        m = patt.ip_address:match(msg.Payload)
+        if m then
+            msg.Fields.http_client_ip_address = m.ip_address
+        end
+    end
+
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/ovs_log.lua b/heka/files/lua/decoders/ovs_log.lua
new file mode 100644
index 0000000..47b88eb
--- /dev/null
+++ b/heka/files/lua/decoders/ovs_log.lua
@@ -0,0 +1,62 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local l      = require 'lpeg'
+l.locale(l)
+
+local syslog = require "syslog"
+local patt   = require 'patterns'
+local utils  = require 'lma_utils'
+
+-- Template for the injected 'log' message.
+-- Note: the original spelled the field 'Paload'; fixed to 'Payload'.
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Logger      = 'ovs',
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+local pipe = l.P'|'
+
+-- OVS logs are pipe-separated: timestamp|pid|function|severity|message.
+local function_name = l.Cg((l.R("az", "AZ", "09") + l.P"." + l.P"-" + l.P"_" + l.P"(" + l.P")")^1, 'function_name')
+local pid       = l.Cg(patt.Pid, "Pid")
+local timestamp = l.Cg(patt.Timestamp, "Timestamp")
+local severity = l.Cg(syslog.severity, 'Severity')
+local message = l.Cg(l.P(1 - l.P"\n")^0, "Message")
+
+local ovs_grammar = l.Ct(timestamp * pipe * pid * pipe * function_name * pipe * severity * pipe * message)
+
+-- Parse one Open vSwitch log line and inject it as a Heka 'log' message.
+-- Returns -1 when the line does not match the OVS format.
+function process_message ()
+    local log = read_message("Payload")
+    local logger = read_message("Logger")
+
+    local m = ovs_grammar:match(log)
+    if not m then
+        return -1
+    end
+
+    msg.Fields = {}
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.function_name .. ': ' .. m.Message
+    msg.Pid = m.Pid
+    -- Default to 5 (NOTICE) when the grammar yields no severity.
+    msg.Severity = m.Severity or 5
+    -- Index with msg.Severity (not m.Severity) so the label also reflects the
+    -- fallback value instead of being nil.
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    msg.Fields.programname = logger
+
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/pacemaker_log.lua b/heka/files/lua/decoders/pacemaker_log.lua
new file mode 100644
index 0000000..1286e89
--- /dev/null
+++ b/heka/files/lua/decoders/pacemaker_log.lua
@@ -0,0 +1,75 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+local l      = require 'lpeg'
+l.locale(l)
+
+local dt     = require "date_time"
+local patt   = require "patterns"
+local syslog = require "syslog"
+local utils  = require 'lma_utils'
+
+-- Template for the injected 'log' message.
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+local syslog_pattern = read_config("syslog_pattern") or error("syslog_pattern configuration must be specified")
+local syslog_grammar = syslog.build_rsyslog_grammar(syslog_pattern)
+
+-- This grammar is intended for debug and info messages which aren't emitted
+-- through Syslog. For example:
+-- Apr 29 13:23:46 [13545] node-32.domain.tld pacemakerd: INFO: get_cluster_type: Detected an active 'corosync' cluster
+local sp    = l.space
+local colon = l.P":"
+
+local timestamp   = l.Cg(dt.rfc3164_timestamp / dt.time_to_ns, "Timestamp")
+local pid         = l.Cg(patt.Pid, "Pid")
+-- Severity level is captured as an uppercased label (e.g. 'INFO').
+local severity    = l.Cg((l.R"AZ" + l.R"az")^1 /  string.upper, "SeverityLabel")
+local programname = l.Cg(patt.programname, "programname")
+local message     = l.Cg(patt.Message, "Message")
+
+-- "<timestamp> [<pid>] <host> <program>: <LEVEL>: <message>".
+local fallback_grammar = l.Ct(timestamp * sp^1 * l.P'[' * pid * l.P']' * sp^1 *
+    (l.P(1) - sp)^0 * sp^1 * programname * colon * sp^1 * severity * colon *
+    sp^1 * message)
+
+-- Decode a Pacemaker log line: try the Syslog format first, then fall back
+-- to the non-Syslog "[pid] host program: LEVEL: message" format.
+-- Returns -1 when neither grammar matches.
+function process_message ()
+    local log = read_message("Payload")
+
+    if utils.parse_syslog_message(syslog_grammar, log, msg) then
+        return utils.safe_inject_message(msg)
+    end
+
+    local m = fallback_grammar:match(log)
+    if not m then
+        return -1
+    end
+
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Pid = m.Pid
+    -- Unknown labels fall back to 7 (DEBUG).
+    msg.Severity = utils.label_to_severity_map[m.SeverityLabel] or 7
+    msg.Fields = {
+        severity_label = utils.severity_to_label_map[msg.Severity],
+        programname = m.programname,
+    }
+    utils.inject_tags(msg)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/pacemaker_resources.lua b/heka/files/lua/decoders/pacemaker_resources.lua
new file mode 100644
index 0000000..ae56965
--- /dev/null
+++ b/heka/files/lua/decoders/pacemaker_resources.lua
@@ -0,0 +1,47 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'string'
+local l = require 'lpeg'
+local utils = require 'lma_utils'
+l.locale(l)
+
+-- Template for the injected 'metric' message.
+local msg = {
+    Timestamp = nil,
+    Type = "metric",
+    Payload = nil,
+    Severity = 6, -- INFO
+    Fields = nil
+}
+
+-- Input lines are "<resource> <flag>"; 'active' captures a single digit
+-- (l.xdigit matches one hex digit — in practice the flag is 0 or 1).
+local word = (l.R("az", "AZ", "09") + l.P"." + l.P"_" + l.P"-")^1
+local grammar = l.Ct(l.Cg(word, 'resource') * " " * l.Cg(l.xdigit, 'active'))
+
+-- Decode a "<resource> <0|1>" line from the Pacemaker resources poller and
+-- inject it as a pacemaker.resource.<name>.active gauge metric.
+function process_message ()
+    local payload = read_message("Payload")
+
+    local parsed = grammar:match(payload)
+    if not parsed then
+        return -1
+    end
+
+    msg.Timestamp = read_message("Timestamp")
+    msg.Payload = payload
+    msg.Fields = {
+        source = 'pacemaker',
+        type = utils.metric_type['GAUGE'],
+        hostname = read_message('Hostname'),
+    }
+    utils.inject_tags(msg)
+
+    msg.Fields.name = string.format('pacemaker.resource.%s.active', parsed.resource)
+    msg.Fields.value = tonumber(parsed.active)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/decoders/rabbitmq.lua b/heka/files/lua/decoders/rabbitmq.lua
new file mode 100644
index 0000000..846a5f6
--- /dev/null
+++ b/heka/files/lua/decoders/rabbitmq.lua
@@ -0,0 +1,71 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local dt     = require "date_time"
+local l      = require 'lpeg'
+l.locale(l)
+
+local patt   = require 'patterns'
+local utils  = require 'lma_utils'
+
+-- Template for the injected 'log' message.
+local msg = {
+    Timestamp   = nil,
+    Type        = 'log',
+    Hostname    = nil,
+    Payload     = nil,
+    Pid         = nil,
+    Fields      = nil,
+    Severity    = nil,
+}
+
+-- RabbitMQ message logs are formatted like this:
+--   =ERROR REPORT==== 2-Jan-2015::09:17:22 ===
+--   Blabla
+--   Blabla
+--
+local message   = l.Cg(patt.Message / utils.chomp, "Message")
+-- The token before 'REPORT' isn't standardized so it can be a valid severity
+-- level as 'INFO' or 'ERROR' but also 'CRASH' or 'SUPERVISOR'.
+local severity  = l.Cg(l.R"AZ"^1, "SeverityLabel")
+-- Day of month: two digits (first digit 1-3) or a single digit 1-9.
+local day = l.R"13" * l.R"09" + l.R"19"
+local datetime = l.Cg(day, "day") * patt.dash * dt.date_mabbr * patt.dash * dt.date_fullyear *
+                 "::" * dt.rfc3339_partial_time
+local timestamp = l.Cg(l.Ct(datetime)/ dt.time_to_ns, "Timestamp")
+
+local grammar = l.Ct("=" * severity * " REPORT==== " * timestamp * " ===" * l.P'\n' * message)
+
+-- Decode a RabbitMQ "=LEVEL REPORT==== <timestamp> ===" log entry and inject
+-- it as a Heka 'log' message. Returns -1 when the entry does not match.
+function process_message ()
+    local log = read_message("Payload")
+
+    local m = grammar:match(log)
+    if not m then
+        return -1
+    end
+
+    -- Map the report token to a syslog severity; tokens such as 'CRASH' or
+    -- 'SUPERVISOR' are not plain severity labels and need special handling.
+    local severity = utils.label_to_severity_map[m.SeverityLabel]
+    if not severity then
+        if m.SeverityLabel == 'CRASH' then
+            severity = 2 -- CRITICAL
+        else
+            severity = 5 -- NOTICE
+        end
+    end
+
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Severity = severity
+    msg.Fields = {
+        severity_label = utils.severity_to_label_map[severity],
+        programname = 'rabbitmq',
+    }
+    utils.inject_tags(msg)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/encoders/es_ceilometer_resources.lua b/heka/files/lua/encoders/es_ceilometer_resources.lua
new file mode 100644
index 0000000..860ba03
--- /dev/null
+++ b/heka/files/lua/encoders/es_ceilometer_resources.lua
@@ -0,0 +1,66 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "string"
+require "cjson"
+local elasticsearch = require "elasticsearch"
+
+-- Elasticsearch index and document type for the bulk 'update' actions.
+local index = read_config("index") or "index"
+local type_name = read_config("type_name") or "message"
+
+-- Encode a Ceilometer resources payload (JSON map of resource_id -> resource)
+-- into an Elasticsearch bulk request: one scripted upsert per resource that
+-- merges meters, ownership and metadata, and maintains the first/last sample
+-- timestamps. Returns -1 when the payload is not valid JSON.
+function process_message()
+    -- pcall keeps a malformed payload from raising, consistent with the
+    -- other decoders/encoders; the unused 'ns' local was dropped.
+    local ok, resources = pcall(cjson.decode, read_message("Payload"))
+    if not ok then
+        return -1
+    end
+    for resource_id, resource in pairs(resources) do
+        local update = cjson.encode({update = {_index = index, _type = type_name,
+            _id = resource_id}})
+        local body = {
+            script = 'ctx._source.meters += meter;' ..
+            'ctx._source.user_id = user_id;' ..
+            'ctx._source.project_id = project_id;' ..
+            'ctx._source.source = source; ' ..
+            'ctx._source.metadata =  ' ..
+            'ctx._source.last_sample_timestamp <= timestamp ? ' ..
+            'metadata : ctx._source.metadata;' ..
+            'ctx._source.last_sample_timestamp = ' ..
+            'ctx._source.last_sample_timestamp < timestamp ?' ..
+            'timestamp : ctx._source.last_sample_timestamp;' ..
+            'ctx._source.first_sample_timestamp = ' ..
+            'ctx._source.first_sample_timestamp > timestamp ?' ..
+            'timestamp : ctx._source.first_sample_timestamp;',
+            params = {
+                meter = resource.meter,
+                metadata = resource.metadata,
+                timestamp = resource.timestamp,
+                user_id = resource.user_id or '',
+                project_id = resource.project_id or '',
+                source = resource.source or '',
+            },
+            upsert = {
+                first_sample_timestamp = resource.timestamp,
+                last_sample_timestamp = resource.timestamp,
+                project_id = resource.project_id or '',
+                user_id = resource.user_id or '',
+                source = resource.source or '',
+                metadata = resource.metadata,
+                meters = resource.meter
+            }
+        }
+        body = cjson.encode(body)
+
+        -- Bulk API format: action line, newline, document line, newline.
+        add_to_payload(update, "\n", body, "\n")
+    end
+
+    inject_payload()
+    return 0
+end
diff --git a/heka/files/lua/encoders/status_nagios.lua b/heka/files/lua/encoders/status_nagios.lua
new file mode 100644
index 0000000..ad4c540
--- /dev/null
+++ b/heka/files/lua/encoders/status_nagios.lua
@@ -0,0 +1,87 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'table'
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+local lma = require 'lma_utils'
+local interp = require "msg_interpolate"
+
+local host = read_config('nagios_host')
+local service_template = read_config('service_template') or error('service_template is required!')
+-- Nagios CGI cannot accept 'plugin_output' parameter greater than 1024 bytes
+-- See bug #1517917 for details.
+-- With the 'cmd.cgi' re-implementation for the command PROCESS_SERVICE_CHECK_RESULT,
+-- this limit can be increased to 3KB. See blueprint scalable-nagios-api.
+-- The '+ 0' coerces the configured value (a string) to a number.
+local truncate_size = (read_config('truncate_size') or 3072) + 0
+-- CGI parameters template; cmd_typ '30' appears to select the
+-- PROCESS_SERVICE_CHECK_RESULT command mentioned above (see Nagios cmd.cgi).
+local data = {
+   cmd_typ = '30',
+   cmd_mod = '2',
+   host    = host,
+   service = nil,
+   plugin_state = nil,
+   plugin_output = nil,
+   performance_data = '',
+}
+local nagios_break_line = '\\n'
+-- mapping GSE statuses to Nagios states
+local nagios_state_map = {
+    [consts.OKAY]=0,
+    [consts.WARN]=1,
+    [consts.UNKW]=3,
+    [consts.CRIT]=2,
+    [consts.DOWN]=2
+}
+
+-- Percent-encode 'str' for use as a CGI query-string value: every character
+-- outside [A-Za-z0-9 -_.~] becomes %XX, then spaces become '+'.
+-- A nil/false input is returned unchanged.
+function url_encode(str)
+  if not str then
+    return str
+  end
+  local encoded = string.gsub(str, "([^%w %-%_%.%~])",
+      function (c) return string.format("%%%02X", string.byte(c)) end)
+  -- Extra parentheses drop gsub's second return value (match count).
+  return (string.gsub(encoded, " ", "+"))
+end
+
+-- Encode a GSE status message as a Nagios PROCESS_SERVICE_CHECK_RESULT
+-- CGI query string and inject it as a 'nagios' text payload.
+-- Returns -1 when the service name, state mapping or alarms are missing.
+function process_message()
+    local service_name = interp.interpolate_from_msg(service_template)
+    local status = afd.get_status()
+    local alarms = afd.alarms_for_human(afd.extract_alarms())
+
+    if not service_name or not nagios_state_map[status] or not alarms then
+        return -1
+    end
+
+    data['service'] = service_name
+    data['plugin_state'] = nagios_state_map[status]
+
+    -- Human-readable check output: "<service> <STATUS>" then one alarm per line.
+    local details = {
+        string.format('%s %s', service_name, consts.status_label(status))
+    }
+    if #alarms == 0 then
+        details[#details+1] = 'no details'
+    else
+        for _, alarm in ipairs(alarms) do
+            details[#details+1] = alarm
+        end
+    end
+    -- Truncate to stay under the Nagios CGI plugin_output limit (see above).
+    data['plugin_output'] = lma.truncate(table.concat(details, nagios_break_line), truncate_size, nagios_break_line)
+
+    local params = {}
+    for k, v in pairs(data) do
+        params[#params+1] = string.format("%s=%s", k, url_encode(v))
+    end
+
+    return lma.safe_inject_payload('txt', 'nagios', table.concat(params, '&'))
+end
diff --git a/heka/files/lua/encoders/status_smtp.lua b/heka/files/lua/encoders/status_smtp.lua
new file mode 100644
index 0000000..3bfad7f
--- /dev/null
+++ b/heka/files/lua/encoders/status_smtp.lua
@@ -0,0 +1,79 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'table'
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+local lma = require 'lma_utils'
+
+local statuses = {}
+
+local concat_string = '\n'
+
+-- Build the email notification (title + details) for a cluster whose
+-- status changed since the last processed message.
+-- Returns -1 when the message is missing the cluster name, status or
+-- alarms; returns 0 without injecting anything when the status is
+-- unchanged or when a new cluster shows up already OKAY.
+-- (Fix: removed the duplicate 'local text' declaration and the dead
+-- 'local previous' forward declaration that it shadowed.)
+function process_message()
+    local cluster = afd.get_entity_name('cluster_name')
+    local msg_type = read_message("Type")
+    local status = afd.get_status()
+    local alarms = afd.extract_alarms()
+
+    if not cluster or not status or not alarms then
+        return -1
+    end
+    -- track the previous status per (message type, cluster) pair
+    local key = string.format('%s.%s', msg_type, cluster)
+    if not statuses[key] then
+        statuses[key] = {}
+    end
+    local previous = statuses[key]
+
+    local text
+    if #alarms == 0 then
+        text = 'no detail'
+    else
+        text = table.concat(afd.alarms_for_human(alarms), concat_string)
+    end
+
+    local title
+    if not previous.status and status == consts.OKAY then
+        -- don't send a email when we detect a new cluster which is OKAY
+        return 0
+    elseif not previous.status then
+        title = string.format('%s status is %s',
+                              cluster,
+                              consts.status_label(status))
+    elseif status ~= previous.status then
+        title = string.format('%s status %s -> %s',
+                              cluster,
+                              consts.status_label(previous.status),
+                              consts.status_label(status))
+-- TODO(scroiset): avoid spam
+-- This code has the same issue than annotations, see filters/influxdb_annotation.lua
+--    elseif previous.text ~= text then
+--        title = string.format('%s status remains %s',
+--                              cluster,
+--                              consts.status_label(status))
+    else
+        -- nothing has changed since the last message
+        return 0
+    end
+
+    -- store the last status and text for future messages
+    previous.status = status
+    previous.text = text
+
+    return lma.safe_inject_payload('txt', '', table.concat({title, text}, concat_string))
+end
diff --git a/heka/files/lua/filters/afd.lua b/heka/files/lua/filters/afd.lua
new file mode 100644
index 0000000..bf10322
--- /dev/null
+++ b/heka/files/lua/filters/afd.lua
@@ -0,0 +1,99 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local string = require 'string'
+
+local utils = require 'lma_utils'
+local afd = require 'afd'
+
+-- node or service
+local afd_type = read_config('afd_type') or error('afd_type must be specified!')
+-- Whether the emitted AFD metrics should be routed to alerting.
+-- read_config() may legitimately return false; 'or true' would override
+-- an explicit false, so only apply the default when the option is unset.
+local activate_alerting = read_config('activate_alerting')
+if activate_alerting == nil then
+    activate_alerting = true
+end
+local msg_type
+local msg_field_name
+local afd_entity
+
+-- derive the output message type, status field name and entity field
+-- from the AFD type
+if afd_type == 'node' then
+    msg_type = 'afd_node_metric'
+    msg_field_name = 'node_status'
+    afd_entity = 'node_role'
+elseif afd_type == 'service' then
+    msg_type = 'afd_service_metric'
+    msg_field_name = 'service_status'
+    afd_entity = 'service'
+else
+    error('invalid afd_type value')
+end
+
+-- ie: controller for node AFD / rabbitmq for service AFD
+local afd_entity_value = read_config('afd_cluster_name') or error('afd_cluster_name must be specified!')
+
+-- ie: cpu for node AFD / queue for service AFD
+local msg_field_source = read_config('afd_logical_name') or error('afd_logical_name must be specified!')
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+
+-- load the alarm definitions (a Lua module name) into the AFD engine
+local afd_file = read_config('afd_file') or error('afd_file must be specified')
+local all_alarms = require(afd_file)
+local A = require 'afd_alarms'
+A.load_alarms(all_alarms)
+
+-- Feed one metric datapoint into the alarm evaluation engine.
+-- Returns -1 (with an error message) when the metric value cannot be
+-- extracted or when a field required by the alarm definition is missing.
+function process_message()
+
+    local metric_name = read_message('Fields[name]')
+    local ts = read_message('Timestamp')
+
+    local ok, value = utils.get_values_from_metric()
+    if not ok then
+        return -1, value
+    end
+    -- retrieve field values
+    local fields = {}
+    for _, field in ipairs (A.get_metric_fields(metric_name)) do
+        local field_value = afd.get_entity_name(field)
+        if not field_value then
+            return -1, "Cannot find Fields[" .. field .. "] for the metric " .. metric_name
+        end
+        fields[field] = field_value
+    end
+    A.add_value(ts, metric_name, value, fields)
+    return 0
+end
+
+-- Evaluate the loaded alarms and inject one AFD metric when at least one
+-- alarm was due for evaluation. The first tick only records the start time.
+function timer_event(ns)
+    if A.is_started() then
+        local state, alarms = A.evaluate(ns)
+        if state then -- it was time to evaluate at least one alarm
+            for _, alarm in ipairs(alarms) do
+                afd.add_to_alarms(
+                    alarm.state,
+                    alarm.alert['function'],
+                    alarm.alert.metric,
+                    alarm.alert.fields,
+                    {}, -- tags
+                    alarm.alert.operator,
+                    alarm.alert.value,
+                    alarm.alert.threshold,
+                    alarm.alert.window,
+                    alarm.alert.periods,
+                    alarm.alert.message)
+            end
+
+            -- NOTE(review): 'interval' is not defined anywhere in this file,
+            -- so it evaluates to nil here — confirm whether it should be
+            -- read from the configuration like the other parameters.
+            afd.inject_afd_metric(msg_type, afd_entity, afd_entity_value, msg_field_name,
+                state, hostname, interval, msg_field_source, activate_alerting)
+        end
+    else
+        A.set_start_time(ns)
+    end
+end
diff --git a/heka/files/lua/filters/afd_api_backends.lua b/heka/files/lua/filters/afd_api_backends.lua
new file mode 100644
index 0000000..37cccd5
--- /dev/null
+++ b/heka/files/lua/filters/afd_api_backends.lua
@@ -0,0 +1,90 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+
+local haproxy_backend_states = {}
+
+-- Emit AFD event metrics derived from HAProxy backend state metrics.
+-- For every service, the 'up' and 'down' backend counts are accumulated;
+-- once both are known the service status is computed and injected:
+--   DOWN when no backend is up, CRIT when at least as many backends are
+--   down as up, WARN when any backend is down, OKAY otherwise.
+function process_message()
+    local metric_name = read_message('Fields[name]')
+    local value = read_message('Fields[value]')
+    local service = read_message('Fields[backend]')
+
+    local counts = haproxy_backend_states[service]
+    if not counts then
+        counts = {}
+        haproxy_backend_states[service] = counts
+    end
+    counts[read_message('Fields[state]')] = value
+
+    if not (counts.up and counts.down) then
+        -- not enough data for now
+        return 0
+    end
+
+    local state = consts.OKAY
+    if counts.up == 0 then
+        state = consts.DOWN
+        afd.add_to_alarms(consts.DOWN,
+                          'last',
+                          metric_name,
+                          {service=service,state='up'},
+                          {},
+                          '==',
+                          counts.up,
+                          0,
+                          nil,
+                          nil,
+                          string.format("All %s backends are down", service))
+    elseif counts.down >= counts.up then
+        state = consts.CRIT
+        afd.add_to_alarms(consts.CRIT,
+                          'last',
+                          metric_name,
+                          {service=service,state='down'},
+                          {},
+                          '>=',
+                          counts.down,
+                          counts.up,
+                          nil,
+                          nil,
+                          string.format("More backends for %s are down than up", service))
+    elseif counts.down > 0 then
+        state = consts.WARN
+        afd.add_to_alarms(consts.WARN,
+                          'last',
+                          metric_name,
+                          {service=service,state='down'},
+                          {},
+                          '>',
+                          counts.down,
+                          0,
+                          nil,
+                          nil,
+                          string.format("At least one %s backend is down", service))
+    end
+
+    afd.inject_afd_service_metric(service,
+                                  state,
+                                  read_message('Fields[hostname]'),
+                                  0,
+                                  'backends')
+
+    -- forget the accumulated counts for this service
+    haproxy_backend_states[service] = {}
+
+    return 0
+end
diff --git a/heka/files/lua/filters/afd_workers.lua b/heka/files/lua/filters/afd_workers.lua
new file mode 100644
index 0000000..8eeb222
--- /dev/null
+++ b/heka/files/lua/filters/afd_workers.lua
@@ -0,0 +1,94 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'string'
+
+local afd = require 'afd'
+local consts = require 'gse_constants'
+
+local worker_states = {}
+
+-- emit AFD event metrics based on openstack_nova_services, openstack_cinder_services and openstack_neutron_agents metrics
+-- For every '<project>-<service>' worker, the 'up' and 'down' instance
+-- counts are accumulated; once both are known the worker status is
+-- computed and injected: DOWN when no instance is up, CRIT when at
+-- least as many instances are down as up, WARN when any instance is
+-- down, OKAY otherwise.
+function process_message()
+    local metric_name = read_message('Fields[name]')
+    -- e.g. 'openstack_nova_services' + 'scheduler' -> 'nova-scheduler'
+    local service = string.format('%s-%s',
+                                  string.match(metric_name, 'openstack_([^_]+)'),
+                                  read_message('Fields[service]'))
+    local worker_key = string.format('%s.%s', metric_name, service)
+
+    if not worker_states[worker_key] then
+        worker_states[worker_key] = {}
+    end
+
+    -- record the count for the reported state ('up' or 'down')
+    local worker = worker_states[worker_key]
+    worker[read_message('Fields[state]')] = read_message('Fields[value]')
+
+    local state = consts.OKAY
+    if not(worker.up and worker.down) then
+        -- not enough data for now
+        return 0
+    end
+
+    if worker.up == 0 then
+        state = consts.DOWN
+        afd.add_to_alarms(consts.DOWN,
+                          'last',
+                          metric_name,
+                          {service=service,state='up'},
+                          {},
+                          '==',
+                          worker.up,
+                          0,
+                          nil,
+                          nil,
+                          string.format("All instances for the service %s are down or disabled", service))
+    elseif worker.down >= worker.up then
+        state = consts.CRIT
+        afd.add_to_alarms(consts.CRIT,
+                          'last',
+                          metric_name,
+                          {service=service,state='down'},
+                          {},
+                          '>=',
+                          worker.down,
+                          worker.up,
+                          nil,
+                          nil,
+                          string.format("More instances of %s are down than up", service))
+    elseif worker.down > 0 then
+        state = consts.WARN
+        afd.add_to_alarms(consts.WARN,
+                          'last',
+                          metric_name,
+                          {service=service,state='down'},
+                          {},
+                          '>',
+                          worker.down,
+                          0,
+                          nil,
+                          nil,
+                          string.format("At least one %s instance is down", service))
+    end
+
+    afd.inject_afd_service_metric(service,
+                                  state,
+                                  read_message('Fields[hostname]'),
+                                  0,
+                                  'workers')
+
+    -- reset the cache for this worker
+    worker_states[worker_key] = {}
+
+    return 0
+end
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
new file mode 100644
index 0000000..6c8415f
--- /dev/null
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -0,0 +1,139 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = require 'cjson'
+
+local afd = require 'afd'
+local gse = require 'gse'
+local lma = require 'lma_utils'
+
+local output_message_type = read_config('output_message_type') or error('output_message_type must be specified!')
+local cluster_field = read_config('cluster_field')
+local member_field = read_config('member_field') or error('member_field must be specified!')
+local output_metric_name = read_config('output_metric_name') or error('output_metric_name must be specified!')
+local source = read_config('source') or error('source must be specified!')
+local topology_file = read_config('topology_file') or error('topology_file must be specified!')
+local policies_file = read_config('policies_file') or error('policies_file must be specified!')
+-- '+ 0' coerces the configuration values (strings) to numbers
+local interval = (read_config('interval') or error('interval must be specified!')) + 0
+local interval_in_ns = interval * 1e9
+local max_inject = (read_config('max_inject') or 10) + 0
+local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
+-- Whether the emitted GSE metrics should be routed to alerting.
+-- read_config() may legitimately return false; 'or true' would override
+-- an explicit false, so only apply the default when the option is unset.
+local activate_alerting = read_config('activate_alerting')
+if activate_alerting == nil then
+    activate_alerting = true
+end
+
+local is_active = false
+local first_tick
+local last_tick = 0
+local last_index = nil
+
+-- build the GSE clusters from the topology and policies definitions
+local topology = require(topology_file)
+local policies = require(policies_file)
+
+for cluster_name, attributes in pairs(topology.clusters) do
+    local policy = policies.find(attributes.policy)
+    if not policy then
+        error('Cannot find ' .. attributes.policy .. ' policy!')
+    end
+    gse.add_cluster(cluster_name, attributes.members, attributes.hints, attributes.group_by, policy)
+end
+
+-- Update the GSE clusters from incoming AFD/GSE member messages.
+-- 'pacemaker_local_resource_active' metrics for the vip__management
+-- resource toggle whether this node is the active aggregator (only the
+-- active node emits cluster metrics from timer_event()).
+-- Returns -1 with an error message when the member name, status or
+-- alarms cannot be extracted from the message.
+function process_message()
+    local name = read_message('Fields[name]')
+    local hostname = read_message('Fields[hostname]')
+    if name and name == 'pacemaker_local_resource_active' and read_message("Fields[resource]") == 'vip__management' then
+        -- Skip pacemaker_local_resource_active metrics that don't
+        -- concern the local node
+        if read_message('Hostname') == hostname then
+            if read_message('Fields[value]') == 1 then
+                is_active = true
+            else
+                is_active = false
+            end
+        end
+        return 0
+    end
+
+    local member_id = afd.get_entity_name(member_field)
+    if not member_id then
+        return -1, "Cannot find entity's name in the AFD/GSE message"
+    end
+
+    local status = afd.get_status()
+    if not status then
+        return -1, "Cannot find status in the AFD/GSE message"
+    end
+
+    local alarms = afd.extract_alarms()
+    if not alarms then
+        return -1, "Cannot find alarms in the AFD/GSE message"
+    end
+
+    -- resolve the clusters impacted by this member: either the single
+    -- cluster named in the message, or all clusters the member belongs to
+    local cluster_ids
+    if cluster_field then
+        local cluster_id = afd.get_entity_name(cluster_field)
+        if not cluster_id then
+            return -1, "Cannot find the cluster's name in the AFD/GSE message"
+        elseif not gse.cluster_exists(cluster_id) then
+            -- Just ignore AFD/GSE messages which aren't part of a cluster's definition
+            return 0
+        end
+        cluster_ids = { cluster_id }
+    else
+        cluster_ids = gse.find_cluster_memberships(member_id)
+    end
+
+    -- update all clusters that depend on this entity
+    for _, cluster_id in ipairs(cluster_ids) do
+        gse.set_member_status(cluster_id, member_id, status, alarms, hostname)
+    end
+    return 0
+end
+
+-- Inject the GSE cluster metrics, at most 'max_inject' per tick.
+-- The iteration position is kept in 'last_index' so the remaining
+-- clusters are emitted on subsequent ticks; a nil 'last_index' means
+-- the previous pass completed.
+function timer_event(ns)
+    if not is_active then
+        -- not running as the aggregator
+        return
+    elseif not first_tick then
+        first_tick = ns
+        return
+    elseif ns - first_tick <= warm_up_period then
+        -- not started for a long enough period
+        return
+    elseif last_index == nil and (ns - last_tick) < interval_in_ns then
+        -- nothing to send yet
+        return
+    end
+    last_tick = ns
+
+    local injected = 0
+    for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
+        -- resume after the last cluster emitted on the previous tick
+        if last_index == nil or i > last_index then
+            gse.inject_cluster_metric(
+                output_message_type,
+                cluster_name,
+                output_metric_name,
+                interval,
+                source,
+                activate_alerting
+            )
+            last_index = i
+            injected = injected + 1
+
+            if injected >= max_inject then
+                return
+            end
+        end
+    end
+
+    last_index = nil
+end
diff --git a/heka/files/lua/filters/hdd_errors_counter.lua b/heka/files/lua/filters/hdd_errors_counter.lua
new file mode 100644
index 0000000..66980bc
--- /dev/null
+++ b/heka/files/lua/filters/hdd_errors_counter.lua
@@ -0,0 +1,92 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'math'
+require 'os'
+require 'string'
+local utils = require 'lma_utils'
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+-- 'patterns' is a list of /<Lua pattern>/ entries used to extract device
+-- names from kernel log lines
+local patterns_config = read_config('patterns') or error('patterns must be specified')
+local patterns = {}
+for pattern in string.gmatch(patterns_config, "/(%S+)/") do
+    -- validate each pattern upfront so a bad configuration fails at startup
+    local ok, msg = pcall(string.match, "", pattern)
+    if not ok then
+        -- fix: a '..' was missing after 'pattern', which made this line a
+        -- call on a string value and crashed instead of reporting the error
+        error("Invalid pattern detected: '" .. pattern .. "' (" .. msg .. ")")
+    end
+    patterns[#patterns + 1] = pattern
+end
+-- Heka cannot guarantee that logs are processed in real-time so the
+-- grace_interval parameter allows to take into account log messages that are
+-- received in the current interval but emitted before it.
+local grace_interval = (read_config('grace_interval') or 0) + 0
+
+local error_counters = {}
+local enter_at
+local start_time = os.time()
+
+-- convert a nanosecond timestamp to whole seconds
+local function convert_to_sec(ns)
+    return math.floor(ns/1e9)
+end
+
+-- Count per-device I/O error occurrences found in kernel log payloads.
+-- Returns -1 (with the error text) when a configured pattern fails at
+-- match time; otherwise returns 0.
+function process_message ()
+    -- timestamp values should be converted to seconds because log timestamps
+    -- have a precision of one second (or millisecond sometimes)
+    if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(enter_at or 0), start_time) then
+        -- skip the log message if it doesn't fall into the current interval
+        return 0
+    end
+
+    local payload = read_message('Payload')
+    if not payload then
+        return 0
+    end
+
+    -- example of kern.log lines:
+    -- <3>Jul 24 14:42:21 node-164 kernel: [505801.068621] Buffer I/O error on device sdd2, logical block 51184
+    -- <4>Aug 22 09:37:09 node-164 kernel: [767975.369264] XFS (sda): xfs_log_force: error 5 returned.
+    -- <1>May 17 23:07:11 sd-os1-stor05 kernel: [ 2119.105909] XFS (sdf3): metadata I/O error: block 0x68c2b7d8 ("xfs_trans_read_buf_map") error 121 numblks 8
+    for _, pattern in ipairs(patterns) do
+        local ok, device = pcall(string.match, payload, pattern)
+        if not ok then
+            return -1, device
+        end
+        -- first matching pattern wins; bump that device's error counter
+        if device then
+            error_counters[device] = (error_counters[device] or 0) + 1
+            break
+        end
+    end
+    return 0
+end
+
+-- Compute and inject the per-device 'hdd_errors_rate' metrics.
+-- Values collected during the first ticker interval are dropped because
+-- the elapsed-time baseline ('enter_at') is not yet known.
+function timer_event(ns)
+    -- Is error_counters empty?
+    if next(error_counters) == nil then
+        return 0
+    end
+
+    local delta_sec = (ns - (enter_at or 0)) / 1e9
+    for dev, value in pairs(error_counters) do
+        -- Don`t send values from first ticker interval
+        if enter_at ~= nil then
+            utils.add_to_bulk_metric("hdd_errors_rate", value / delta_sec, {device=dev})
+        end
+        error_counters[dev] = 0
+    end
+
+    enter_at = ns
+    utils.inject_bulk_metric(ns, hostname, 'hdd_errors_filter')
+
+    return 0
+end
\ No newline at end of file
diff --git a/heka/files/lua/filters/heka_monitoring.lua b/heka/files/lua/filters/heka_monitoring.lua
new file mode 100644
index 0000000..35efcad
--- /dev/null
+++ b/heka/files/lua/filters/heka_monitoring.lua
@@ -0,0 +1,85 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'string'
+require 'math'
+local utils  = require 'lma_utils'
+
+-- Turn Heka's self-monitoring report for one component type into bulk
+-- metrics: 'hekad_msg_count' and 'hekad_msg_avg_duration' always, plus
+-- memory and timer-event statistics when present.
+--
+-- NOTE: It has been written for "filters" and "decoders". If we need to
+-- use it to collect metrics from other components of the Heka pipeline,
+-- we need to ensure that JSON provides names and table with
+-- ProcessMessageCount and ProcessMessageAvgDuration:
+--
+--    "decoder": {
+--        ...
+--        },
+--        "Name": "a name",
+--        "ProcessMessageCount" : {
+--            "representation": "count",
+--            "value": 12
+--        },
+--        "ProcessMessageAvgDuration" : {
+--            "representation": "ns",
+--            "value": 192913
+--        },
+--        { ... }}
+--
+function process_table(typ, array)
+    for _, item in pairs(array) do
+        if type(item) == "table" then
+            -- strip off the '_decoder'/'_filter' suffix
+            local short_name = item['Name']:gsub("_" .. typ, "")
+
+            local tags = {
+                ['type'] = typ,
+                ['name'] = short_name,
+            }
+
+            utils.add_to_bulk_metric('hekad_msg_count', item.ProcessMessageCount.value, tags)
+            utils.add_to_bulk_metric('hekad_msg_avg_duration', item.ProcessMessageAvgDuration.value, tags)
+            if item.Memory then
+                utils.add_to_bulk_metric('hekad_memory', item.Memory.value, tags)
+            end
+            if item.TimerEventAvgDuration then
+                utils.add_to_bulk_metric('hekad_timer_event_avg_duration', item.TimerEventAvgDuration.value, tags)
+            end
+            if item.TimerEventSamples then
+                utils.add_to_bulk_metric('hekad_timer_event_count', item.TimerEventSamples.value, tags)
+            end
+        end
+    end
+end
+
+-- Return 'str' with a trailing 's' removed (e.g. "filters" -> "filter").
+function singularize(str)
+    -- parenthesized so only the substituted string is returned, not
+    -- gsub's second return value (the substitution count)
+    return (str:gsub('s$', ''))
+end
+
+-- Decode Heka's self-monitoring report (JSON payload) and inject one
+-- bulk metric carrying the statistics of all filters and decoders.
+-- Returns -1 when the payload is not valid JSON.
+function process_message ()
+    local ok, data = pcall(cjson.decode, read_message("Payload"))
+    if not ok then
+        return -1
+    end
+
+    local hostname = read_message("Hostname")
+    local ts = read_message("Timestamp")
+
+    for k, v in pairs(data) do
+        if k == "filters" or k == "decoders" then
+            -- tag metrics with the singular form ('filter'/'decoder')
+            process_table(singularize(k), v)
+        end
+    end
+
+    utils.inject_bulk_metric(ts, hostname, 'heka_monitoring')
+    return 0
+end
diff --git a/heka/files/lua/filters/http_metrics_aggregator.lua b/heka/files/lua/filters/http_metrics_aggregator.lua
new file mode 100644
index 0000000..1756a59
--- /dev/null
+++ b/heka/files/lua/filters/http_metrics_aggregator.lua
@@ -0,0 +1,185 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'string'
+require 'math'
+require 'os'
+local utils = require 'lma_utils'
+local tab = require 'table_utils'
+local table = require 'table'
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+-- '+ 0' coerces the configuration values (strings) to numbers
+local interval = (read_config('interval') or error('interval must be specified')) + 0
+-- max_timer_inject is the maximum number of injected messages by timer_event()
+local max_timer_inject = (read_config('max_timer_inject') or 10) + 0
+-- bulk_size is the maximum number of metrics embedded by a bulk_metric within the Payload.
+-- The bulk_size depends on the hekad max_message_size (64 KB by default).
+-- At most, there are 45 metrics/service * 300B (per bucket) =~ 13KB * 4 services = 52KB for 225 metrics.
+-- With a max_message_size set to 256KB, it's possible to embed more than 800 metrics.
+local bulk_size = (read_config('bulk_size') or 225) + 0
+local percentile_thresh = (read_config('percentile') or 90) + 0
+-- grace_time is used to palliate the time precision difference
+-- (in second or millisecond for logs versus nanosecond for the ticker)
+-- and also to compensate the delay introduced by log parsing/decoding
+-- which leads to arrive too late in its interval.
+local grace_time = (read_config('grace_time') or 0) + 0
+
+local inject_reached_error = 'too many metrics to aggregate, adjust bulk_size and/or max_timer_inject parameters'
+
+-- name of the percentile field, e.g. 'upper_90' for the default threshold
+local percentile_field_name = string.format('upper_%s', percentile_thresh)
+local msg_source = 'http_metric_filter'
+local last_tick = os.time() * 1e9
+local interval_in_ns = interval * 1e9
+
+-- set of HTTP methods accepted by process_message()
+local http_verbs = {
+    GET = true,
+    POST = true,
+    OPTIONS = true,
+    DELETE = true,
+    PUT = true,
+    HEAD = true,
+    TRACE = true,
+    CONNECT = true,
+    PATCH = true,
+}
+
+-- template bucket, deep-copied for every new
+-- (service, method, status family) combination
+local metric_bucket = {
+    min = 0,
+    max = 0,
+    sum = 0,
+    count = 0,
+    times = {},
+    [percentile_field_name] = 0,
+    rate = 0,
+}
+-- nested buckets: all_times[service][method][status_family] -> bucket
+local all_times = {}
+local num_metrics = 0
+
+-- Accumulate one HTTP response time into the (service, method, status
+-- family) bucket. Logs older than the current interval (minus
+-- grace_time) are dropped silently; messages missing the http_* fields
+-- or carrying an unsupported method/status return -1.
+function process_message ()
+    local severity = read_message("Fields[severity_label]")
+    local logger = read_message("Logger")
+    local timestamp = read_message("Timestamp")
+    local http_method = read_message("Fields[http_method]")
+    local http_status = read_message("Fields[http_status]")
+    local response_time = read_message("Fields[http_response_time]")
+
+    if timestamp < last_tick - grace_time then
+        -- drop silently old logs
+        return 0
+    end
+    if http_method == nil or http_status == nil or response_time == nil then
+        return -1
+    end
+
+    -- keep only the first 2 tokens because some services like Neutron report
+    -- themselves as 'openstack.<service>.server'
+    local service = string.gsub(read_message("Logger"), '(%w+)%.(%w+).*', '%1_%2')
+    -- NOTE(review): string.gsub always returns a string, so this nil check
+    -- appears purely defensive — confirm before removing it
+    if service == nil then
+        return -1, "Cannot match any service from " .. logger
+    end
+
+    -- coerce http_status to integer
+    http_status = http_status + 0
+    -- bucket the status code into its 'Nxx' family
+    local http_status_family
+    if http_status >= 100 and http_status < 200 then
+        http_status_family = '1xx'
+    elseif http_status >= 200 and http_status < 300 then
+        http_status_family = '2xx'
+    elseif http_status >= 300 and http_status < 400 then
+        http_status_family = '3xx'
+    elseif http_status >= 400 and http_status < 500 then
+        http_status_family = '4xx'
+    elseif http_status >= 500 and http_status < 600 then
+        http_status_family = '5xx'
+    else
+        return -1, "Unsupported http_status " .. http_status
+    end
+
+    if not http_verbs[http_method] then
+        return -1, "Unsupported http_method " .. http_method
+    end
+
+    if not all_times[service] then
+        all_times[service] = {}
+    end
+    if not all_times[service][http_method] then
+        all_times[service][http_method] = {}
+    end
+    if not all_times[service][http_method][http_status_family] then
+        -- verify that the sandbox has enough capacity to emit all metrics
+        if num_metrics > (bulk_size * max_timer_inject) then
+            return -1, inject_reached_error
+        end
+        all_times[service][http_method][http_status_family] = tab.deepcopy(metric_bucket)
+        num_metrics = num_metrics + 1
+    end
+
+    -- update the bucket statistics; 'times' keeps the raw observations
+    -- for the percentile computation in timer_event()
+    local bucket = all_times[service][http_method][http_status_family]
+    bucket.times[#bucket.times + 1] = response_time
+    bucket.count = bucket.count + 1
+    bucket.sum = bucket.sum + response_time
+    if bucket.max < response_time then
+        bucket.max = response_time
+    end
+    if bucket.min == 0 or bucket.min > response_time then
+        bucket.min = response_time
+    end
+
+    return 0
+end
+
+-- Finalize every bucket (rate and percentile) and emit the aggregated
+-- metrics as bulk metrics, flushing one message per 'bulk_size' metrics
+-- up to 'max_timer_inject' messages per tick.
+function timer_event(ns)
+
+    last_tick = ns
+
+    local num = 0
+    local msg_injected = 0
+    for service, methods in pairs(all_times) do
+        for method, statuses in pairs(methods) do
+            for status, bucket in pairs(statuses) do
+                local metric_name = service .. '_http_response_times'
+                bucket.rate = bucket.count / interval
+                -- default the percentile to the max, then refine it from
+                -- the sorted observations when there is more than one sample
+                bucket[percentile_field_name] = bucket.max
+                if bucket.count > 1 then
+                    table.sort(bucket.times)
+                    local tmp = ((100 - percentile_thresh) / 100) * bucket.count
+                    local idx = bucket.count - math.floor(tmp + .5)
+                    if idx > 0 and bucket.times[idx] then
+                        bucket[percentile_field_name] = bucket.times[idx]
+                    end
+                end
+                -- drop the raw observations before emitting the bucket
+                bucket.times = nil
+                utils.add_to_bulk_metric(metric_name, bucket, {hostname=hostname, http_method=method, http_status=status})
+                all_times[service][method][status] = nil
+                num = num + 1
+                num_metrics = num_metrics - 1
+                if num >= bulk_size then
+                    if msg_injected < max_timer_inject then
+                        utils.inject_bulk_metric(ns, hostname, msg_source)
+                        msg_injected = msg_injected + 1
+                        num = 0
+                        num_metrics = 0
+                    end
+                end
+            end
+            all_times[service][method] = nil
+        end
+        all_times[service] = nil
+    end
+    -- flush the remaining metrics, if any
+    if num > 0 then
+        utils.inject_bulk_metric(ns, hostname, msg_source)
+        num = 0
+        num_metrics = 0
+    end
+end
diff --git a/heka/files/lua/filters/influxdb_accumulator.lua b/heka/files/lua/filters/influxdb_accumulator.lua
new file mode 100644
index 0000000..470bdef
--- /dev/null
+++ b/heka/files/lua/filters/influxdb_accumulator.lua
@@ -0,0 +1,142 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'os'
+require 'string'
+require 'table'
+local utils = require 'lma_utils'
+local Accumulator = require 'accumulator'
+local Influxdb = require 'influxdb'
+local l = require 'lpeg'
+l.locale(l)
+
+local flush_count = (read_config('flush_count') or 100) + 0
+local flush_interval = (read_config('flush_interval') or 5) + 0
+local time_precision = read_config("time_precision")
+local payload_name = read_config("payload_name") or "influxdb"
+local bulk_metric_type_matcher = read_config("bulk_metric_type_matcher") or "bulk_metric$"
+
+-- the tag_fields parameter is a list of tags separated by spaces
+local tag_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
+local tag_fields = tag_grammar:match(read_config("tag_fields") or "")
+
+-- Accumulator flush callback: inject the buffered datapoints as a single
+-- text payload, one datapoint per line.
+function flush_cb(datapoints)
+    if #datapoints > 0 then
+        -- append an empty string so table.concat terminates the payload with "\n"
+        datapoints[#datapoints+1] = ''
+        utils.safe_inject_payload("txt", payload_name, table.concat(datapoints, "\n"))
+    end
+end
+local accumulator = Accumulator.new(flush_count, flush_interval, flush_cb)
+local encoder = Influxdb.new(time_precision)
+
+-- return a table containing the common tags from the message:
+-- one entry per name listed in the tag_fields configuration parameter,
+-- each read from the corresponding dynamic field of the current message
+function get_common_tags()
+    local tags = {}
+    for _, t in ipairs(tag_fields) do
+        tags[t] = read_message(string.format('Fields[%s]', t))
+    end
+    return tags
+end
+
+-- process a single metric
+-- Collect the tags (common tags plus the names listed in the message's
+-- own repeated Fields[tag_fields] field), encode the metric as an
+-- InfluxDB datapoint and append it to the accumulator.
+-- Returns an error string on failure, nothing on success.
+function process_single_metric()
+    local name = read_message("Fields[name]")
+
+    if not name then
+        return 'Fields[name] is missing'
+    end
+    local ok, value = utils.get_values_from_metric()
+    if not ok then
+        -- on failure, 'value' carries the error message
+        return value
+    end
+
+    -- collect tags from Fields[tag_fields]
+    local tags = get_common_tags()
+    local i = 0
+    while true do
+        -- iterate over every value of the repeated Fields[tag_fields] field
+        local t = read_message("Fields[tag_fields]", 0, i)
+        if not t then
+            break
+        end
+        tags[t] = read_message(string.format('Fields[%s]', t))
+        i = i + 1
+    end
+
+    accumulator:append(
+        encoder:encode_datapoint(
+            read_message('Timestamp'),
+            name,
+            value,
+            tags))
+    return
+end
+
+-- Decode a bulk metric message and append each of its datapoints to the
+-- accumulator. Returns an error string on failure, nothing on success.
+function process_bulk_metric()
+    -- The payload of the message contains a list of datapoints.
+    --
+    -- Each point is formatted either like this:
+    --
+    --  {name='foo',
+    --   value=1,
+    --   tags={k1=v1,...}}
+    --
+    -- or like this for multi-value points:
+    --
+    --  {name='bar',
+    --   values={k1=v1, ..},
+    --   tags={k1=v1,...}
+    --
+    local ok, points = pcall(cjson.decode, read_message("Payload"))
+    if not ok then
+        return 'Invalid payload value for bulk metric'
+    end
+
+    local common_tags = get_common_tags()
+    local msg_timestamp = read_message('Timestamp')
+    for _, point in ipairs(points) do
+        point.tags = point.tags or {}
+        -- message-level tags complete the point's own tags but never
+        -- override them
+        for k,v in pairs(common_tags) do
+            if point.tags[k] == nil then
+                point.tags[k] = v
+            end
+        end
+        accumulator:append(
+            encoder:encode_datapoint(
+                msg_timestamp,
+                point.name,
+                point.value or point.values,
+                point.tags))
+    end
+    return
+end
+
+-- Entry point: route the message to the bulk or single metric handler
+-- depending on its Type, and translate a handler error into a failure
+-- status for the Heka runtime.
+function process_message()
+    local handler = process_single_metric
+    if string.match(read_message("Type"), bulk_metric_type_matcher) then
+        handler = process_bulk_metric
+    end
+
+    local failure = handler()
+    if failure then
+        return -1, failure
+    end
+    return 0
+end
+
+-- Periodically flush the accumulator even when flush_count has not been
+-- reached. ns: current time in nanoseconds.
+function timer_event(ns)
+    accumulator:flush(ns)
+end
diff --git a/heka/files/lua/filters/influxdb_annotation.lua b/heka/files/lua/filters/influxdb_annotation.lua
new file mode 100644
index 0000000..a1f33ee
--- /dev/null
+++ b/heka/files/lua/filters/influxdb_annotation.lua
@@ -0,0 +1,92 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'cjson'
+require 'string'
+require 'table'
+
+local utils  = require 'lma_utils'
+local consts = require 'gse_constants'
+local afd = require 'afd'
+
+local measurement_name = read_config('measurement_name') or 'annotations'
+local html_break_line = '<br />'
+
+local statuses = {}
+
+-- Transform a GSE cluster metric into an annotation stored into InfluxDB.
+-- Returns -1 when the message lacks the expected AFD fields, 0 when no
+-- annotation needs to be sent, otherwise injects an annotation metric.
+function process_message ()
+    local previous
+    local text
+    -- 'title' was implicitly global, leaking state between sandbox
+    -- invocations; keep it local like 'previous' and 'text'
+    local title
+    local cluster = afd.get_entity_name('cluster_name')
+    local status = afd.get_status()
+    local alarms = afd.extract_alarms()
+
+    if not cluster or not status or not alarms then
+        return -1
+    end
+
+    if not statuses[cluster] then
+        statuses[cluster] = {}
+    end
+    -- per-cluster memory of the last reported status and alarm text
+    previous = statuses[cluster]
+
+    text = table.concat(afd.alarms_for_human(alarms), html_break_line)
+
+    -- build the title
+    if not previous.status and status == consts.OKAY then
+        -- don't send an annotation when we detect a new cluster which is OKAY
+        return 0
+    elseif not previous.status then
+        title = string.format('General status is %s',
+                              consts.status_label(status))
+    elseif previous.status ~= status then
+        title = string.format('General status %s -> %s',
+                              consts.status_label(previous.status),
+                              consts.status_label(status))
+-- TODO(pasquier-s): generate an annotation when the set of alarms has changed.
+-- the following code generated an annotation whenever at least one value
+-- associated to an alarm was changing. This led to way too many annotations
+-- with alarms monitoring the CPU usage for instance.
+--    elseif previous.text ~= text then
+--        title = string.format('General status remains %s',
+--                              consts.status_label(status))
+    else
+        -- nothing has changed since the last message
+        return 0
+    end
+
+    local msg = {
+        Timestamp = read_message('Timestamp'),
+        Type = 'metric',
+        Severity = utils.label_to_severity_map.INFO,
+        Hostname = read_message('Hostname'),
+        Fields = {
+            name = measurement_name,
+            tag_fields = { 'cluster' },
+            value_fields = { 'title', 'tags', 'text' },
+            title = title,
+            tags = cluster,
+            text = text,
+            cluster = cluster,
+            source = 'influxdb_annotation'
+      }
+    }
+    utils.inject_tags(msg)
+
+    -- store the last status and alarm text for future messages
+    previous.status = status
+    previous.text = text
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/filters/instance_state.lua b/heka/files/lua/filters/instance_state.lua
new file mode 100644
index 0000000..9354267
--- /dev/null
+++ b/heka/files/lua/filters/instance_state.lua
@@ -0,0 +1,48 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local utils = require 'lma_utils'
+
+local msg = {
+    Type = "metric", -- will be prefixed by "heka.sandbox."
+    Timestamp = nil,
+    Severity = 6,
+}
+
+count = 0
+
+-- Derive an 'openstack_nova_instance_state' counter metric from a Nova
+-- notification; messages whose state did not actually change are skipped.
+function process_message ()
+    local state = read_message("Fields[state]")
+    local old_state = read_message("Fields[old_state]")
+    if old_state ~= nil and state == old_state then
+        -- nothing to do
+        return 0
+    end
+    msg.Timestamp = read_message("Timestamp")
+    msg.Fields = {
+        source = read_message('Logger'),
+        name = "openstack_nova_instance_state",
+        -- preserve the original hostname in the Fields attribute because
+        -- sandboxed filters cannot override the Hostname attribute
+        hostname = read_message("Fields[hostname]"),
+        type = utils.metric_type['COUNTER'],
+        value = 1,
+        tenant_id = read_message("Fields[tenant_id]"),
+        user_id = read_message("Fields[user_id]"),
+        state = state,
+        tag_fields = { 'state', 'hostname' },
+    }
+    utils.inject_tags(msg)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/filters/logs_counter.lua b/heka/files/lua/filters/logs_counter.lua
new file mode 100644
index 0000000..7da74e9
--- /dev/null
+++ b/heka/files/lua/filters/logs_counter.lua
@@ -0,0 +1,134 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+require 'math'
+require 'os'
+require 'string'
+local utils = require 'lma_utils'
+
+local hostname = read_config('hostname') or error('hostname must be specified')
+local interval = (read_config('interval') or error('interval must be specified')) + 0
+-- Heka cannot guarantee that logs are processed in real-time so the
+-- grace_interval parameter allows to take into account log messages that are
+-- received in the current interval but emitted before it.
+local grace_interval = (read_config('grace_interval') or 0) + 0
+
+local discovered_services = {}
+local logs_counters = {}
+local last_timer_events = {}
+local current_service = 1
+local enter_at
+local interval_in_ns = interval * 1e9
+local start_time = os.time()
+local msg = {
+    Type = "metric",
+    Timestamp = nil,
+    Severity = 6,
+}
+
+-- Convert a timestamp from nanoseconds to whole seconds (truncating).
+function convert_to_sec(ns)
+    return math.floor(ns/1e9)
+end
+
+-- Count one log message per OpenStack service and severity label.
+-- Returns -1 with an error string when the message cannot be attributed
+-- to a service or carries an unknown severity, 0 otherwise.
+function process_message ()
+    local severity = read_message("Fields[severity_label]")
+    local logger = read_message("Logger")
+
+    local service = string.match(logger, "^openstack%.(%a+)$")
+    if service == nil then
+        return -1, "Cannot match any services from " .. logger
+    end
+
+    -- timestamp values should be converted to seconds because log timestamps
+    -- have a precision of one second (or millisecond sometimes)
+    if convert_to_sec(read_message('Timestamp')) + grace_interval < math.max(convert_to_sec(last_timer_events[service] or 0), start_time) then
+        -- skip the log message if it doesn't fall into the current interval
+        return 0
+    end
+
+    if not logs_counters[service] then
+        -- a new service has been discovered
+        discovered_services[#discovered_services + 1] = service
+        logs_counters[service] = {}
+        for _, label in pairs(utils.severity_to_label_map) do
+            logs_counters[service][label] = 0
+        end
+    end
+
+    -- guard against a missing or unknown severity label, which would
+    -- otherwise raise an error on nil arithmetic below
+    if severity == nil or logs_counters[service][severity] == nil then
+        return -1, "Unknown severity label for service " .. service
+    end
+
+    logs_counters[service][severity] = logs_counters[service][severity] + 1
+
+    return 0
+end
+
+-- Periodically emit 'log_messages' rate metrics, reporting one service per
+-- timer event because Heka caps the number of injections per call.
+-- ns: current time in nanoseconds.
+function timer_event(ns)
+
+    -- We can only send a maximum of ten events per call.
+    -- So we send all metrics about one service and we will proceed with
+    -- the following services at the next ticker event.
+
+    if #discovered_services == 0 then
+        return 0
+    end
+
+    -- Initialize enter_at during the first call to timer_event
+    if not enter_at then
+        enter_at = ns
+    end
+
+    -- To be able to send a metric we need to check if we are within the
+    -- interval specified in the configuration and if we haven't already sent
+    -- all metrics.
+    if ns - enter_at < interval_in_ns and current_service <= #discovered_services then
+        local service_name = discovered_services[current_service]
+        local last_timer_event = last_timer_events[service_name] or 0
+        -- elapsed time since this service was last reported, in seconds
+        local delta_sec = (ns - last_timer_event) / 1e9
+
+        for level, val in pairs(logs_counters[service_name]) do
+
+           -- We don't send the first value
+           if last_timer_event ~= 0 and delta_sec ~= 0 then
+               msg.Timestamp = ns
+               msg.Fields = {
+                   name = 'log_messages',
+                   type = utils.metric_type['DERIVE'],
+                   value = val / delta_sec,
+                   service = service_name,
+                   level = string.lower(level),
+                   hostname = hostname,
+                   tag_fields = {'service', 'level', 'hostname'},
+               }
+
+               utils.inject_tags(msg)
+               ok, err = utils.safe_inject_message(msg)
+               if ok ~= 0 then
+                 return -1, err
+               end
+           end
+
+           -- reset the counter
+           logs_counters[service_name][level] = 0
+
+        end
+
+        last_timer_events[service_name] = ns
+        current_service = current_service + 1
+    end
+
+    -- start a new reporting cycle once the full interval has elapsed
+    if ns - enter_at >= interval_in_ns then
+        enter_at = ns
+        current_service = 1
+    end
+
+    return 0
+end
diff --git a/heka/files/lua/filters/resource_creation_time.lua b/heka/files/lua/filters/resource_creation_time.lua
new file mode 100644
index 0000000..0047ed3
--- /dev/null
+++ b/heka/files/lua/filters/resource_creation_time.lua
@@ -0,0 +1,82 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'math'
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local msg = {
+    Type = "metric", -- will be prefixed by "heka.sandbox."
+    Timestamp = nil,
+    Severity = 6,
+}
+
+local event_type_to_name = {
+    ["compute.instance.create.end"] = "openstack_nova_instance_creation_time",
+    ["volume.create.end"] = "openstack_cinder_volume_creation_time",
+    ["volume.attach.end"] = "openstack_cinder_volume_attachment_time",
+}
+
+-- Compute a resource creation/attachment time metric from an OpenStack
+-- notification. Returns -1 when the message cannot be processed,
+-- otherwise injects one gauge metric (value in seconds).
+function process_message ()
+    local metric_name = event_type_to_name[read_message("Fields[event_type]")]
+    if not metric_name then
+        return -1
+    end
+
+    local started_at, completed_at
+
+    if metric_name == "openstack_cinder_volume_attachment_time" then
+        --[[ To compute the metric we need fields that are not available
+          directly in the Heka message. So we need to decode the message,
+          check if it is a full notification or not and extract the needed
+          values. ]]--
+        -- 'cjson' was referenced without ever being required in this
+        -- file, so the global lookup raised *before* pcall could protect
+        -- the decode; require it here (require caches, so this is cheap).
+        local cjson = require 'cjson'
+        local data = read_message("Payload")
+        local ok, notif = pcall(cjson.decode, data)
+        if not ok then
+          return -1
+        end
+
+        notif = notif.payload or notif
+        local t = unpack(notif['volume_attachment'])
+        started_at   = t.created_at or ''
+        completed_at = t.attach_time or ''
+    else
+        started_at = read_message("Fields[created_at]") or ''
+        completed_at = read_message("Fields[launched_at]") or ''
+    end
+
+    -- parse the timestamps into numeric values (presumably nanoseconds,
+    -- given the 1e6/1e3 scaling below -- defined by the 'patterns' module)
+    started_at = patt.Timestamp:match(started_at)
+    completed_at = patt.Timestamp:match(completed_at)
+    if started_at == nil or completed_at == nil or started_at == 0 or completed_at == 0 or started_at > completed_at then
+        return -1
+    end
+
+    msg.Timestamp = read_message("Timestamp")
+    msg.Fields = {
+        source = read_message('Logger'),
+        name = metric_name,
+        -- preserve the original hostname in the Fields attribute because
+        -- sandboxed filters cannot override the Hostname attribute
+        hostname = read_message("Fields[hostname]"),
+        type = utils.metric_type['GAUGE'],
+        -- Having a millisecond precision for creation time is good enough given
+        -- that the started_at field has only a 1-second precision.
+        value = {value = math.floor((completed_at - started_at)/1e6 + 0.5) / 1e3, representation = 's'},
+        tenant_id = read_message("Fields[tenant_id]"),
+        user_id = read_message("Fields[user_id]"),
+        tag_fields = {'hostname'},
+    }
+    utils.inject_tags(msg)
+
+    return utils.safe_inject_message(msg)
+end
diff --git a/heka/files/lua/filters/watchdog.lua b/heka/files/lua/filters/watchdog.lua
new file mode 100644
index 0000000..0399d5c
--- /dev/null
+++ b/heka/files/lua/filters/watchdog.lua
@@ -0,0 +1,24 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require 'math'
+
+local payload_name = read_config('payload_name') or error('payload_name is required')
+local payload = read_config('payload')
+
+-- Very simple filter that emits a fixed message or the current timestamp (in
+-- second) every ticker interval. It can be used to check the liveness of the
+-- Heka service.
+function timer_event(ns)
+   -- ns is the current time in nanoseconds; fall back to emitting the
+   -- time in seconds when no fixed payload is configured
+   inject_payload("txt", payload_name, payload or math.floor(ns / 1e9))
+end
diff --git a/heka/files/lua/outputs/lastfile.lua b/heka/files/lua/outputs/lastfile.lua
new file mode 100644
index 0000000..28335a5
--- /dev/null
+++ b/heka/files/lua/outputs/lastfile.lua
@@ -0,0 +1,27 @@
+-- Copyright 2015 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+require "io"
+
+local path = read_config('path') or error('path required')
+local field = read_config('field') or 'Payload'
+
+-- Very simple output sandbox that writes the value of one of the message's
+-- fields ('Payload' by default) to a file, truncating previous content.
+-- Returns -1 when the destination file cannot be opened.
+function process_message()
+    local fh = io.open(path, "w")
+    if not fh then
+        return -1
+    end
+    -- Write through the handle directly: the previous implementation
+    -- redirected the process-wide default output (io.output) to fh and
+    -- then closed it, leaving the default output stream closed for any
+    -- later writer.
+    fh:write(read_message(field))
+    fh:close()
+    return 0
+end
diff --git a/heka/files/output/dashboard.toml b/heka/files/output/dashboard.toml
deleted file mode 100644
index 72c74c1..0000000
--- a/heka/files/output/dashboard.toml
+++ /dev/null
@@ -1,5 +0,0 @@
-[DashboardOutput]
-{%- if values.dashboard_address is defined %}
-address = "{{ values.dashboard_address }}:{{ values.dashboard_port }}"
-{% endif %}
-ticker_interval = {{ values.ticker_interval }}
diff --git a/heka/files/output/logoutput.toml b/heka/files/output/logoutput.toml
deleted file mode 100644
index 0df4f54..0000000
--- a/heka/files/output/logoutput.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[LogOutput]
-encoder = "{{ values.encoder }}"
-message_matcher = "{{ values.message_matcher }}"
diff --git a/heka/files/service_wrapper b/heka/files/service_wrapper
new file mode 100644
index 0000000..f3532f1
--- /dev/null
+++ b/heka/files/service_wrapper
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+HEKAD="/usr/bin/hekad"
+
+exec $HEKAD -config=/etc/{{ service_name }}
diff --git a/heka/files/toml/decoder/multidecoder.toml b/heka/files/toml/decoder/multidecoder.toml
new file mode 100644
index 0000000..3ac53c9
--- /dev/null
+++ b/heka/files/toml/decoder/multidecoder.toml
@@ -0,0 +1,5 @@
+[{{ decoder_name }}_decoder]
+type = "MultiDecoder"
+subs = {{ decoder.subs }}
+cascade_strategy = "{{ decoder.cascade_strategy }}"
+log_sub_errors = {{ decoder.log_sub_errors }}
diff --git a/heka/files/toml/decoder/payloadregex.toml b/heka/files/toml/decoder/payloadregex.toml
new file mode 100644
index 0000000..3e791bd
--- /dev/null
+++ b/heka/files/toml/decoder/payloadregex.toml
@@ -0,0 +1,4 @@
+[{{ decoder_name }}_decoder]
+type = "PayloadRegexDecoder"
+match_regex = "{{ decoder.match_regex }}"
+timestamp_layout = "{{ decoder.timestamp_layout }}"
diff --git a/heka/files/toml/decoder/protobuf.toml b/heka/files/toml/decoder/protobuf.toml
new file mode 100644
index 0000000..9c5ccac
--- /dev/null
+++ b/heka/files/toml/decoder/protobuf.toml
@@ -0,0 +1,2 @@
+[{{ decoder_name }}_decoder]
+type = "ProtobufDecoder"
diff --git a/heka/files/toml/decoder/sandbox.toml b/heka/files/toml/decoder/sandbox.toml
new file mode 100644
index 0000000..897f4be
--- /dev/null
+++ b/heka/files/toml/decoder/sandbox.toml
@@ -0,0 +1,25 @@
+[{{ decoder_name }}_decoder]
+type = "SandboxDecoder"
+filename = "{{ decoder.module_file }}"
+{%- if decoder.module_directory is defined %}
+module_directory = "{{ decoder.module_directory }}"
+{%- endif %}
+{%- if decoder.memory_limit is defined %}
+memory_limit = {{ decoder.memory_limit }}
+{%- endif %}
+{%- if decoder.preserve_data is defined %}
+preserve_data = {{ decoder.preserve_data|lower }}
+{%- endif %}
+{%- if decoder.instruction_limit is defined %}
+instruction_limit = {{ decoder.instruction_limit }}
+{%- endif %}
+{%- if decoder.output_limit is defined %}
+output_limit = {{ decoder.output_limit }}
+{%- endif %}
+
+{%- if decoder.config is defined %}
+[{{ decoder_name }}_decoder.config]
+{%- for config_param, config_value in decoder.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/encoder/elasticsearch.toml b/heka/files/toml/encoder/elasticsearch.toml
new file mode 100644
index 0000000..4765798
--- /dev/null
+++ b/heka/files/toml/encoder/elasticsearch.toml
@@ -0,0 +1,5 @@
+[{{ name }}_encoder]
+type = "ESJsonEncoder"
+index = "{{ values.index }}"
+es_index_from_timestamp = true
+fields = [ "DynamicFields", "Hostname", "Logger", "Payload", "Pid", "Severity", "Timestamp", "Type" ]
diff --git a/heka/files/encoder/es-json.toml b/heka/files/toml/encoder/es-json.toml
similarity index 100%
rename from heka/files/encoder/es-json.toml
rename to heka/files/toml/encoder/es-json.toml
diff --git a/heka/files/encoder/es-payload.toml b/heka/files/toml/encoder/es-payload.toml
similarity index 100%
rename from heka/files/encoder/es-payload.toml
rename to heka/files/toml/encoder/es-payload.toml
diff --git a/heka/files/encoder/protobuf.toml b/heka/files/toml/encoder/protobuf.toml
similarity index 100%
rename from heka/files/encoder/protobuf.toml
rename to heka/files/toml/encoder/protobuf.toml
diff --git a/heka/files/encoder/RstEncoder.toml b/heka/files/toml/encoder/rst.toml
similarity index 100%
rename from heka/files/encoder/RstEncoder.toml
rename to heka/files/toml/encoder/rst.toml
diff --git a/heka/files/toml/filter/sandbox.toml b/heka/files/toml/filter/sandbox.toml
new file mode 100644
index 0000000..76afba2
--- /dev/null
+++ b/heka/files/toml/filter/sandbox.toml
@@ -0,0 +1,25 @@
+[{{ filter_name }}_filter]
+type = "SandboxFilter"
+filename = "{{ filter.module_file }}"
+{%- if filter.module_directory is defined %}
+module_directory = "{{ filter.module_directory }}"
+{%- endif %}
+{%- if filter.message_matcher is defined %}
+message_matcher = "{{ filter.message_matcher }}"
+{%- endif %}
+{%- if filter.preserve_data is defined %}
+preserve_data = {{ filter.preserve_data|lower }}
+{%- endif %}
+{%- if filter.ticker_interval is defined %}
+ticker_interval = {{ filter.ticker_interval }}
+{%- endif %}
+{%- if filter.hostname is defined %}
+hostname = "{{ filter.hostname }}"
+{%- endif %}
+
+{%- if filter.config is defined %}
+[{{ filter_name }}_filter.config]
+{%- for config_param, config_value in filter.config.iteritems() %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% else %}{{ config_value }}{% endif %}
+{%- endfor %}
+{%- endif %}
diff --git a/heka/files/toml/global.toml b/heka/files/toml/global.toml
new file mode 100644
index 0000000..40aaf3b
--- /dev/null
+++ b/heka/files/toml/global.toml
@@ -0,0 +1,12 @@
+[hekad]
+
+{%- set workers = grains.num_cpus + 1 %}
+maxprocs={{ workers }}
+base_dir="/var/cache/{{ service_name }}"
+
+hostname="{{ grains.fqdn.split('.')[0] }}"
+
+max_message_size = 262144
+max_process_inject = 1
+max_timer_inject = 10
+poolsize = 100
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
new file mode 100644
index 0000000..111ad7d
--- /dev/null
+++ b/heka/files/toml/input/amqp.toml
@@ -0,0 +1,39 @@
+[input_{{ input_name }}]
+type = "AMQPInput"
+url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
+exchange = "{{ input.exchange }}"
+exchange_type = "{{ input.exchange_type }}"
+
+{% if input.prefetch_count is defined -%}
+prefetch_count = {{ input.prefetch_count }}
+{% endif %}
+{%- if input.exchange_durability is defined -%}
+exchange_durability = {{ input.exchange_durability|lower }}
+{% endif %}
+{%- if input.exchange_auto_delete is defined -%}
+exchange_auto_delete = {{ input.exchange_auto_delete|lower }}
+{% endif %}
+{%- if input.queue_auto_delete is defined -%}
+queue_auto_delete = {{ input.queue_auto_delete|lower }}
+{% endif %}
+{%- if input.queue is defined -%}
+queue = "{{ input.queue }}"
+{% endif %}
+{%- if input.routing_key is defined -%}
+routing_key = "{{ input.routing_key }}"
+{% endif %}
+decoder = "{{ input.decoder }}"
+splitter = "{{ input.splitter }}"
+
+{%- if input.ssl is defined and input.ssl.get('enabled', True) %}
+[input_{{ input_name }}.tls]
+cert_file = "{{ input.ssl.cert_file }}"
+key_file = "{{ input.ssl.key_file }}"
+{%- if input.ssl.ca_file is defined %}
+root_cafile = "{{ input.ssl.ca_file }}"
+{%- endif %}
+{%- endif %}
+
+{#-
+vim: syntax=jinja
+-#}
diff --git a/heka/files/toml/input/collectd.toml b/heka/files/toml/input/collectd.toml
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/heka/files/toml/input/collectd.toml
diff --git a/heka/files/toml/input/logstreamer.toml b/heka/files/toml/input/logstreamer.toml
new file mode 100644
index 0000000..791467e
--- /dev/null
+++ b/heka/files/toml/input/logstreamer.toml
@@ -0,0 +1,19 @@
+[{{ input_name }}_input]
+type = "LogstreamerInput"
+log_directory = "{{ input.log_directory }}"
+file_match = '{{ input.file_match }}'
+{%- if input.differentiator is defined %}
+differentiator = {{ input.differentiator }}
+{%- endif %}
+{%- if input.priority is defined %}
+priority = {{ input.priority }}
+{%- endif %}
+{%- if input.decoder is defined %}
+decoder = "{{ input.decoder }}"
+{%- endif %}
+{%- if input.splitter is defined %}
+splitter = '{{ input.splitter }}'
+{%- endif %}
+{%- if input.oldest_duration is defined %}
+oldest_duration = "{{ input.oldest_duration }}"
+{%- endif %}
diff --git a/heka/files/input/process.toml b/heka/files/toml/input/process.toml
similarity index 100%
rename from heka/files/input/process.toml
rename to heka/files/toml/input/process.toml
diff --git a/heka/files/output/amqp.toml b/heka/files/toml/output/amqp.toml
similarity index 90%
rename from heka/files/output/amqp.toml
rename to heka/files/toml/output/amqp.toml
index 5932b71..2c9bd7e 100644
--- a/heka/files/output/amqp.toml
+++ b/heka/files/toml/output/amqp.toml
@@ -1,4 +1,4 @@
-[AMQPOutput_{{ name }}]
+[{{ name }}_output]
 type = "AMQPOutput"
 url = "amqp{% if values.ssl is defined and values.ssl.get('enabled', True) %}s{% endif %}://{{ values.user }}:{{ values.password }}@{{ values.host }}/{{ values.vhost }}"
 exchange = "{{ values.exchange }}"
@@ -7,13 +7,13 @@
 use_framing = true
 encoder = "{{ values.encoder }}"
 
-[AMQPOutput_{{ name }}.retries]
+[{{ name }}_output.retries]
 max_delay = "{{ values.get('max_delay', '30s') }}"
 delay = "{{ values.get('delay', '250ms') }}"
 max_retries = {{ values.get('max_retries', '-1') }}
 
 {%- if values.ssl is defined and values.ssl.get('enabled', True) %}
-[AMQPOutput_{{ name }}.tls]
+[{{ name }}_output.tls]
 cert_file = "{{ values.ssl.cert_file }}"
 key_file = "{{ values.ssl.key_file }}"
 {%- if values.ssl.ca_file is defined %}
diff --git a/heka/files/toml/output/dashboard.toml b/heka/files/toml/output/dashboard.toml
new file mode 100644
index 0000000..fb63768
--- /dev/null
+++ b/heka/files/toml/output/dashboard.toml
@@ -0,0 +1,3 @@
+[DashboardOutput]
+address = "{{ values.host }}:{{ values.port }}"
+ticker_interval = {{ values.ticker_interval }}
diff --git a/heka/files/output/elasticsearch.toml b/heka/files/toml/output/elasticsearch.toml
similarity index 66%
rename from heka/files/output/elasticsearch.toml
rename to heka/files/toml/output/elasticsearch.toml
index 7d00888..0a55b52 100644
--- a/heka/files/output/elasticsearch.toml
+++ b/heka/files/toml/output/elasticsearch.toml
@@ -1,13 +1,13 @@
-[output_{{ name }}]
+[{{ name }}_output]
 type = "ElasticSearchOutput"
 message_matcher = "{{ values.message_matcher }}"
 encoder = "{{ values.encoder }}"
 server = "http://{{ values.host }}:{{ values.port }}"
 flush_interval = {{ values.flush_interval }}
 flush_count = {{ values.flush_count }}
+use_buffering = true
 
-[output_{{ name }}.buffering]
-max_file_size = 16777216 #16M
-max_buffer_size = 67108864 #64M
+[{{ name }}_output.buffering]
+max_buffer_size = 1073741824
+max_file_size = 134217728
 full_action = "block"
-cursor_update_count = 100
diff --git a/heka/files/toml/output/log.toml b/heka/files/toml/output/log.toml
new file mode 100644
index 0000000..0d18530
--- /dev/null
+++ b/heka/files/toml/output/log.toml
@@ -0,0 +1,3 @@
+[{{ name }}_output]
+type = "LogOutput"
+encoder = "{{ values.encoder }}"
+message_matcher = "{{ values.message_matcher }}"
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
new file mode 100644
index 0000000..11d5763
--- /dev/null
+++ b/heka/files/toml/output/tcp.toml
@@ -0,0 +1,11 @@
+[{{ name }}_output]
+type = "TcpOutput"
+address = "{{ values.host }}:{{ values.port }}"
+encoder = "ProtobufEncoder"
+message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
+
+use_buffering = true
+[{{ name }}_output.buffering]
+max_buffer_size = 268435456
+max_file_size = 67108864
+full_action = "drop"
diff --git a/heka/files/toml/splitter/regex.toml b/heka/files/toml/splitter/regex.toml
new file mode 100644
index 0000000..1f4a6c9
--- /dev/null
+++ b/heka/files/toml/splitter/regex.toml
@@ -0,0 +1,4 @@
+[{{ splitter_name }}_splitter]
+type = "RegexSplitter"
+delimiter = "{{ splitter.delimiter }}"
+delimiter_eol = "{{ splitter.delimiter_eol }}"
diff --git a/heka/files/toml/splitter/token.toml b/heka/files/toml/splitter/token.toml
new file mode 100644
index 0000000..bbe5464
--- /dev/null
+++ b/heka/files/toml/splitter/token.toml
@@ -0,0 +1,3 @@
+[{{ splitter_name }}_splitter]
+type = "TokenSplitter"
+delimiter = "{{ splitter.delimiter }}"
diff --git a/heka/init.sls b/heka/init.sls
index 71d650a..0c4f1fa 100644
--- a/heka/init.sls
+++ b/heka/init.sls
@@ -1,5 +1,15 @@
+{%- if pillar.heka is defined %}
 include:
-{% if pillar.heka.server is defined %}
-- heka.server
-{% endif %}
-
+{%- if pillar.heka.log_collector is defined %}
+- heka.log_collector
+{%- endif %}
+{%- if pillar.heka.metric_collector is defined %}
+- heka.metric_collector
+{%- endif %}
+{%- if pillar.heka.remote_collector is defined %}
+- heka.remote_collector
+{%- endif %}
+{%- if pillar.heka.aggregator is defined %}
+- heka.aggregator
+{%- endif %}
+{%- endif %}
diff --git a/heka/log_collector.sls b/heka/log_collector.sls
new file mode 100644
index 0000000..a50e6d2
--- /dev/null
+++ b/heka/log_collector.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.log_collector is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "log_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/map.jinja b/heka/map.jinja
index cab337d..c4e3848 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -3,7 +3,6 @@
   pkgs:
   - acl
   - heka
-  - heka-lua-scripts
   user:
   - heka
   groups:
@@ -26,4 +25,3 @@
 {%- endload %}
 
 {%- set server = salt['grains.filter_by'](server_defaults, merge=salt['pillar.get']('heka:server')) %}
-
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
new file mode 100644
index 0000000..fbb0ba7
--- /dev/null
+++ b/heka/meta/heka.yml
@@ -0,0 +1,83 @@
+log_collector:
+  encoder:
+    elasticsearch:
+      engine: elasticsearch
+      index: "%{Type}-%{%Y.%m.%d}"
+  output:
+    elasticsearch:
+      engine: elasticsearch
+      message_matcher: "Type == 'log' || Type == 'notification'"
+      encoder: elasticsearch_encoder
+      host: mon
+      port: 9200
+      flush_interval: 5000
+      flush_count: 100
+    metric_collector:
+      engine: tcp
+      host: 127.0.0.1
+      port: 5567
+    log_dashboard:
+      engine: dashboard
+      host: 127.0.0.1
+      port: 4352
+metric_collector:
+  decoder:
+    heka_collectd:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/decoders/collectd.lua
+      module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+      config:
+        hostname: '{{ grains.fqdn.split('.')[0] }}'
+        swap_size: 4294967296
+    heka_http_check:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/decoders/noop.lua
+      module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+      config:
+        msg_type: lma.http-check
+    heka_metric:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/decoders/metric.lua
+      module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+      config:
+        deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
+  input: {}
+  filter:
+    heka_metric_collector:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/heka_monitoring.lua
+      module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "Type == 'heka.all-report'"
+    rabbitmq_service_status:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/afd.lua
+      module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric') && (Fields[name] == 'rabbitmq_check')"
+      ticker_interval: 10
+      config:
+        hostname: '{{ grains.fqdn.split('.')[0] }}'
+        afd_type: 'service'
+        afd_file: 'lma_alarms_rabbitmq_service_check'
+        afd_cluster_name: 'rabbitmq-service'
+        afd_logical_name: 'check'
+        activate_alerting: true
+        enable_notification: false
+  output: {}
+  splitter: {}
+  encoder: {}
+remote_collector:
+  decoder: {}
+  input: {}
+  filter: {}
+  output: {}
+  splitter: {}
+  encoder: {}
+aggregator:
+  decoder: {}
+  input: {}
+  filter: {}
+  output: {}
+  splitter: {}
+  encoder: {}
diff --git a/heka/metric_collector.sls b/heka/metric_collector.sls
new file mode 100644
index 0000000..9684c1b
--- /dev/null
+++ b/heka/metric_collector.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.metric_collector is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "metric_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/remote_collector.sls b/heka/remote_collector.sls
new file mode 100644
index 0000000..961587d
--- /dev/null
+++ b/heka/remote_collector.sls
@@ -0,0 +1,10 @@
+{%- if pillar.heka.remote_collector is defined %}
+
+include:
+- heka._common
+
+{%- set service_name = "remote_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/server.sls b/heka/server.sls
deleted file mode 100644
index c26c967..0000000
--- a/heka/server.sls
+++ /dev/null
@@ -1,201 +0,0 @@
-{%- from "heka/map.jinja" import server with context %}
-{%- if server.enabled %}
-
-heka_packages:
-  pkg.latest:
-  - names: {{ server.pkgs }}
-
-purge-heka-conf-dir:
-  file.directory:
-  - name: /etc/heka/conf.d/
-  - clean: True
-  - makedirs: True
-  - require:
-    - pkg: heka_packages
-
-heka_ssl:
-  file.directory:
-  - name: /etc/heka/ssl
-  - user: root
-  - group: heka
-  - mode: 750
-  - require:
-    - pkg: heka_packages
-    - user: heka_user
-
-/etc/heka/conf.d/00-hekad.toml:
-  file.managed:
-  - source: salt://heka/files/00-hekad.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - pkg: heka_packages
-    - file: purge-heka-conf-dir
-    - user: heka_user
-
-{%- if grains.os_family == 'RedHat' %}
-/etc/systemd/system/heka.service:
-  file.managed:
-  - source: salt://heka/files/heka.service
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-
-/var/cache/hekad:
-  file.directory:
-  - user: heka
-  - require:
-    - user: heka_user
-{%- endif %}
-
-heka_acl_log:
-  cmd.run:
-  - name: "setfacl -R -m g:adm:rx /var/log; setfacl -R -d -m g:adm:rx /var/log"
-  - unless: "getfacl /var/log/|grep default:group:adm"
-
-heka_service:
-  service.running:
-  - enable: true
-  - name: heka
-  - watch:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - require:
-    - user: heka_user
-
-heka_user:
-  user.present:
-  - name: heka
-  - system: true
-  - shell: /bin/nologin
-  - groups: {{ server.groups }}
-  - require:
-    - pkg: heka_packages
-
-{%- for name,values in server.input.iteritems() %}
-
-/etc/heka/conf.d/15-input-{{ name }}-{{ values['engine'] }}.toml:
-  file.managed:
-  - source: salt://heka/files/input/{{ values['engine'] }}.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-    - service: heka_service
-  - defaults:
-      name: {{ name }}
-      values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.output.iteritems() %}
-{%- if values.enabled %}
-/etc/heka/conf.d/60-output-{{ name }}-{{ values['engine'] }}.toml:
-  file.managed:
-  - source: salt://heka/files/output/{{ values['engine'] }}.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-    - service: heka_service
-  - defaults:
-      name: {{ name }}
-      values: {{ values }}
-
-{%- endif %}
-{%- endfor %}
-
-
-{%- for name,values in server.filter.iteritems() %}
-
-/etc/heka/conf.d/20-filter-{{ name }}-{{ values['engine'] }}.toml:
-  file.managed:
-  - source: salt://heka/files/filter/{{ values['engine'] }}.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-    - service: heka_service
-  - defaults:
-      name: {{ name }}
-      values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.splitter.iteritems() %}
-
-/etc/heka/conf.d/30-splitter-{{ name }}-{{ values['engine'] }}.toml:
-  file.managed:
-  - source: salt://heka/files/splitter/{{ values['engine'] }}.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-    - service: heka_service
-  - defaults:
-      name: {{ name }}
-      values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.encoder.iteritems() %}
-
-/etc/heka/conf.d/40-encoder-{{ name }}-{{ values['engine'] }}.toml:
-  file.managed:
-  - source: salt://heka/files/encoder/{{ values['engine'] }}.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-    - service: heka_service
-  - defaults:
-      name: {{ name }}
-      values: {{ values }}
-
-{%- endfor %}
-
-{%- for name,values in server.decoder.iteritems() %}
-
-/etc/heka/conf.d/10-decoder-{{ name }}-{{ values['engine'] }}.toml:
-  file.managed:
-  - source: salt://heka/files/decoder/{{ values['engine'] }}.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-        - service: heka_service
-  - defaults:
-      name: {{ name }}
-      values: {{ values }}
-
-{%- endfor %}
-
-{%- for service_name, service in pillar.items() %}
-{%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
-
-/etc/heka/conf.d/99-{{ service_name }}.toml:
-  file.managed:
-  - source: salt://{{ service_name }}/files/heka.toml
-  - template: jinja
-  - mode: 640
-  - group: heka
-  - require:
-    - file: /etc/heka/conf.d/00-hekad.toml
-  - watch_in:
-    - service: heka_service
-
-{%- endif %}
-{%- endfor %}
-
-{%- endif %}
diff --git a/metadata/service/aggregator/single.yml b/metadata/service/aggregator/single.yml
new file mode 100644
index 0000000..a44ae20
--- /dev/null
+++ b/metadata/service/aggregator/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+  heka:
+    aggregator:
+      enabled: true
diff --git a/metadata/service/log_collector/single.yml b/metadata/service/log_collector/single.yml
new file mode 100644
index 0000000..0406f0e
--- /dev/null
+++ b/metadata/service/log_collector/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+  heka:
+    log_collector:
+      enabled: true
diff --git a/metadata/service/metric_collector/single.yml b/metadata/service/metric_collector/single.yml
new file mode 100644
index 0000000..2dcd5e0
--- /dev/null
+++ b/metadata/service/metric_collector/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+  heka:
+    metric_collector:
+      enabled: true
diff --git a/metadata/service/remote_collector/single.yml b/metadata/service/remote_collector/single.yml
new file mode 100644
index 0000000..67bdf8d
--- /dev/null
+++ b/metadata/service/remote_collector/single.yml
@@ -0,0 +1,6 @@
+applications:
+- heka
+parameters:
+  heka:
+    remote_collector:
+      enabled: true
diff --git a/metadata/service/support.yml b/metadata/service/support.yml
index 8041310..4919327 100644
--- a/metadata/service/support.yml
+++ b/metadata/service/support.yml
@@ -4,7 +4,7 @@
       collectd:
         enabled: false
       heka:
-        enabled: false
+        enabled: true
       sensu:
         enabled: false
       sphinx: