{%- from "heka/map.jinja" import log_collector with context %}
{%- from "heka/map.jinja" import metric_collector with context %}
{%- from "heka/map.jinja" import remote_collector with context %}
{%- from "heka/map.jinja" import aggregator with context %}
{%- from "heka/map.jinja" import ceilometer_collector with context %}

log_collector:
  filter:
    # Runs http_metrics_aggregator.lua over log messages that carry an
    # http_response_time field (see message_matcher).
    aggregated_http_metrics:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/http_metrics_aggregator.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: "Type == 'log' && Fields[http_response_time] != NIL"
      ticker_interval: 10
      config:
        hostname: '{{ grains.host }}'
        interval: 10
        max_timer_inject: 10
        bulk_size: 523
        percentile: 90
        grace_time: 5
        source: log_collector
    # This filter monitors the OpenStack logs and sends the rate of processed
    # log messages, broken down by service and severity, every minute
    # (ticker_interval: 60).
    log_counter:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/logs_counter.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: true
      message_matcher: "Type == 'log' && Logger =~ /^openstack\\\\./"
      ticker_interval: 60
      config:
        hostname: '{{ grains.host }}'
        grace_interval: 30
        logger_matcher: '^openstack%.(%a+)$'
        source: log_collector
    # Runs hdd_errors_counter.lua over kernel log messages
    # (Logger == 'system.kern'); "patterns" holds two space-separated
    # /.../ expressions that capture an sdX/vdX style device name.
    hdd_errors:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/hdd_errors_counter.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type == 'log' && Logger == 'system.kern'"
      ticker_interval: 10
      config:
        grace_interval: 10
        patterns: '/error%s.+([sv]d[a-z][a-z]?)%d?/ /([sv]d[a-z][a-z]?)%d?.+%serror/'
        hostname: '{{ grains.host }}'
        source: log_collector
    {%- if log_collector.sensu_host is defined %}
    # Only rendered when a Sensu host is configured. The matcher is the
    # literal FALSE, so no message is routed to this filter; presumably it
    # only acts on its 60s ticker -- confirm in watchdog.lua.
    watchdog:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/watchdog.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: 'FALSE'
      preserve_data: false
      ticker_interval: 60
      config:
        service_name: 'log_collector'
    {%- endif %}
{%- if log_collector.elasticsearch_host is defined or log_collector.sensu_host is defined %}
  # The encoder section is rendered only when at least one backend that
  # needs an encoder (Elasticsearch or Sensu) is configured.
  encoder:
  {%- if log_collector.elasticsearch_host is defined %}
    # Guarded with "is defined", exactly like the elasticsearch output of
    # this collector, so the encoder exists whenever that output is rendered.
    elasticsearch:
      engine: elasticsearch
  {%- endif %}
  {%- if log_collector.sensu_host is defined %}
    # Sandbox encoder backed by status_sensu.lua; presumably the one the
    # sensu_watchdog output refers to as sensu_encoder -- confirm naming
    # against the consuming state.
    sensu:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        noop_handler: "{{ log_collector.sensu_noop_handler }}"
        notification_handler: "{{ log_collector.sensu_notification_handler }}"
        watchdog_ttl: {{ log_collector.sensu_watchdog_ttl }}
  {%- endif %}
{%- endif %}
  output:
    # Always rendered: forwards metric-type messages over TCP to the
    # configured metric collector host/port.
    metric_collector:
      engine: tcp
      host: {{ log_collector.metric_collector_host }}
      port: {{ log_collector.metric_collector_port }}
      message_matcher: "(Type == 'metric' || Type == 'heka.sandbox.metric' || Type == 'heka.sandbox.bulk_metric')"
    {%- if log_collector.sensu_host is defined %}
    # Sends watchdog messages to Sensu over UDP, without buffering.
    sensu_watchdog:
      engine: udp
      host: "{{ log_collector.sensu_host }}"
      port: "{{ log_collector.sensu_port }}"
      message_matcher: "Type == 'heka.sandbox.watchdog'"
      encoder: sensu_encoder
      use_buffering: false
    {%- endif %}
    {%- if log_collector.elasticsearch_host is defined %}
    # Ships every message of Type 'log' to Elasticsearch.
    elasticsearch:
      engine: elasticsearch
      server: "http://{{ log_collector.elasticsearch_host }}:{{ log_collector.elasticsearch_port }}"
      encoder: elasticsearch_encoder
      message_matcher: "Type == 'log'"
    {%- endif %}
metric_collector:
  decoder:
    # Decodes collectd payloads; hostname is the first label of the FQDN
    # grain and swap_size is the total reported by ps.swap_memory.
    collectd:
      engine: sandbox
      module_file: /usr/share/lma_collector/decoders/collectd.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        hostname: '{{ grains.fqdn.split('.')[0] }}'
        swap_size: {{ salt['ps.swap_memory']()['total'] }}
    # Decoder for metric messages received from other Heka instances.
    metric:
      engine: sandbox
      module_file: /usr/share/lma_collector/decoders/metric.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        deserialize_for_sources: 'log_collector'
  input:
    # HTTP listener bound to loopback only, decoded by collectd_decoder.
    heka_collectd:
      engine: http
      address: '127.0.0.1'
      port: 8325
      decoder: collectd_decoder
      splitter: NullSplitter
    # TCP listener on all interfaces for Heka-framed metric messages.
    heka_metric:
      engine: tcp
      address: '0.0.0.0'
      port: 5567
      decoder: metric_decoder
      splitter: HekaFramingSplitter
{%- if metric_collector.influxdb_host is defined or metric_collector.sensu_host is defined %}
  # Rendered when either InfluxDB or Sensu is configured for the metric
  # collector. Both operands must reference metric_collector: a misspelled
  # variable name is undefined in Jinja and makes the guard fail as soon
  # as the first operand is false.
  filter:
  {%- if metric_collector.sensu_host is defined %}
    # The matcher is the literal FALSE, so no message is routed here;
    # presumably the filter only acts on its 60s ticker -- confirm in
    # watchdog.lua.
    watchdog:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/watchdog.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: "FALSE"
      preserve_data: false
      ticker_interval: 60
      config:
        service_name: "metric_collector"
  {%- endif %}
  {%- if metric_collector.influxdb_host is defined %}
    # Receives every message whose Type ends in "metric"; tag_fields is
    # the configured list joined with single spaces.
    influxdb_accumulator:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type =~ /metric$/"
      ticker_interval: 1
      config:
        tag_fields: "{{ metric_collector.influxdb_tag_fields|join(' ') }}"
        time_precision: "{{ metric_collector.influxdb_time_precision }}"
  {%- endif %}
{%- endif %}
{%- if metric_collector.influxdb_host is defined or metric_collector.nagios_host is defined or metric_collector.sensu_host is defined %}
  # One encoder per configured backend (Sensu, InfluxDB, Nagios).
  encoder:
    {%- if metric_collector.sensu_host is defined %}
    # Sandbox encoder backed by status_sensu.lua.
    sensu:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        noop_handler: "{{ metric_collector.sensu_noop_handler }}"
        notification_handler: "{{ metric_collector.sensu_notification_handler }}"
        watchdog_ttl: {{ metric_collector.sensu_watchdog_ttl }}
    {%- endif %}
    {%- if metric_collector.influxdb_host is defined %}
    # Payload encoder with newline appending and timestamp prefix disabled.
    influxdb:
      engine: payload
      append_newlines: false
      prefix_ts: false
    {%- endif %}
    {%- if metric_collector.nagios_host is defined %}
    # Sandbox encoder backed by status_nagios.lua.
    nagios:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_nagios.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        host_suffix_dimension_key: 'environment_label'
    {%- endif %}
{%- endif %}
{%- if metric_collector.influxdb_host is defined or metric_collector.aggregator_host is defined or metric_collector.nagios_host is defined or metric_collector.sensu_host is defined %}
  # One output per configured backend (Sensu, InfluxDB, aggregator, Nagios).
  output:
  {%- if metric_collector.sensu_host is defined %}
    # Sends watchdog messages to Sensu over UDP, without buffering.
    sensu_watchdog:
      engine: udp
      host: "{{ metric_collector.sensu_host }}"
      port: "{{ metric_collector.sensu_port }}"
      message_matcher: "Type == 'heka.sandbox.watchdog'"
      encoder: sensu_encoder
      use_buffering: false
  {%- endif %}
  {%- if metric_collector.influxdb_host is defined %}
    # Posts accumulated payloads (payload_name 'influxdb') to the InfluxDB
    # write endpoint; credentials are added only when both are set.
    influxdb:
      engine: http
      address: "http://{{ metric_collector.influxdb_host }}:{{ metric_collector.influxdb_port }}/write?db={{ metric_collector.influxdb_database }}&precision={{ metric_collector.influxdb_time_precision }}"
    {%- if metric_collector.influxdb_username and metric_collector.influxdb_password %}
      username: "{{ metric_collector.influxdb_username }}"
      password: "{{ metric_collector.influxdb_password }}"
    {%- endif %}
      message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
      encoder: influxdb_encoder
      timeout: {{ metric_collector.influxdb_timeout }}
  {%- endif %}
  {%- if metric_collector.aggregator_host is defined %}
    # Forwards AFD metrics to the aggregator over TCP.
    aggregator:
      engine: tcp
      host: "{{ metric_collector.aggregator_host }}"
      port: "{{ metric_collector.aggregator_port }}"
      message_matcher: "Type == 'heka.sandbox.afd_metric'"
  {%- endif %}
  {%- if metric_collector.nagios_host is defined %}
    # Pushes AFD metrics that have alerting enabled to the Nagios status
    # endpoint. full_action: drop is set together with the two size limits.
    nagios_alarm:
      engine: http
      address: "http://{{ metric_collector.nagios_host }}:{{ metric_collector.nagios_port }}/status"
      message_matcher: "Type == 'heka.sandbox.afd_metric' && Fields[alerting_enabled] == TRUE"
      encoder: nagios_encoder
      {%- if metric_collector.nagios_username is defined and metric_collector.nagios_password is defined %}
      # Quoted like the influxdb credentials above so that values which
      # look like numbers/booleans, or contain YAML indicator characters,
      # are still loaded as plain strings.
      username: "{{ metric_collector.get('nagios_username') }}"
      password: "{{ metric_collector.get('nagios_password') }}"
      {%- endif %}
      max_buffer_size: 1048576
      max_file_size: 524288
      full_action: drop
  {%- endif %}
{%- endif %}
remote_collector:
  decoder:
    # Decodes collectd payloads received on the heka_collectd input.
    collectd:
      engine: sandbox
      module_file: /usr/share/lma_collector/decoders/collectd.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        hostname: '{{ grains.host }}'
{%- if remote_collector.amqp_host is defined %}
    # Only rendered together with the AMQP notification inputs below.
    notification:
      engine: sandbox
      module_file: /usr/share/lma_collector/decoders/notification.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        include_full_notification: false
        hostname: '{{ grains.host }}'
{%- endif %}
  input:
    # HTTP listener on all interfaces, decoded by collectd_decoder.
    heka_collectd:
      engine: http
      address: 0.0.0.0
      port: 8326
      decoder: collectd_decoder
      splitter: NullSplitter
{%- if remote_collector.amqp_host is defined %}
{%- for notification_level in ('info', 'warn', 'error') %}
    # One AMQP input per notification level; queue and routing key are
    # the configured notification topic suffixed with the level.
    amqp_notification_{{ notification_level }}:
      engine: amqp
      host: {{ remote_collector.amqp_host }}
      port: {{ remote_collector.amqp_port }}
      # Credentials are quoted so that values which look like numbers or
      # booleans, or contain YAML indicator characters, stay plain strings.
      user: "{{ remote_collector.amqp_user }}"
      password: "{{ remote_collector.amqp_password }}"
      vhost: {{ remote_collector.amqp_vhost }}
      exchange: {{ remote_collector.get('amqp_exchange', 'nova') }}
      exchange_type: topic
      exchange_durability: false
      exchange_auto_delete: false
      queue_auto_delete: false
      queue: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
      routing_key: "{{ remote_collector.amqp_notification_topic }}.{{ notification_level }}"
      can_exit: false
      decoder: notification_decoder
{%- endfor %}
{%- endif %}
{%- if remote_collector.influxdb_host is defined or remote_collector.amqp_host is defined or remote_collector.sensu_host is defined %}
  # Filters are rendered per configured backend (Sensu, InfluxDB, AMQP).
  filter:
    {%- if remote_collector.sensu_host is defined %}
    # The matcher is the literal FALSE, so no message is routed here;
    # presumably the filter only acts on its 60s ticker -- confirm in
    # watchdog.lua.
    watchdog:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/watchdog.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: 'FALSE'
      preserve_data: false
      ticker_interval: 60
      config:
        service_name: 'remote_collector'
    {%- endif %}
    {%- if remote_collector.influxdb_host is defined %}
    # Receives every message whose Type ends in "metric".
    influxdb_accumulator:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type =~ /metric$/"
      ticker_interval: 1
      config:
        tag_fields: "{{ remote_collector.influxdb_tag_fields|join(' ') }}"
        time_precision: "{{ remote_collector.influxdb_time_precision }}"
    {%- endif %}
    {%- if remote_collector.amqp_host is defined %}
    # Handles notifications whose event_type ends in "create.end".
    resource_creation_time:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/resource_creation_time.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type == 'notification' && Fields[event_type] =~ /create.end$/"
    # Handles audit messages whose action is 'authenticate'.
    audit_authentications:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/audit_authentications.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: "Type == 'audit' &&  Fields[action] == 'authenticate'"
      ticker_interval: 60
      config:
        grace_interval: 30
        source: remote_collector
    {%- endif %}
{%- endif %}
{%- if remote_collector.influxdb_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
  # One encoder per configured backend (Sensu, InfluxDB, Elasticsearch).
  encoder:
    {%- if remote_collector.sensu_host is defined %}
    # Sandbox encoder backed by status_sensu.lua.
    sensu:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        noop_handler: "{{ remote_collector.sensu_noop_handler }}"
        notification_handler: "{{ remote_collector.sensu_notification_handler }}"
        watchdog_ttl: {{ remote_collector.sensu_watchdog_ttl }}
    {%- endif %}
    {%- if remote_collector.influxdb_host is defined %}
    # Payload encoder with newline appending and timestamp prefix disabled.
    influxdb:
      engine: payload
      append_newlines: false
      prefix_ts: false
    {%- endif %}
    {%- if remote_collector.elasticsearch_host is defined %}
    # Elasticsearch encoder with no extra settings.
    elasticsearch:
      engine: elasticsearch
    {%- endif %}
{%- endif %}
{%- if remote_collector.influxdb_host is defined or remote_collector.aggregator_host is defined or remote_collector.elasticsearch_host is defined or remote_collector.sensu_host is defined %}
  # One output per configured backend (Sensu, InfluxDB, aggregator,
  # Elasticsearch).
  output:
    {%- if remote_collector.sensu_host is defined %}
    # Sends watchdog messages to Sensu over UDP, without buffering.
    sensu_watchdog:
      engine: udp
      host: "{{ remote_collector.sensu_host }}"
      port: "{{ remote_collector.sensu_port }}"
      message_matcher: "Type == 'heka.sandbox.watchdog'"
      encoder: sensu_encoder
      use_buffering: false
    {%- endif %}
    {%- if remote_collector.influxdb_host is defined %}
    # Posts accumulated payloads (payload_name 'influxdb') to the InfluxDB
    # write endpoint; credentials are added only when both are set.
    influxdb:
      engine: http
      address: "http://{{ remote_collector.influxdb_host }}:{{ remote_collector.influxdb_port }}/write?db={{ remote_collector.influxdb_database }}&precision={{ remote_collector.influxdb_time_precision }}"
      {%- if remote_collector.influxdb_username and remote_collector.influxdb_password %}
      username: "{{ remote_collector.influxdb_username }}"
      password: "{{ remote_collector.influxdb_password }}"
      {%- endif %}
      message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
      encoder: influxdb_encoder
      timeout: {{ remote_collector.influxdb_timeout }}
    {%- endif %}
    {%- if remote_collector.aggregator_host is defined %}
    # Forwards AFD metrics to the aggregator over TCP.
    aggregator:
      engine: tcp
      host: "{{ remote_collector.aggregator_host }}"
      port: "{{ remote_collector.aggregator_port }}"
      message_matcher: "Type == 'heka.sandbox.afd_metric'"
    {%- endif %}
    {%- if remote_collector.elasticsearch_host is defined %}
    # Ships notification and audit messages to Elasticsearch.
    elasticsearch:
      engine: elasticsearch
      server: "http://{{ remote_collector.elasticsearch_host }}:{{ remote_collector.elasticsearch_port }}"
      encoder: elasticsearch_encoder
      message_matcher: "Type == 'notification' || Type == 'audit'"
    {%- endif %}
{%- endif %}
aggregator:
  policy:
    # Derives the cluster status from the most severe status reported by
    # any member. Entries are listed from most to least severe; presumably
    # the first entry whose trigger matches wins -- confirm against the
    # policy evaluation code.
    highest_severity:
    # down: at least one member is down
    - status: down
      trigger:
        logical_operator: or
        rules:
        - function: count
          arguments: [down]
          relational_operator: '>'
          threshold: 0
    # critical: at least one member is critical
    - status: critical
      trigger:
        logical_operator: or
        rules:
        - function: count
          arguments: [critical]
          relational_operator: '>'
          threshold: 0
    # warning: at least one member is in warning
    - status: warning
      trigger:
        logical_operator: or
        rules:
        - function: count
          arguments: [warning]
          relational_operator: '>'
          threshold: 0
    # okay: at least one member is okay
    - status: okay
      trigger:
        logical_operator: or
        rules:
        - function: count
          arguments: [okay]
          relational_operator: '>'
          threshold: 0
    # Entry without a trigger
    - status: unknown
    # Derives the cluster status from how many members report okay and
    # how many report down.
    availability_of_members:
    # down: no member is okay
    - status: down
      trigger:
        logical_operator: or
        rules:
        - function: count
          arguments: [okay]
          relational_operator: '=='
          threshold: 0
    # critical: exactly one member is okay and at least one is down
    - status: critical
      trigger:
        logical_operator: and
        rules:
        - function: count
          arguments: [okay]
          relational_operator: '=='
          threshold: 1
        - function: count
          arguments: [down]
          relational_operator: '>'
          threshold: 0
    # warning: two or more members are okay and at least one is down
    - status: warning
      trigger:
        logical_operator: and
        rules:
        - function: count
          arguments: [okay]
          relational_operator: '>='
          threshold: 2
        - function: count
          arguments: [down]
          relational_operator: '>'
          threshold: 0
    # okay: every member is okay
    - status: okay
      trigger:
        logical_operator: or
        rules:
        - function: percent
          arguments: [okay]
          relational_operator: '=='
          threshold: 100
    # Entry without a trigger
    - status: unknown
    # Derives the cluster status from the health status of its members.
    status_of_members:
    # down: every member is down
    - status: down
      trigger:
        logical_operator: or
        rules:
        - function: percent
          arguments: [down]
          relational_operator: '=='
          threshold: 100
    # critical: at most one member is okay/warning and at least one is
    # critical, down or unknown
    - status: critical
      trigger:
        logical_operator: and
        rules:
        - function: count
          arguments: [okay, warning]
          relational_operator: '<='
          threshold: 1
        - function: count
          arguments: [critical, down, unknown]
          relational_operator: '>'
          threshold: 0
    # warning: not every member is okay
    - status: warning
      trigger:
        logical_operator: or
        rules:
        - function: percent
          arguments: [okay]
          relational_operator: '!='
          threshold: 100
    # okay: every member is okay
    - status: okay
      trigger:
        logical_operator: or
        rules:
        - function: percent
          arguments: [okay]
          relational_operator: '=='
          threshold: 100
    # Entry without a trigger
    - status: unknown
    # A policy that is typically used for storage or compute clusters.
    # The status is derived from the share of members in a degraded state.
    majority_of_node_members:
    # down: every member is down
    - status: down
      trigger:
        logical_operator: or
        rules:
        - function: percent
          arguments: [ down ]
          relational_operator: '=='
          threshold: 100
    # critical: at least half of the members are down or critical
    - status: critical
      trigger:
        logical_operator: and
        rules:
        - function: percent
          arguments: [ down, critical ]
          relational_operator: '>='
          threshold: 50
    # warning: at least one member is down, critical, warning or unknown.
    # Each key appears exactly once in the rule: duplicate mapping keys are
    # invalid YAML and most loaders silently keep only the last value.
    - status: warning
      trigger:
        logical_operator: or
        rules:
        - function: percent
          arguments: [ down, critical, warning, unknown ]
          relational_operator: '>'
          threshold: 0
    # Entry without a trigger; unlike the other policies this one ends
    # with okay rather than unknown.
    - status: okay
  input:
    # TCP listener on all interfaces for Heka-framed protobuf messages.
    heka_metric:
      engine: tcp
      address: '0.0.0.0'
      port: 5565
      decoder: ProtobufDecoder
      splitter: HekaFramingSplitter
  {%- if aggregator.sensu_host is defined or aggregator.influxdb_host is defined %}
  # Filters are rendered per configured backend (Sensu, InfluxDB).
  filter:
    {%- if aggregator.sensu_host is defined %}
    # The matcher is the literal FALSE, so no message is routed here;
    # presumably the filter only acts on its 60s ticker -- confirm in
    # watchdog.lua.
    watchdog:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/watchdog.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: 'FALSE'
      preserve_data: false
      ticker_interval: 60
      config:
        service_name: 'aggregator'
    {%- endif %}
    {%- if aggregator.influxdb_host is defined %}
    # Receives GSE metrics and plain sandbox metrics.
    influxdb_accumulator:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.metric'"
      ticker_interval: 1
      config:
        tag_fields: "{{ aggregator.influxdb_tag_fields|join(' ') }}"
        time_precision: "{{ aggregator.influxdb_time_precision }}"
    # Receives GSE metrics only.
    influxdb_annotation:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/influxdb_annotation.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type == 'heka.sandbox.gse_metric'"
    {%- endif %}
  {%- endif %}
{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined or aggregator.sensu_host is defined %}
  # One encoder per configured backend (InfluxDB, Nagios, Sensu).
  encoder:
    {%- if aggregator.influxdb_host is defined %}
    # Payload encoder with newline appending and timestamp prefix disabled.
    influxdb:
      engine: payload
      append_newlines: false
      prefix_ts: false
    {%- endif %}
    {%- if aggregator.nagios_host is defined %}
    # Sandbox encoder backed by status_nagios.lua; the host dimension key
    # is emitted only when it is configured.
    nagios:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_nagios.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        host_suffix_dimension_key: 'environment_label'
        default_nagios_host: "{{ aggregator.nagios_default_host_alarm_clusters }}"
      {%- if aggregator.nagios_host_dimension_key is defined %}
        nagios_host_dimension_key: "{{ aggregator.nagios_host_dimension_key }}"
      {%- endif %}
    {%- endif %}
    {%- if aggregator.sensu_host is defined %}
    # Sandbox encoder backed by status_sensu.lua; the source dimension key
    # is emitted only when it is configured.
    sensu:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        noop_handler: "{{ aggregator.sensu_noop_handler }}"
        notification_handler: "{{ aggregator.sensu_notification_handler }}"
        watchdog_ttl: {{ aggregator.sensu_watchdog_ttl }}
      {%- if aggregator.sensu_source_dimension_key is defined %}
        sensu_source_dimension_key: "{{ aggregator.sensu_source_dimension_key }}"
      {%- endif %}
    {%- endif %}
{%- endif %}
{%- if aggregator.influxdb_host is defined or aggregator.nagios_host is defined or aggregator.sensu_host is defined %}
  # One output per configured backend (InfluxDB, Nagios, Sensu).
  output:
  {%- if aggregator.influxdb_host is defined %}
    # Posts accumulated payloads (payload_name 'influxdb') to the InfluxDB
    # write endpoint; credentials are added only when both are set.
    influxdb:
      engine: http
      address: "http://{{ aggregator.influxdb_host }}:{{ aggregator.influxdb_port }}/write?db={{ aggregator.influxdb_database }}&precision={{ aggregator.influxdb_time_precision }}"
    {%- if aggregator.influxdb_username and aggregator.influxdb_password %}
      username: "{{ aggregator.influxdb_username }}"
      password: "{{ aggregator.influxdb_password }}"
    {%- endif %}
      message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'influxdb'"
      encoder: influxdb_encoder
      timeout: {{ aggregator.influxdb_timeout }}
  {%- endif %}
  {%- if aggregator.nagios_host is defined %}
    # Pushes GSE metrics that have alerting enabled to the Nagios status
    # endpoint. full_action: drop is set together with the two size limits.
    nagios_alarm_cluster:
      engine: http
      address: "http://{{ aggregator.nagios_host }}:{{ aggregator.nagios_port }}/status"
      message_matcher: "Type == 'heka.sandbox.gse_metric' && Fields[alerting_enabled] == TRUE"
      encoder: nagios_encoder
      {%- if aggregator.nagios_username is defined and aggregator.nagios_password is defined %}
      # Quoted like the influxdb credentials above so that values which
      # look like numbers/booleans, or contain YAML indicator characters,
      # are still loaded as plain strings.
      username: "{{ aggregator.get('nagios_username') }}"
      password: "{{ aggregator.get('nagios_password') }}"
      {%- endif %}
      max_buffer_size: 1048576
      max_file_size: 524288
      full_action: drop
  {%- endif %}
  {%- if aggregator.sensu_host is defined %}
    # Sends watchdog messages, plus GSE/AFD metrics that have alerting
    # enabled, to Sensu over UDP. The port falls back to 3030 when unset.
    sensu_alarm_cluster:
      engine: udp
      host: "{{ aggregator.sensu_host }}"
      port: "{{ aggregator.sensu_port|default(3030) }}"
      message_matcher: "Type == 'heka.sandbox.watchdog' || ((Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.afd_metric') && Fields[alerting_enabled] == TRUE)"
      encoder: sensu_encoder
      use_buffering: false
  {%- endif %}
{%- endif %}
ceilometer_collector:
{%- if ceilometer_collector.amqp_host is defined %}
  # The decoder and the AMQP input that feeds it are rendered together,
  # under a single amqp_host guard.
  decoder:
    sample:
      engine: sandbox
      module_file: /usr/share/lma_collector/decoders/metering.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        decoder: 'ceilometer'
        decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
        # NOTE(review): "status" is listed twice in this space-separated
        # list; left untouched -- confirm whether the duplicate is needed.
        metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
  input:
    # The configured queue name is used as both queue and routing key.
    openstack_sample_amqp:
      engine: amqp
      # Credentials are quoted so that values which look like numbers or
      # booleans, or contain YAML indicator characters, stay plain strings.
      user: "{{ ceilometer_collector.amqp_user }}"
      password: "{{ ceilometer_collector.amqp_password }}"
      port: {{ ceilometer_collector.amqp_port }}
      host: {{ ceilometer_collector.amqp_host }}
      vhost: {{ ceilometer_collector.amqp_vhost }}
      queue: {{ ceilometer_collector.amqp_queue }}
      routing_key: {{ ceilometer_collector.amqp_queue }}
      decoder: sample_decoder
      splitter: NullSplitter
      exchange: {{ ceilometer_collector.get('amqp_exchange', 'ceilometer') }}
      exchange_type: topic
      exchange_auto_delete: false
      queue_auto_delete: false
{%- endif %}
{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.sensu_host is defined %}
  # Filters are rendered per configured backend (InfluxDB, Sensu).
  filter:
    {%- if ceilometer_collector.influxdb_host is defined %}
    # Receives messages whose Type ends in "ceilometer_samples" and names
    # its payload 'sample_data'.
    ceilometer_influxdb_accumulator:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      preserve_data: false
      message_matcher: "Type =~ /ceilometer_samples$/"
      ticker_interval: 1
      config:
        time_precision: "{{ ceilometer_collector.influxdb_time_precision }}"
        payload_name: 'sample_data'
        flush_count: 500
        bulk_metric_type_matcher: 'ceilometer_samples'
    {%- endif %}
    {%- if ceilometer_collector.sensu_host is defined %}
    # The matcher is the literal FALSE, so no message is routed here;
    # presumably the filter only acts on its 60s ticker -- confirm in
    # watchdog.lua.
    watchdog:
      engine: sandbox
      module_file: /usr/share/lma_collector/filters/watchdog.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      message_matcher: 'FALSE'
      preserve_data: false
      ticker_interval: 60
      config:
        service_name: 'ceilometer_collector'
    {%- endif %}
{%- endif %}
{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
  # One encoder per configured backend (InfluxDB, Elasticsearch, Sensu).
  encoder:
    {%- if ceilometer_collector.influxdb_host is defined %}
    # Payload encoder with newline appending and timestamp prefix disabled.
    influxdb:
      engine: payload
      append_newlines: false
      prefix_ts: false
    {%- endif %}
    {%- if ceilometer_collector.elasticsearch_host is defined %}
    # Sandbox encoder backed by es_ceilometer_resources.lua.
    elasticsearch_resource:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/es_ceilometer_resources.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        index: 'ceilometer_resource'
        type_name: 'source'
        encoder: 'elasticsearch_resources'
    {%- endif %}
    {%- if ceilometer_collector.sensu_host is defined %}
    # Sandbox encoder backed by status_sensu.lua.
    sensu:
      engine: sandbox
      module_file: /usr/share/lma_collector/encoders/status_sensu.lua
      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
      config:
        noop_handler: "{{ ceilometer_collector.sensu_noop_handler }}"
        notification_handler: "{{ ceilometer_collector.sensu_notification_handler }}"
        watchdog_ttl: {{ ceilometer_collector.sensu_watchdog_ttl }}
    {%- endif %}
{%- endif %}
{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined or ceilometer_collector.sensu_host is defined %}
  # One output per configured backend (InfluxDB, Elasticsearch, Sensu).
  output:
    {%- if ceilometer_collector.influxdb_host is defined %}
    # POSTs accumulated sample payloads (payload_name 'sample_data') to the
    # InfluxDB write endpoint; credentials are added only when both are set.
    samples_influxdb:
      engine: http
      address: "http://{{ ceilometer_collector.influxdb_host }}:{{ ceilometer_collector.influxdb_port }}/write?db={{ ceilometer_collector.influxdb_database }}&precision={{ ceilometer_collector.influxdb_time_precision }}"
      {%- if ceilometer_collector.influxdb_username and ceilometer_collector.influxdb_password %}
      username: "{{ ceilometer_collector.influxdb_username }}"
      password: "{{ ceilometer_collector.influxdb_password }}"
      {%- endif %}
      message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'sample_data'"
      encoder: influxdb_encoder
      timeout: {{ ceilometer_collector.influxdb_timeout }}
      method: 'POST'
    {%- endif %}
    {%- if ceilometer_collector.elasticsearch_host is defined %}
    # Ships messages of Type 'ceilometer_resources' to Elasticsearch.
    elasticsearch_resource:
      engine: elasticsearch
      server: "http://{{ ceilometer_collector.elasticsearch_host }}:{{ ceilometer_collector.elasticsearch_port }}"
      message_matcher: "Type == 'ceilometer_resources'"
      encoder: elasticsearch_resource_encoder
    {%- endif %}
    {%- if ceilometer_collector.sensu_host is defined %}
    # Sends watchdog messages to Sensu over UDP, without buffering.
    sensu_watchdog:
      engine: udp
      host: "{{ ceilometer_collector.sensu_host }}"
      port: "{{ ceilometer_collector.sensu_port }}"
      message_matcher: "Type == 'heka.sandbox.watchdog'"
      encoder: sensu_encoder
      use_buffering: false
    {%- endif %}
{%- endif %}
