Merge pull request #58 from ityaptin/stacklight

Add an os_telemetry_collector service
diff --git a/heka/_service.sls b/heka/_service.sls
index 30bde22..d098575 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -7,7 +7,7 @@
   - name: /var/log/{{ service_name }}.log
   - user: heka
   - mode: 644
-  - replace: False
+  - replace: false
 
 heka_{{ service_name }}_conf_dir:
   file.directory:
@@ -118,6 +118,14 @@
     'splitter': {},
     'encoder': {},
     'output': {},
+  },
+  'ceilometer_collector': {
+    'decoder': {},
+    'input': {},
+    'filter': {},
+    'splitter': {},
+    'encoder': {},
+    'output': {},
   }
 } %}
 
@@ -135,7 +143,6 @@
 {%- endif %}
 {%- endfor %}
 
-
 {%- if service_name in ('remote_collector', 'aggregator') %}
 
 {# Load the other services' support metadata from salt-mine #}
diff --git a/heka/ceilometer_collector.sls b/heka/ceilometer_collector.sls
new file mode 100644
index 0000000..fa05d77
--- /dev/null
+++ b/heka/ceilometer_collector.sls
@@ -0,0 +1,12 @@
+{%- if pillar.heka.ceilometer_collector is defined %}
+
+include:
+- heka._common
+
+{%- from "heka/map.jinja" import ceilometer_collector with context %}
+{%- set server = ceilometer_collector %}
+{%- set service_name = "ceilometer_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/files/lua/common/ceilometer.lua b/heka/files/lua/common/ceilometer.lua
new file mode 100644
index 0000000..56b1eec
--- /dev/null
+++ b/heka/files/lua/common/ceilometer.lua
@@ -0,0 +1,60 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local table = require 'table'
+
+local samples = require 'samples'
+local resources = require 'resources'
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local l = require 'lpeg'
+l.locale(l)
+
+local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0) -- collects every run of non-space characters into a list
+local metadata_fields = fields_grammar:match(
+    read_config("metadata_fields") or "" -- an unset option is parsed as the empty string
+)
+local decode_resources = read_config('decode_resources') or false
+
+
+local samples_decoder = samples.new(metadata_fields)
+local resource_decoder = nil -- stays nil unless decode_resources is set
+
+if decode_resources then
+    resource_decoder = resources.new()
+end
+
+local CeilometerDecoder = {}
+CeilometerDecoder.__index = CeilometerDecoder
+
+setfenv(1, CeilometerDecoder) -- Remove external access to contain everything in the module
+
+-- Inject msg when decoding succeeded; otherwise hand back the failure as is.
+function inject(code, msg)
+    if code ~= 0 or not msg then
+        return code, msg
+    end
+    return utils.safe_inject_message(msg)
+end
+
+-- Emit the samples message, then the resources one when that decoder exists.
+function decode(data)
+    local status, result = inject(samples_decoder:decode(data))
+    if status ~= 0 or not resource_decoder then return status, result end
+    status, result = inject(resource_decoder:decode(data))
+    return status, result
+end
+
+return CeilometerDecoder
diff --git a/heka/files/lua/common/elasticsearch_resources.lua b/heka/files/lua/common/elasticsearch_resources.lua
new file mode 100644
index 0000000..b77caa6
--- /dev/null
+++ b/heka/files/lua/common/elasticsearch_resources.lua
@@ -0,0 +1,76 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local string = require "string"
+local cjson = require "cjson"
+local utils = require "lma_utils"
+local table = require "table"
+local pairs = pairs
+local read_message = read_message
+local utils = require "lma_utils"
+
+local index = read_config("index") or "index" -- written as _index in every bulk action line
+local type_name = read_config("type_name") or "source" -- written as _type in every bulk action line
+
+local ResourceEncoder = {}
+ResourceEncoder.__index = ResourceEncoder
+
+setfenv(1, ResourceEncoder) -- Remove external access to contain everything in the module
+
+-- Build an Elasticsearch bulk body: one scripted upsert per resource found in
+-- the JSON message Payload. Returns 0 and the bulk body as a single string.
+function encode()
+    local resources = cjson.decode(read_message("Payload"))
+    local payload = {} -- local: a bare name would be stored in ResourceEncoder
+    for resource_id, resource in pairs(resources) do
+        local update = cjson.encode({
+            update = {_index = index, _type = type_name, _id = resource_id}
+        })
+        local body = {
+            script = 'ctx._source.meters += meter;' ..
+            'ctx._source.user_id = user_id;' ..
+            'ctx._source.project_id = project_id;' ..
+            'ctx._source.source = source; ' ..
+            'ctx._source.metadata =  ' ..
+            'ctx._source.last_sample_timestamp <= timestamp ? ' ..
+            'metadata : ctx._source.metadata;' ..
+            'ctx._source.last_sample_timestamp = ' ..
+            'ctx._source.last_sample_timestamp < timestamp ?' ..
+            'timestamp : ctx._source.last_sample_timestamp;' ..
+            'ctx._source.first_sample_timestamp = ' ..
+            'ctx._source.first_sample_timestamp > timestamp ?' ..
+            'timestamp : ctx._source.first_sample_timestamp;',
+            params = {
+                meter = resource.meter,
+                metadata = resource.metadata,
+                timestamp = resource.timestamp,
+                user_id = resource.user_id or '',
+                project_id = resource.project_id or '',
+                source = resource.source or '',
+            },
+            upsert = {
+                first_sample_timestamp = resource.timestamp,
+                last_sample_timestamp = resource.timestamp,
+                project_id = resource.project_id or '',
+                user_id = resource.user_id or '',
+                source = resource.source or '',
+                metadata = resource.metadata,
+                meters = resource.meter
+            }
+        }
+        table.insert(payload, string.format("%s\n%s\n", update, cjson.encode(body)))
+    end
+    return 0, table.concat(payload)
+end
+
+return ResourceEncoder
\ No newline at end of file
diff --git a/heka/files/lua/common/resources.lua b/heka/files/lua/common/resources.lua
new file mode 100644
index 0000000..465585b
--- /dev/null
+++ b/heka/files/lua/common/resources.lua
@@ -0,0 +1,121 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = cjson
+local string = string
+local table = table
+local math = math
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local pcall = pcall
+local type = type
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+function normalize_uuid(uuid) -- NOTE(review): defined before setfenv, so this copy lands in the chunk's original environment
+    return patt.Uuid:match(uuid)
+end
+
+local metadata_fields = {} -- NOTE(review): not referenced anywhere else in this file
+
+local ResourcesDecoder = {}
+ResourcesDecoder.__index = ResourcesDecoder
+
+setfenv(1, ResourcesDecoder) -- Remove external access to contain everything in the module
+
+function normalize_uuid(uuid) -- same body as the copy above; defined after setfenv, so it is stored in ResourcesDecoder
+    return patt.Uuid:match(uuid)
+end
+
+-- Mapping table defining transformation functions to be applied, keys are the
+-- attributes in the notification's payload and values are Lua functions
+local transform_functions = { -- NOTE(review): never read in this file
+    created_at = utils.format_datetime,
+    launched_at = utils.format_datetime,
+    deleted_at = utils.format_datetime,
+    terminated_at = utils.format_datetime,
+    user_id = normalize_uuid,
+    project_id = normalize_uuid,
+}
+
+-- Apply func to every value of tbl.
+-- Returns a new table with the same keys; tbl itself is not modified.
+function map(func, tbl)
+    local out = {}
+    for key, value in pairs(tbl) do out[key] = func(value) end
+    return out
+end
+
+local resource_msg = { -- reused for every decoded message; ResourcesDecoder:decode fills the nil fields
+    Timestamp = nil,
+    Type = "ceilometer_resources",
+    Payload = nil
+}
+
+-- Record the resource described by one sample.
+-- sample: decoded Ceilometer sample (table)
+-- payload: table receiving the entry, keyed by sample.resource_id
+-- Dots in the counter name become backslashes in the meter key.
+function add_resource_to_payload(sample, payload)
+    local meter_key = string.gsub(sample.counter_name, "%.", "\\")
+    local meter_info = {type = sample.counter_type, unit = sample.counter_unit}
+    payload[sample.resource_id] = {
+        timestamp = sample.timestamp,
+        resource_id = sample.resource_id,
+        source = sample.source or "",
+        metadata = sample.resource_metadata,
+        user_id = sample.user_id,
+        project_id = sample.project_id,
+        meter = {
+            [meter_key] = meter_info,
+        },
+    }
+end
+
+-- Create a decoder instance whose metatable is ResourcesDecoder.
+function ResourcesDecoder.new()
+    local instance = setmetatable({}, ResourcesDecoder)
+    return instance
+end
+
+-- Decode Ceilometer samples to resource messages
+-- data: oslo.messaging message with Ceilometer samples
+-- returns 0 and the resource message, or -2 and an error string; a decoded
+-- layer or payload that is not a table is reported as an error, never indexed
+function ResourcesDecoder:decode (data)
+    local ok, message = pcall(cjson.decode, data)
+    if not ok or type(message) ~= 'table' then
+        return -2, "Cannot decode Payload"
+    end
+    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not ok or type(message_body) ~= 'table' then
+        return -2, "Cannot decode Payload[oslo.message]"
+    end
+    if type(message_body['payload']) ~= 'table' then
+        return -2, "Empty message"
+    end
+    local resource_payload = {}
+    for _, sample in ipairs(message_body["payload"]) do
+        add_resource_to_payload(sample, resource_payload)
+    end
+    resource_msg.Payload = cjson.encode(resource_payload)
+    resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
+    return 0, resource_msg
+end
+
+return ResourcesDecoder
diff --git a/heka/files/lua/common/samples.lua b/heka/files/lua/common/samples.lua
new file mode 100644
index 0000000..0516e8f
--- /dev/null
+++ b/heka/files/lua/common/samples.lua
@@ -0,0 +1,194 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = cjson
+local string = string
+local table = table
+local math = math
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local pcall = pcall
+local type = type
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+function normalize_uuid(uuid) -- NOTE(review): defined before setfenv, so this copy lands in the chunk's original environment
+    return patt.Uuid:match(uuid)
+end
+
+local metadata_fields = {} -- NOTE(review): this local is not read below; SamplesDecoder.new takes its own argument of the same name
+
+local SamplesDecoder = {}
+SamplesDecoder.__index = SamplesDecoder
+
+setfenv(1, SamplesDecoder) -- Remove external access to contain everything in the module
+
+function normalize_uuid(uuid) -- same body as the copy above; defined after setfenv, so it is stored in SamplesDecoder
+    return patt.Uuid:match(uuid)
+end
+
+-- Mapping table defining transformation functions to be applied, keys are the
+-- attributes in the notification's payload and values are Lua functions
+local transform_functions = { -- looked up by metadata field name in SamplesDecoder:inject_metadata
+    created_at = utils.format_datetime,
+    launched_at = utils.format_datetime,
+    deleted_at = utils.format_datetime,
+    terminated_at = utils.format_datetime,
+    user_id = normalize_uuid,
+    project_id = normalize_uuid,
+}
+
+-- Apply func to every value of tbl.
+-- Returns a new table with the same keys; tbl itself is not modified.
+function map(func, tbl)
+    local out = {}
+    for key, value in pairs(tbl) do out[key] = func(value) end
+    return out
+end
+
+local sample_msg = { -- reused for every decoded message; SamplesDecoder:decode fills the nil fields
+    Timestamp = nil,
+    -- This message type has the same structure as 'bulk_metric'.
+    Type = "ceilometer_samples",
+    Payload = nil
+}
+
+-- Split a dotted metadata field name into its components.
+-- Returns the list of parts ("a.b" -> {"a", "b"}), or the unchanged string
+-- when the name contains no dot.
+function parse_metadata_field(field)
+    local dot = string.find(field, "%.")
+    if dot == nil then
+        return field
+    end
+    local parts, start = {}, 1
+    repeat
+        parts[#parts + 1] = string.sub(field, start, dot - 1)
+        start = dot + 1
+        dot = string.find(field, "%.", start)
+    until not dot
+    parts[#parts + 1] = string.sub(field, start)
+    return parts
+end
+
+-- Parse every field name of the list with parse_metadata_field.
+-- Returns a table that maps each original name to its parsed form.
+function parse_metadata_fields(fields)
+    local by_name = {}
+    for _, name in ipairs(fields) do by_name[name] = parse_metadata_field(name) end
+    return by_name
+end
+
+-- Look up field in metadata; returns nil when metadata is not a table.
+-- field is a plain key, or a list of keys followed one level at a time
+-- until the list ends or a level yields nil/false.
+function get_field(field, metadata)
+    if type(metadata) ~= 'table' then
+        return nil
+    end
+    if type(field) ~= 'table' then
+        return metadata[field]
+    end
+    local current = metadata
+    for _, key in ipairs(field) do
+        if not current then break end
+        current = current[key]
+    end
+    return current
+end
+
+
+-- Copy the configured metadata fields into tags under "metadata.<name>".
+-- Missing and table values are skipped; a matching transform is applied first.
+function SamplesDecoder:inject_metadata(metadata, tags)
+    for name, path in pairs(self.metadata_fields) do
+        local found = get_field(path, metadata)
+        if found ~= nil and type(found) ~= 'table' then
+            local convert = transform_functions[name]
+            if convert ~= nil then
+                found = convert(found)
+            end
+            tags["metadata." .. name] = found
+        end
+    end
+end
+
+-- Append the entry built from one Ceilometer sample to the payload list.
+function SamplesDecoder:add_sample_to_payload(sample, payload)
+    local parsed_ts = patt.Timestamp:match(sample.timestamp)
+    local tags = {
+        meter = sample.counter_name,
+        resource_id = sample.resource_id,
+        project_id = sample.project_id,
+        user_id = sample.user_id,
+        source = sample.source,
+    }
+    self:inject_metadata(sample.resource_metadata or {}, tags)
+    payload[#payload + 1] = {
+        name = 'sample', timestamp = parsed_ts,
+        tags = tags,
+        values = {
+            value = sample.counter_volume,
+            message_id = sample.message_id,
+            recorded_at = sample.recorded_at,
+            timestamp = sample.timestamp,
+            message_signature = sample.signature,
+            type = sample.counter_type,
+            unit = sample.counter_unit,
+        },
+    }
+end
+
+-- Create a new Sample decoder
+--
+-- metadata_fields: list of the metadata field names to copy from each
+-- sample; a dotted name addresses a nested metadata value
+-- returns the new decoder instance
+function SamplesDecoder.new(metadata_fields)
+    local instance = setmetatable({}, SamplesDecoder)
+    instance.metadata_fields = parse_metadata_fields(metadata_fields)
+    return instance
+end
+
+
+-- Decode Ceilometer samples
+-- data: oslo.messaging message with Ceilometer samples
+-- returns 0 and the sample message, or -2 and an error string; a decoded
+-- layer or payload that is not a table is reported as an error, never indexed
+function SamplesDecoder:decode (data)
+    local ok, message = pcall(cjson.decode, data)
+    if not ok or type(message) ~= 'table' then
+        return -2, "Cannot decode Payload"
+    end
+    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not ok or type(message_body) ~= 'table' then
+        return -2, "Cannot decode Payload[oslo.message]"
+    end
+    if type(message_body['payload']) ~= 'table' then
+        return -2, "Empty message"
+    end
+    local sample_payload = {}
+    for _, sample in ipairs(message_body["payload"]) do
+        self:add_sample_to_payload(sample, sample_payload)
+    end
+    sample_msg.Payload = cjson.encode(sample_payload)
+    sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
+    return 0, sample_msg
+end
+
+return SamplesDecoder
diff --git a/heka/files/lua/decoders/ceilometer.lua b/heka/files/lua/decoders/ceilometer.lua
deleted file mode 100644
index 01564af..0000000
--- a/heka/files/lua/decoders/ceilometer.lua
+++ /dev/null
@@ -1,135 +0,0 @@
--- Copyright 2016 Mirantis, Inc.
---
--- Licensed under the Apache License, Version 2.0 (the "License");
--- you may not use this file except in compliance with the License.
--- You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-require "string"
-require "cjson"
-require 'table'
-require 'math'
-
-local patt = require 'patterns'
-local utils = require 'lma_utils'
-local l = require 'lpeg'
-l.locale(l)
-
-function normalize_uuid(uuid)
-    return patt.Uuid:match(uuid)
-end
-
--- the metadata_fields parameter is a list of words separated by space
-local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
-local metadata_fields = fields_grammar:match(
-    read_config("metadata_fields") or ""
-)
-
-local decode_resources = read_config('decode_resources') or false
-
-local sample_msg = {
-    Timestamp = nil,
-    -- This message type has the same structure than 'bulk_metric'.
-    Type = "ceilometer_samples",
-    Payload = nil
-}
-
-local resource_msg = {
-    Timestamp = nil,
-    Type = "ceilometer_resource",
-    Fields = nil,
-}
-
-function inject_metadata(metadata, tags)
-    local value
-    for _, field in ipairs(metadata_fields) do
-        value = metadata[field]
-        if value ~= nil and type(value) ~= 'table' then
-            tags["metadata." .. field] = value
-        end
-    end
-end
-
-function add_resource_to_payload(sample, payload)
-
-    local resource_data = {
-        timestamp = sample.timestamp,
-        resource_id = sample.resource_id,
-        source = sample.source or "",
-        metadata = sample.resource_metadata,
-        user_id = sample.user_id,
-        project_id = sample.project_id,
-        meter = {
-            [sample.counter_name] = {
-                type = sample.counter_type,
-                unit = sample.counter_unit
-            }
-        }
-    }
-    payload[sample.resource_id] = resource_data
-end
-
-
-function add_sample_to_payload(sample, payload)
-    local sample_data = {
-        name='sample',
-        timestamp = patt.Timestamp:match(sample.timestamp),
-        values = {
-            value = sample.counter_volume,
-            message_id = sample.message_id,
-            recorded_at = sample.recorded_at,
-            timestamp = sample.timestamp,
-            message_signature = sample.signature,
-            type = sample.counter_type,
-            unit = sample.counter_unit
-        }
-    }
-    local tags = {
-        meter = sample.counter_name,
-        resource_id = sample.resource_id,
-        project_id = sample.project_id ,
-        user_id = sample.user_id,
-        source = sample.source
-    }
-
-    inject_metadata(sample.resource_metadata or {}, tags)
-    sample_data["tags"] = tags
-    table.insert(payload, sample_data)
-end
-
-function process_message ()
-    local data = read_message("Payload")
-    local ok, message = pcall(cjson.decode, data)
-    if not ok then
-        return -1, "Cannot decode Payload"
-    end
-    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
-    if not ok then
-        return -1, "Cannot decode Payload[oslo.message]"
-    end
-    local sample_payload = {}
-    local resource_payload = {}
-    for _, sample in ipairs(message_body["payload"]) do
-        add_sample_to_payload(sample, sample_payload)
-        if decode_resources then
-            add_resource_to_payload(sample, resource_payload)
-        end
-    end
-    sample_msg.Payload = cjson.encode(sample_payload)
-    sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
-    utils.safe_inject_message(sample_msg)
-
-    if decode_resources then
-        resource_msg.Payload = cjson.encode(resource_payload)
-        resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
-        utils.safe_inject_message(resource_msg)
-    end
-
-    return 0
-end
diff --git a/heka/files/lua/decoders/metering.lua b/heka/files/lua/decoders/metering.lua
new file mode 100644
index 0000000..d075730
--- /dev/null
+++ b/heka/files/lua/decoders/metering.lua
@@ -0,0 +1,35 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local table = require 'table'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+local decoder_module = read_config('decoder') or error("Decoder module should be defined") -- raises while the script loads when the option is unset
+
+local inject = utils.safe_inject_message -- placeholder: decoder_module is always truthy past the line above, so the branch below replaces it
+
+if decoder_module then
+    inject = require(decoder_module).decode -- the module is expected to expose a decode function
+    if not inject then
+        error(decoder_module .. " does not provide a decode function")
+    end
+end
+
+-- Run the configured decoder on the message Payload; return its code and message.
+function process_message ()
+    local code, msg = inject((read_message("Payload"))) -- extra parens keep a single value
+    return code, msg
+end
diff --git a/heka/files/lua/encoders/es_ceilometer_resources.lua b/heka/files/lua/encoders/es_ceilometer_resources.lua
index 860ba03..61fb68c 100644
--- a/heka/files/lua/encoders/es_ceilometer_resources.lua
+++ b/heka/files/lua/encoders/es_ceilometer_resources.lua
@@ -13,54 +13,19 @@
 -- limitations under the License.
 require "string"
 require "cjson"
-local elasticsearch = require "elasticsearch"
+local utils = require "lma_utils"
+local encoder_module = read_config("encoder") or error("Encoder should be defined")
 
-local index = read_config("index") or "index"
-local type_name = read_config("type_name") or "message"
+local encode = require(encoder_module).encode
+if not encode then
+    error("Encoder should implements 'encode' function")
+end
 
 function process_message()
-    local ns
-    local resources = cjson.decode(read_message("Payload"))
-    for resource_id, resource in pairs(resources) do
-        local update = cjson.encode({update = {_index = index, _type = type_name,
-            _id = resource_id}})
-        local body = {
-            script = 'ctx._source.meters += meter;' ..
-            'ctx._source.user_id = user_id;' ..
-            'ctx._source.project_id = project_id;' ..
-            'ctx._source.source = source; ' ..
-            'ctx._source.metadata =  ' ..
-            'ctx._source.last_sample_timestamp <= timestamp ? ' ..
-            'metadata : ctx._source.metadata;' ..
-            'ctx._source.last_sample_timestamp = ' ..
-            'ctx._source.last_sample_timestamp < timestamp ?' ..
-            'timestamp : ctx._source.last_sample_timestamp;' ..
-            'ctx._source.first_sample_timestamp = ' ..
-            'ctx._source.first_sample_timestamp > timestamp ?' ..
-            'timestamp : ctx._source.first_sample_timestamp;',
-            params = {
-                meter = resource.meter,
-                metadata = resource.metadata,
-                timestamp = resource.timestamp,
-                user_id = resource.user_id or '',
-                project_id = resource.project_id or '',
-                source = resource.source or '',
-            },
-            upsert = {
-                first_sample_timestamp = resource.timestamp,
-                last_sample_timestamp = resource.timestamp,
-                project_id = resource.project_id or '',
-                user_id = resource.user_id or '',
-                source = resource.source or '',
-                metadata = resource.metadata,
-                meters = resource.meter
-            }
-        }
-        body = cjson.encode(body)
-
-        add_to_payload(update, "\n", body, "\n")
+    local code, payload = encode() -- encode is taken from the module named by the "encoder" option
+    if code == 0 and payload then
+        return utils.safe_inject_payload('txt', 'elasticsearch', payload)
+    else
+        return code, payload -- hand the encoder's result back unchanged
     end
-
-    inject_payload()
-    return 0
 end
diff --git a/heka/init.sls b/heka/init.sls
index 0c4f1fa..0668bb0 100644
--- a/heka/init.sls
+++ b/heka/init.sls
@@ -9,6 +9,9 @@
 {%- if pillar.heka.remote_collector is defined %}
 - heka.remote_collector
 {%- endif %}
+{%- if pillar.heka.ceilometer_collector is defined %}
+- heka.ceilometer_collector
+{%- endif %}
 {%- if pillar.heka.aggregator is defined %}
 - heka.aggregator
 {%- endif %}
diff --git a/heka/map.jinja b/heka/map.jinja
index 3995eb2..908c5df 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -39,6 +39,10 @@
 {% set default_nagios_host_alarm_clusters = '00-clusters' %}
 {% set default_automatic_starting = True %}
 
+{% set default_rabbit_port = 5672 %}
+{% set default_rabbit_vhost = '/openstack' %}
+{% set default_ceilometer_rabbit_queue = 'metering.sample' %}
+
 {% set log_collector = salt['grains.filter_by']({
   'default': {
     'elasticsearch_port': default_elasticsearch_port,
@@ -81,3 +85,16 @@
     'automatic_starting': default_automatic_starting,
   }
 }, merge=salt['pillar.get']('heka:aggregator')) %}
+
+{% set ceilometer_collector = salt['grains.filter_by']({
+  'default': {
+    'influxdb_port': default_influxdb_port,
+    'influxdb_timeout': default_influxdb_timeout,
+    'influxdb_time_precision': default_influxdb_time_precision,
+    'elasticsearch_port': default_elasticsearch_port,
+    'rabbit_port': default_rabbit_port,
+    'rabbit_vhost': default_rabbit_vhost,
+    'rabbit_queue': default_ceilometer_rabbit_queue,
+    'resource_decoding': False,
+  }
+}, merge=salt['pillar.get']('heka:ceilometer_collector')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 55c7224..0f16d20 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -2,6 +2,8 @@
 {%- from "heka/map.jinja" import metric_collector with context %}
 {%- from "heka/map.jinja" import remote_collector with context %}
 {%- from "heka/map.jinja" import aggregator  with context %}
+{%- from "heka/map.jinja" import ceilometer_collector with context %}
+
 
 log_collector:
   filter:
@@ -434,3 +436,85 @@
       max_file_size: 524288
       full_action: drop
 {%- endif %}
+ceilometer_collector:
+  decoder:
+    sample:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/decoders/metering.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        decoder: 'ceilometer'
+        decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
+        metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type state user_metadata.stack"
+  input:
+{%- if ceilometer_collector.rabbit_host is defined %}
+    openstack_sample_amqp:
+      engine: amqp
+      user: {{ ceilometer_collector.rabbit_user }}
+      password: {{ ceilometer_collector.rabbit_password }}
+      port: {{ ceilometer_collector.rabbit_port }}
+      host: {{ ceilometer_collector.rabbit_host }}
+      vhost: {{ ceilometer_collector.rabbit_vhost }}
+      queue: {{ ceilometer_collector.rabbit_queue }}
+      routing_key: {{ ceilometer_collector.rabbit_queue }}
+      decoder: sample_decoder
+      splitter: NullSplitter
+      exchange: "ceilometer"
+      exchange_type: "topic"
+      exchange_auto_delete: false
+      queue_auto_delete: false
+{%- endif %}
+  filter:
+{%- if ceilometer_collector.influxdb_host is defined %}
+    ceilometer_influxdb_accumulator:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "Type =~ /ceilometer_samples$/"
+      ticker_interval: 1
+      config:
+        time_precision: "{{ ceilometer_collector.influxdb_time_precision }}"
+        payload_name: 'sample_data'
+        flush_count: 500
+        bulk_metric_type_matcher: 'ceilometer_samples'
+{%- endif %}
+  encoder:
+{%- if ceilometer_collector.influxdb_host is defined %}
+    influxdb:
+      engine: payload
+      append_newlines: false
+      prefix_ts: false
+{%- endif %}
+{%- if ceilometer_collector.elasticsearch_host is defined %}
+    elasticsearch_resource:
+      engine: sandbox
+      module_file:  /usr/share/lma_collector/encoders/es_ceilometer_resources.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      config:
+        index: "ceilometer_resource"
+        type_name: "source"
+        encoder: "elasticsearch_resources"
+{%- endif %}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
+  output:
+{%- if ceilometer_collector.influxdb_host is defined %}
+    samples_influxdb:
+      engine: http
+      address: "http://{{ ceilometer_collector.influxdb_host }}:{{ ceilometer_collector.influxdb_port }}/write?db={{ ceilometer_collector.influxdb_database }}&precision={{ ceilometer_collector.influxdb_time_precision }}"
+    {%- if ceilometer_collector.influxdb_username and ceilometer_collector.influxdb_password %}
+      username: "{{ ceilometer_collector.influxdb_username }}"
+      password: "{{ ceilometer_collector.influxdb_password }}"
+    {%- endif %}
+      message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'sample_data'"
+      encoder: influxdb_encoder
+      timeout: {{ ceilometer_collector.influxdb_timeout }}
+      method: "POST"
+{%- endif %}
+{%- if ceilometer_collector.elasticsearch_host is defined %}
+    elasticsearch_resource:
+      engine: elasticsearch
+      server: "http://{{ ceilometer_collector.elasticsearch_host }}:{{ ceilometer_collector.elasticsearch_port }}"
+      message_matcher: "Type == 'ceilometer_resources'"
+      encoder: elasticsearch_resource_encoder
+{%- endif %}
diff --git a/metadata/service/ceilometer_collector/single.yml b/metadata/service/ceilometer_collector/single.yml
new file mode 100644
index 0000000..bfc6e75
--- /dev/null
+++ b/metadata/service/ceilometer_collector/single.yml
@@ -0,0 +1,8 @@
+applications:
+- heka
+classes:
+- service.heka.support
+parameters:
+  heka:
+    ceilometer_collector:
+      enabled: true