Round 2
diff --git a/heka/_common.sls b/heka/_common.sls
index 0ffad13..0da7736 100644
--- a/heka/_common.sls
+++ b/heka/_common.sls
@@ -26,4 +26,9 @@
   service.dead:
   - name: heka
 
-{%- endif %}
+heka_grains_dir:
+  file.directory:
+  - name: /etc/salt/grains.d
+  - mode: 700
+  - makedirs: true
+  - user: root
diff --git a/heka/_service.sls b/heka/_service.sls
index 7085abe..6e058da 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -1,8 +1,4 @@
 
-{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file %}{% endmacro %}
-
-{%- macro service_config(service_name) %}
-
 {%- set server = salt['pillar.get']('heka:'+service_name) %}
 
 {%- if server.enabled %}
@@ -44,59 +40,66 @@
   - source: salt://heka/files/heka.service
   - user: root
   - mode: 644
+  - defaults:
+      service_name: {{ service_name }}
   - template: jinja
 
 heka_{{ service_name }}_service_wrapper:
   file.managed:
-  - name: /usr/local/bin/{{ service_name }}
+  - name: /usr/local/bin/{{ service_name }}_wrapper
   - source: salt://heka/files/service_wrapper
   - user: root
   - mode: 755
+  - defaults:
+      service_name: {{ service_name }}
   - template: jinja
 
 {%- endif %}
 
+heka_{{ service_name }}_service:
+  service.running:
+  - name: {{ service_name }}
+  - enable: True
 
 {# Setup basic structure for all roles so updates can apply #}
 
 {%- set service_grains = {
-  'heka': {
-    'log_collector': {
-      'decoder': {},
-      'input': {},
-      'filter': {},
-      'splitter': {},
-      'encoder': {},
-      'output': {}
-    },
-    'metric_collector': {
-      'decoder': {},
-      'input': {},
-      'filter': {},
-      'splitter': {},
-      'encoder': {},
-      'output': {}
-    },
-    'remote_collector': {
-      'decoder': {},
-      'input': {},
-      'filter': {},
-      'splitter': {},
-      'encoder': {},
-      'output': {}
-    },
-    'aggregator': {
-      'decoder': {},
-      'input': {},
-      'filter': {},
-      'splitter': {},
-      'encoder': {},
-      'output': {}
-    }
+  'log_collector': {
+    'decoder': {},
+    'input': {},
+    'filter': {},
+    'splitter': {},
+    'encoder': {},
+    'output': {}
+  },
+  'metric_collector': {
+    'decoder': {},
+    'input': {},
+    'filter': {},
+    'splitter': {},
+    'encoder': {},
+    'output': {}
+  },
+  'remote_collector': {
+    'decoder': {},
+    'input': {},
+    'filter': {},
+    'splitter': {},
+    'encoder': {},
+    'output': {}
+  },
+  'aggregator': {
+    'decoder': {},
+    'input': {},
+    'filter': {},
+    'splitter': {},
+    'encoder': {},
+    'output': {}
   }
 } %}
 
 
+
 {# Loading the other services' support metadata for local roles #}
 
 {%- if service_name in ['log_collector', 'metric_collector'] %}
@@ -104,9 +107,11 @@
 {%- for service_name, service in pillar.iteritems() %}
 {%- if service.get('_support', {}).get('heka', {}).get('enabled', False) %}
 
+{%- macro load_grains_file(grains_fragment_file) %}{% include grains_fragment_file %}{% endmacro %}
+
 {%- set grains_fragment_file = service_name+'/meta/heka.yml' %}
 {%- set grains_yaml = load_grains_file(grains_fragment_file)|load_yaml %}
-{%- do service_grains.heka.update(grains_yaml) %}
+{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=grains_yaml) %}
 
 {%- endif %}
 {%- endfor %}
@@ -121,7 +126,7 @@
 {%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').iteritems() %}
 {%- if node_grains.heka is defined %}
 
-{%- do service_grains.heka.update(node_grains.heka) %}
+{%- do service_grains.update(node_grains.heka) %}
 
 {%- endif %}
 {%- endfor %}
@@ -136,7 +141,10 @@
 
 {%- for service_action_name, service_action in service_grain.iteritems() %}
 {%- if salt['pillar.get']('heka:'+service_grain_name).get(service_action_name, False) is mapping %}
-{%- do service_grains.heka.[service_grain_name].[service_action_name].update(salt['pillar.get']('heka:'+service_grain_name+':'+service_action_name)) %}
+{%- set grain_action_meta = salt['pillar.get']('heka:'+service_grain_name+':'+service_action_name) %}
+{#
+{%- set service_grains.get(service_grain_name).get(service_action_name) = salt['grains.filter_by']({'default': service_grains}, merge=grain_action_meta) %}
+#}
 {%- endif %}
 {%- endfor %}
 
@@ -144,6 +152,19 @@
 {%- endfor %}
 
 
+heka_{{ service_name }}_grain:
+  file.managed:
+  - name: /etc/salt/grains.d/heka
+  - source: salt://heka/files/heka.grain
+  - template: jinja
+  - user: root
+  - mode: 600
+  - defaults:
+    service_grains:
+      heka: {{ service_grains|yaml }}
+  - require:
+    - file: heka_grains_dir
+
 /etc/{{ service_name }}/global.toml:
   file.managed:
   - source: salt://heka/files/toml/global.toml
@@ -159,9 +180,11 @@
   - watch_in:
     - service: heka_{{ service_name }}_service
 
-{%- for decoder_name, decoder in service_grains.heka.[service_name].decoder.iteritems() %}
+{%- set service_metadata = service_grains.get(service_name) %}
 
-/etc/{{ service_name }}/10-decoder-{{ decoder_name }}-{{ decoder.engine }}.toml:
+{%- for decoder_name, decoder in service_metadata.get('decoder', {}).iteritems() %}
+
+/etc/{{ service_name }}/decoder_{{ decoder_name }}.toml:
   file.managed:
   - source: salt://heka/files/toml/decoder/{{ decoder.engine }}.toml
   - template: jinja
@@ -179,9 +202,9 @@
 
 {%- endfor %}
 
-{%- for input_name, input in service_grains.heka.[service_name].input.iteritems() %}
+{%- for input_name, input in service_metadata.get('input', {}).iteritems() %}
 
-/etc/{{ service_name }}/15-input-{{ input_name }}-{{ input.engine }}.toml:
+/etc/{{ service_name }}/input_{{ input_name }}.toml:
   file.managed:
   - source: salt://heka/files/toml/input/{{ input.engine }}.toml
   - template: jinja
@@ -199,9 +222,9 @@
 
 {%- endfor %}
 
-{%- for filter_name, filter in service_grains.heka.[service_name].filter.iteritems() %}
+{%- for filter_name, filter in service_metadata.get('filter', {}).iteritems() %}
 
-/etc/{{ service_name }}/20-filter-{{ filter_name }}-{{ filter.engine }}.toml:
+/etc/{{ service_name }}/filter_{{ filter_name }}.toml:
   file.managed:
   - source: salt://heka/files/toml/filter/{{ filter.engine }}.toml
   - template: jinja
@@ -219,9 +242,9 @@
 
 {%- endfor %}
 
-{%- for splitter_name, splitter in service_grains.heka.[service_name].splitter.iteritems() %}
+{%- for splitter_name, splitter in service_metadata.get('splitter', {}).iteritems() %}
 
-/etc/{{ service_name }}/30-splitter-{{ splitter_name }}-{{ splitter.engine }}.toml:
+/etc/{{ service_name }}/splitter_{{ splitter_name }}.toml:
   file.managed:
   - source: salt://heka/files/toml/splitter/{{ splitter.engine }}.toml
   - template: jinja
@@ -239,9 +262,9 @@
 
 {%- endfor %}
 
-{%- for encoder_name, encoder in service_grains.heka.[service_name].encoder.iteritems() %}
+{%- for encoder_name, encoder in service_metadata.get('encoder', {}).iteritems() %}
 
-/etc/{{ service_name }}/40-encoder-{{ encoder_name }}-{{ encoder.engine }}.toml:
+/etc/{{ service_name }}/encoder_{{ encoder_name }}.toml:
   file.managed:
   - source: salt://heka/files/toml/encoder/{{ encoder.engine }}.toml
   - template: jinja
@@ -259,9 +282,9 @@
 
 {%- endfor %}
 
-{%- for output_name, output in service_grains.heka.[service_name].output.iteritems() %}
+{%- for output_name, output in service_metadata.get('output', {}).iteritems() %}
 
-/etc/{{ service_name }}/60-output-{{ output_name }}-{{ output.engine }}.toml:
+/etc/{{ service_name }}/output_{{ output_name }}.toml:
   file.managed:
   - source: salt://heka/files/toml/output/{{ output.engine }}.toml
   - template: jinja
@@ -280,7 +303,3 @@
 {%- endfor %}
 
 {%- endif %}
-
-{%- endmacro %}
-
-{%- service_config(service_name) %}
diff --git a/heka/files/heka.grain b/heka/files/heka.grain
new file mode 100644
index 0000000..3e3b373
--- /dev/null
+++ b/heka/files/heka.grain
@@ -0,0 +1 @@
+{{ service_grains|yaml(False) }}
diff --git a/heka/files/toml/decoder/sandbox.toml b/heka/files/toml/decoder/sandbox.toml
index 897f4be..3cc8413 100644
--- a/heka/files/toml/decoder/sandbox.toml
+++ b/heka/files/toml/decoder/sandbox.toml
@@ -20,6 +20,6 @@
 {%- if decoder.config is defined %}
 [{{ decoder_name }}_decoder.config]
 {%- for config_param, config_value in decoder.config.iteritems() %}
-{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% else %}{{ config_value }}{% endif %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% elif config_value in [True, False] %}{{ config_value|lower }}{% else %}{{ config_value }}{% endif %}
 {%- endfor %}
 {%- endif %}
diff --git a/heka/files/toml/encoder/elasticsearch.toml b/heka/files/toml/encoder/elasticsearch.toml
index 4765798..d91d07a 100644
--- a/heka/files/toml/encoder/elasticsearch.toml
+++ b/heka/files/toml/encoder/elasticsearch.toml
@@ -1,5 +1,11 @@
-[{{ name }}_encoder]
+[{{ encoder_name }}_encoder]
 type = "ESJsonEncoder"
-index = "{{ values.index }}"
+{%- if encoder.index is defined %}
+index = "{{ encoder.index }}"
+{%- else %}
+{% raw %}
+index = "%{Type}-%{%Y.%m.%d}"
+{%- endraw %}
+{%- endif %}
 es_index_from_timestamp = true
 fields = [ "DynamicFields", "Hostname", "Logger", "Payload", "Pid", "Severity", "Timestamp", "Type" ]
diff --git a/heka/files/toml/encoder/es-json.toml b/heka/files/toml/encoder/es-json.toml
deleted file mode 100644
index ae9e0da..0000000
--- a/heka/files/toml/encoder/es-json.toml
+++ /dev/null
@@ -1,5 +0,0 @@
-[es_json]
-type = "ESJsonEncoder"
-es_index_from_timestamp = true
-index = "{{ values.get('index', '%{Type}-%{%Y.%m.%d}') }}"
-type_name = "{% raw %}%{Type}{% endraw %}"
diff --git a/heka/files/toml/encoder/es-payload.toml b/heka/files/toml/encoder/es-payload.toml
deleted file mode 100644
index 315ab13..0000000
--- a/heka/files/toml/encoder/es-payload.toml
+++ /dev/null
@@ -1,8 +0,0 @@
-[es_payload]
-type = "SandboxEncoder"
-filename = "lua_encoders/es_payload.lua"
-
-    [es_payload.config]
-    es_index_from_timestamp = true
-    index = "{{ values.get('index', '%{Logger}-%{%Y.%m.%d}') }}"
-    type_name = "{% raw %}%{Type}-%{Hostname}{% endraw %}"
diff --git a/heka/files/toml/encoder/payload.toml b/heka/files/toml/encoder/payload.toml
new file mode 100644
index 0000000..b7cc8f4
--- /dev/null
+++ b/heka/files/toml/encoder/payload.toml
@@ -0,0 +1,4 @@
+[{{ encoder_name }}_encoder]
+type = "PayloadEncoder"
+append_newlines = {{ encoder.get('append_newlines', False)|lower }}
+prefix_ts = {{ encoder.get('prefix_ts', False)|lower }}
diff --git a/heka/files/toml/encoder/protobuf.toml b/heka/files/toml/encoder/protobuf.toml
index b16fc08..5d6d675 100644
--- a/heka/files/toml/encoder/protobuf.toml
+++ b/heka/files/toml/encoder/protobuf.toml
@@ -1,2 +1 @@
 [ProtobufEncoder]
-
diff --git a/heka/files/toml/filter/sandbox.toml b/heka/files/toml/filter/sandbox.toml
index 76afba2..eefaca0 100644
--- a/heka/files/toml/filter/sandbox.toml
+++ b/heka/files/toml/filter/sandbox.toml
@@ -18,8 +18,8 @@
 {%- endif %}
 
 {%- if filter.config is defined %}
-[{{ filter_name }}_decoder.config]
+[{{ filter_name }}_filter.config]
 {%- for config_param, config_value in filter.config.iteritems() %}
-{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% else %}{{ config_value }}{% endif %}
+{{ config_param }} = {% if config_value is string %}"{{ config_value }}"{% elif config_value in [True, False] %}{{ config_value|lower }}{% else %}{{ config_value }}{% endif %}
 {%- endfor %}
 {%- endif %}
diff --git a/heka/files/toml/input/amqp.toml b/heka/files/toml/input/amqp.toml
index 111ad7d..e75596e 100644
--- a/heka/files/toml/input/amqp.toml
+++ b/heka/files/toml/input/amqp.toml
@@ -1,39 +1,34 @@
-[input_{{ input_name }}]
+[{{ input_name }}_input]
 type = "AMQPInput"
 url = "amqp{% if input.ssl is defined and input.ssl.get('enabled', True) %}s{% endif %}://{{ input.user }}:{{ input.password }}@{{ input.host }}/{{ input.vhost }}"
 exchange = "{{ input.exchange }}"
 exchange_type = "{{ input.exchange_type }}"
-
-{% if input.prefetch_count is defined -%}
+{%- if input.prefetch_count is defined -%}
 prefetch_count = {{ input.prefetch_count }}
-{% endif %}
+{%- endif %}
 {%- if input.exchange_durability is defined -%}
 exchange_durability = "{{ input.exchange_durability }}"
-{% endif %}
+{%- endif %}
 {%- if input.exchange_auto_delete is defined -%}
 exchange_auto_delete = "{{ input.exchange_auto_delete }}"
-{% endif %}
+{%- endif %}
 {%- if input.queue_auto_delete is defined -%}
 queue_auto_delete = {{ input.queue_auto_delete }}
-{% endif %}
+{%- endif %}
 {%- if input.queue is defined -%}
 queue = "{{ input.queue }}"
-{% endif %}
+{%- endif %}
 {%- if input.routing_key is defined -%}
 routing_key = "{{ input.routing_key }}"
-{% endif %}
+{%- endif %}
 decoder = "{{ input.decoder }}"
 splitter = "{{ input.splitter }}"
 
 {%- if input.ssl is defined and input.ssl.get('enabled', True) %}
-[input_{{ input_name }}.tls]
+[{{ input_name }}_input.tls]
 cert_file = "{{ input.ssl.cert_file }}"
 key_file = "{{ input.ssl.key_file }}"
 {%- if input.ssl.ca_file is defined %}
 root_cafile = "{{ input.ssl.ca_file }}"
 {%- endif %}
 {%- endif %}
-
-{#-
-vim: syntax=jinja
--#}
diff --git a/heka/files/toml/input/collectd.toml b/heka/files/toml/input/collectd.toml
deleted file mode 100644
index e69de29..0000000
--- a/heka/files/toml/input/collectd.toml
+++ /dev/null
diff --git a/heka/files/toml/input/http.toml b/heka/files/toml/input/http.toml
new file mode 100644
index 0000000..1a4a8db
--- /dev/null
+++ b/heka/files/toml/input/http.toml
@@ -0,0 +1,5 @@
+[{{ input_name }}_input]
+type="HttpListenInput"
+address = "{{ input.address }}:{{ input.port }}"
+decoder = "{{ input.decoder }}"
+splitter = '{{ input.splitter }}'
diff --git a/heka/files/toml/input/process.toml b/heka/files/toml/input/process.toml
index a7b4379..547b4ee 100644
--- a/heka/files/toml/input/process.toml
+++ b/heka/files/toml/input/process.toml
@@ -1,29 +1,11 @@
-[<%= @title %>_process]
+[{{ input_name }}_input]
 type="ProcessInput"
-ticker_interval = <%= @ticker_interval %>
-decoder = "<%= @decoder %>_decoder"
-<% if @stdout -%>
-stdout = true
-<% else %>
-stdout = false
-<% end %>
-<% if @stderr -%>
-stderr = true
-<% else %>
-stderr = false
-<% end %>
-
-<% if @splitter -%>
-splitter = "<%= @splitter %>_splitter"
-<% else -%>
-splitter = "NullSplitter"
-<% end -%>
-
-<% @commands.each_with_index do |command, i| -%>
- <% command.each do |cmd, args| -%>
-  [<%= @title %>_process.command.<%= i %>]
-   bin = "<%= cmd %>"
-   args = [ <%= args.collect{ |x| '"%s"' % x }.join(", ") %> ]
- <%end%>
-<% end %>
-
+ticker_interval = {{ input.get('ticker_interval', 30) }}
+{%- if input.decoder is defined %}
+decoder = "{{ input.decoder }}"
+{%- endif %}
+{%- if input.splitter is defined %}
+splitter = "{{ input.get('splitter', 'NullSplitter') }}"
+{%- endif %}
+stdout = {{ input.get('stdout', true)|lower }}
+stderr = {{ input.get('stderr', true)|lower }}
diff --git a/heka/files/toml/input/tcp.toml b/heka/files/toml/input/tcp.toml
new file mode 100644
index 0000000..ba690b0
--- /dev/null
+++ b/heka/files/toml/input/tcp.toml
@@ -0,0 +1,5 @@
+[{{ input_name }}_input]
+type="TcpInput"
+address = "{{ input.address }}:{{ input.port }}"
+decoder = "{{ input.decoder }}"
+splitter = '{{ input.splitter }}'
diff --git a/heka/files/toml/output/amqp.toml b/heka/files/toml/output/amqp.toml
index 2c9bd7e..80f2106 100644
--- a/heka/files/toml/output/amqp.toml
+++ b/heka/files/toml/output/amqp.toml
@@ -1,4 +1,4 @@
-[{{ name }}_output]
+[{{ output_name }}_output]
 type = "AMQPOutput"
 url = "amqp{% if values.ssl is defined and values.ssl.get('enabled', True) %}s{% endif %}://{{ values.user }}:{{ values.password }}@{{ values.host }}/{{ values.vhost }}"
 exchange = "{{ values.exchange }}"
@@ -7,20 +7,16 @@
 use_framing = true
 encoder = "{{ values.encoder }}"
 
-[{{ name }}_output.retries]
+[{{ output_name }}_output.retries]
 max_delay = "{{ values.get('max_delay', '30s') }}"
 delay = "{{ values.get('delay', '250ms') }}"
 max_retries = {{ values.get('max_retries', '-1') }}
 
 {%- if values.ssl is defined and values.ssl.get('enabled', True) %}
-[{{ name }}_output.tls]
+[{{ output_name }}_output.tls]
 cert_file = "{{ values.ssl.cert_file }}"
 key_file = "{{ values.ssl.key_file }}"
 {%- if values.ssl.ca_file is defined %}
 root_cafile = "{{ values.ssl.ca_file }}"
 {%- endif %}
 {%- endif %}
-
-{#-
-vim: syntax=jinja
--#}
diff --git a/heka/files/toml/output/dashboard.toml b/heka/files/toml/output/dashboard.toml
index fb63768..f4a6ebe 100644
--- a/heka/files/toml/output/dashboard.toml
+++ b/heka/files/toml/output/dashboard.toml
@@ -1,3 +1,3 @@
 [DashboardOutput]
-address = "{{ values.host }}:{{ values.port }}"
-ticker_interval = {{ values.ticker_interval }}
+address = "{{ output.host }}:{{ output.port }}"
+ticker_interval = {{ output.get('ticker_interval', 30) }}
diff --git a/heka/files/toml/output/elasticsearch.toml b/heka/files/toml/output/elasticsearch.toml
index 0a55b52..54aa685 100644
--- a/heka/files/toml/output/elasticsearch.toml
+++ b/heka/files/toml/output/elasticsearch.toml
@@ -1,13 +1,13 @@
-[{{ name }}_output]
+[{{ output_name }}_output]
 type = "ElasticSearchOutput"
-message_matcher = "{{ values.message_matcher }}"
-encoder = "{{ values.encoder }}"
-server = "http://{{ values.host }}:{{ values.port }}"
-flush_interval = {{ values.flush_interval }}
-flush_count = {{ values.flush_count }}
+message_matcher = "{{ output.message_matcher }}"
+encoder = "{{ output.encoder }}"
+server = "http://{{ output.host }}:{{ output.port }}"
+flush_interval = {{ output.flush_interval }}
+flush_count = {{ output.flush_count }}
 use_buffering = true
 
-[{{ name }}_output.buffering]
+[{{ output_name }}_output.buffering]
 max_buffer_size = 1073741824
 max_file_size = 134217728
 full_action = "block"
diff --git a/heka/files/toml/output/http.toml b/heka/files/toml/output/http.toml
new file mode 100644
index 0000000..a7cb3c1
--- /dev/null
+++ b/heka/files/toml/output/http.toml
@@ -0,0 +1,18 @@
+[{{ output_name }}_output]
+type = "HttpOutput"
+message_matcher = "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
+encoder = "influxdb_encoder"
+address = "http://{{ output.host }}:8086/write?db=lma&precision=ms"
+username = "influxdb"
+password = "influxdbpass"
+http_timeout = 5000
+method = "POST"
+use_buffering = true
+
+[influxdb_output.buffering]
+max_buffer_size = 1610612736
+max_file_size = 134217728
+full_action = "drop"
+
+[influxdb_output.headers]
+Content-Type = ["application/x-www-form-urlencoded"]
diff --git a/heka/files/toml/output/log.toml b/heka/files/toml/output/log.toml
index 0d18530..9f9f172 100644
--- a/heka/files/toml/output/log.toml
+++ b/heka/files/toml/output/log.toml
@@ -1,3 +1,3 @@
-[{{ name }}_output]
+[{{ output_name }}_output]
 encoder = "LogOutput"
-message_matcher = "{{ values.message_matcher }}"
+message_matcher = "{{ output.message_matcher }}"
diff --git a/heka/files/toml/output/tcp.toml b/heka/files/toml/output/tcp.toml
index 11d5763..dee7fa0 100644
--- a/heka/files/toml/output/tcp.toml
+++ b/heka/files/toml/output/tcp.toml
@@ -1,11 +1,11 @@
-[output_{{ name }}]
+[{{ output_name }}_output]
 type="TcpOutput"
-address = "{{ values.host }}:{{ values.port }}"
+address = "{{ output.host }}:{{ output.port }}"
 encoder = "ProtobufEncoder"
-message_matcher = "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
-
+message_matcher = "{{ output.message_matcher }}"
 use_buffering = true
-[output_{{ name }}.buffering]
+
+[{{ output_name }}_output.buffering]
 max_buffer_size = 268435456
 max_file_size = 67108864
 full_action = "drop"
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index fbb0ba7..4ecaeac 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -2,7 +2,6 @@
   encoder:
     elasticsearch:
       engine: elasticsearch
-      index: "%{Type}-%{%Y.%m.%d}"
   output:
     elasticsearch:
       engine: elasticsearch
@@ -16,10 +15,12 @@
       engine: tcp
       host: 127.0.0.1
       port: 5567
+      message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
     log_dashboard:
       engine: dashboard
       host: 127.0.0.1
       port: 4352
+      ticker_interval: 30
 metric_collector:
   decoder:
     heka_collectd:
@@ -41,7 +42,31 @@
       module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
       config:
         deserialize_bulk_metric_for_loggers: 'aggregated_http_metrics_filter hdd_errors_counter_filter'
-  input: {}
+  input:
+    heka_http_chek:
+      engine: http
+      address: 127.0.0.1
+      port: 5566
+      decoder: http-check_decoder
+      splitter: NullSplitter
+    heka_collectd:
+      engine: http
+      address: 127.0.0.1
+      port: 8325
+      decoder: collectd_decoder
+      splitter: NullSplitter
+    heka_aggregator:
+      engine: tcp
+      address: 0.0.0.0
+      port: 5565
+      decoder: aggregator_decoder
+      splitter: HekaFramingSplitter
+    heka_metric:
+      engine: tcp
+      address: 0.0.0.0
+      port: 5567
+      decoder: metric_decoder
+      splitter: HekaFramingSplitter
   filter:
     heka_metric_collector:
       engine: sandbox
@@ -49,31 +74,33 @@
       module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
       preserve_data: false
       message_matcher: "Type == 'heka.all-report'"
-    rabbitmq_service_status:
-      engine: sandbox
-      module_file: /usr/share/lma_collector/filters/afd.lua
-      module_dir: /usr/share/lma_collector_modules;/usr/share/heka/lua_modules
-      preserve_data: false
-      message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric') && (Fields[name] == 'rabbitmq_check')"
-      ticker_interval: 10
-      config:
-        hostname: '{{ grains.fqdn.split('.')[0] }}'
-        afd_type: 'service'
-        afd_file: 'lma_alarms_rabbitmq_service_check'
-        afd_cluster_name: 'rabbitmq-service'
-        afd_logical_name: 'check'
-        activate_alerting: true
-        enable_notification: false
-  output: {}
-  splitter: {}
-  encoder: {}
+  encoder:
+    influxdb:
+      engine: payload
+      append_newlines: false
+      prefix_ts: false
+  output:
+    aggregator:
+      engine: tcp
+      host: 127.0.0.1
+      port: 5565
+      message_matcher: "Fields[aggregator] == NIL && Type =~ /^heka\\\\.sandbox\\\\.afd.*metric$/"
+    metric_dashboard:
+      engine: dashboard
+      host: 127.0.0.1
+      port: 4353
+      ticker_interval: 30
 remote_collector:
-  decoder: {}
-  input: {}
-  filter: {}
-  output: {}
-  splitter: {}
-  encoder: {}
+  output:
+    metric_collector:
+      engine: tcp
+      host: 127.0.0.1
+      port: 5567
+    remote_collector_dashboard:
+      engine: dashboard
+      host: 127.0.0.1
+      port: 4354
+      ticker_interval: 30 
 aggregator:
   decoder: {}
   input: {}
diff --git a/metadata/service/aggregator/single.yml b/metadata/service/aggregator/single.yml
index a44ae20..a46e97e 100644
--- a/metadata/service/aggregator/single.yml
+++ b/metadata/service/aggregator/single.yml
@@ -1,5 +1,7 @@
 applications:
 - heka
+classes:
+- service.heka.support
 parameters:
   heka:
     aggregator:
diff --git a/metadata/service/log_collector/single.yml b/metadata/service/log_collector/single.yml
index 0406f0e..f857da5 100644
--- a/metadata/service/log_collector/single.yml
+++ b/metadata/service/log_collector/single.yml
@@ -1,5 +1,7 @@
 applications:
 - heka
+classes:
+- service.heka.support
 parameters:
   heka:
     log_collector:
diff --git a/metadata/service/metric_collector/single.yml b/metadata/service/metric_collector/single.yml
index 2dcd5e0..41d5256 100644
--- a/metadata/service/metric_collector/single.yml
+++ b/metadata/service/metric_collector/single.yml
@@ -1,5 +1,7 @@
 applications:
 - heka
+classes:
+- service.heka.support
 parameters:
   heka:
     metric_collector:
diff --git a/metadata/service/remote_collector/single.yml b/metadata/service/remote_collector/single.yml
index 67bdf8d..d0b6420 100644
--- a/metadata/service/remote_collector/single.yml
+++ b/metadata/service/remote_collector/single.yml
@@ -1,5 +1,7 @@
 applications:
 - heka
+classes:
+- service.heka.support
 parameters:
   heka:
     remote_collector: