Merge pull request #58 from ityaptin/stacklight
Add an os_telemetry_collector service
diff --git a/heka/_service.sls b/heka/_service.sls
index 30bde22..d098575 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -7,7 +7,7 @@
- name: /var/log/{{ service_name }}.log
- user: heka
- mode: 644
- - replace: False
+ - replace: false
heka_{{ service_name }}_conf_dir:
file.directory:
@@ -118,6 +118,14 @@
'splitter': {},
'encoder': {},
'output': {},
+ },
+ 'ceilometer_collector': {
+ 'decoder': {},
+ 'input': {},
+ 'filter': {},
+ 'splitter': {},
+ 'encoder': {},
+ 'output': {},
}
} %}
@@ -135,7 +143,6 @@
{%- endif %}
{%- endfor %}
-
{%- if service_name in ('remote_collector', 'aggregator') %}
{# Load the other services' support metadata from salt-mine #}
diff --git a/heka/ceilometer_collector.sls b/heka/ceilometer_collector.sls
new file mode 100644
index 0000000..fa05d77
--- /dev/null
+++ b/heka/ceilometer_collector.sls
@@ -0,0 +1,12 @@
+{%- if pillar.heka.ceilometer_collector is defined %}
+
+include:
+- heka._common
+
+{%- from "heka/map.jinja" import ceilometer_collector with context %}
+{%- set server = ceilometer_collector %}
+{%- set service_name = "ceilometer_collector" %}
+
+{%- include "heka/_service.sls" %}
+
+{%- endif %}
diff --git a/heka/files/lua/common/ceilometer.lua b/heka/files/lua/common/ceilometer.lua
new file mode 100644
index 0000000..56b1eec
--- /dev/null
+++ b/heka/files/lua/common/ceilometer.lua
@@ -0,0 +1,60 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local table = require 'table'
+
+local samples = require 'samples'
+local resources = require 'resources'
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+
+local l = require 'lpeg'
+l.locale(l)
+
+-- Grammar collecting every run of non-space characters of a string into a
+-- table (the words may be separated by any number of spaces).
+local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
+-- Words of the "metadata_fields" setting; an absent setting is parsed as the
+-- empty string.
+local metadata_fields = fields_grammar:match(
+ read_config("metadata_fields") or ""
+)
+-- Resource decoding is only enabled when "decode_resources" is set.
+local decode_resources = read_config('decode_resources') or false


+local samples_decoder = samples.new(metadata_fields)
+local resource_decoder = nil

+-- resource_decoder stays nil unless resource decoding was requested; decode()
+-- tests it before use.
+if decode_resources then
+ resource_decoder = resources.new()
+end
+
+local CeilometerDecoder = {}
+CeilometerDecoder.__index = CeilometerDecoder
+
+setfenv(1, CeilometerDecoder) -- Remove external access to contain everything in the module
+
+-- Inject a decoded message when decoding succeeded.
+--
+-- code: status returned by a decoder (0 means success)
+-- msg: decoded message on success, error text otherwise
+-- returns whatever utils.safe_inject_message returns on success, otherwise
+-- the code and msg it was given
+function inject(code, msg)
+    if code ~= 0 or not msg then
+        return code, msg
+    end
+    return utils.safe_inject_message(msg)
+end
+
+-- Decode one raw message into a samples message and, when a resource decoder
+-- is configured, a resources message; each one is injected as it is decoded.
+--
+-- data: raw message payload
+-- returns the code and message of the last step that ran
+function decode(data)
+    local code, msg = inject(samples_decoder:decode(data))
+    if code ~= 0 or not resource_decoder then
+        return code, msg
+    end
+    code, msg = inject(resource_decoder:decode(data))
+    return code, msg
+end
+
+return CeilometerDecoder
diff --git a/heka/files/lua/common/elasticsearch_resources.lua b/heka/files/lua/common/elasticsearch_resources.lua
new file mode 100644
index 0000000..b77caa6
--- /dev/null
+++ b/heka/files/lua/common/elasticsearch_resources.lua
@@ -0,0 +1,76 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local string = require "string"
+local cjson = require "cjson"
+local utils = require "lma_utils"
+local table = require "table"
+local pairs = pairs
+local read_message = read_message
+local utils = require "lma_utils"
+
+local index = read_config("index") or "index"
+local type_name = read_config("type_name") or "source"
+
+local ResourceEncoder = {}
+ResourceEncoder.__index = ResourceEncoder
+
+setfenv(1, ResourceEncoder) -- Remove external access to contain everything in the module
+
+-- Build an Elasticsearch bulk request body from the current message.
+--
+-- The message Payload is decoded as a JSON object keyed by resource id. Each
+-- entry produces one "update" action line followed by one line carrying the
+-- update script, its parameters and the upsert document.
+--
+-- returns 0 and the concatenated bulk body (an empty string when the payload
+-- holds no resource)
+function encode()
+    local resources = cjson.decode(read_message("Payload"))
+    -- Must be local: this chunk runs with the module table as environment, so
+    -- a bare assignment would be stored in the exported table and persist
+    -- between calls.
+    local payload = {}
+    for resource_id, resource in pairs(resources) do
+        local update = cjson.encode({
+            update = {_index = index, _type = type_name, _id = resource_id}
+        })
+        local body = {
+            script = 'ctx._source.meters += meter;' ..
+                'ctx._source.user_id = user_id;' ..
+                'ctx._source.project_id = project_id;' ..
+                'ctx._source.source = source; ' ..
+                'ctx._source.metadata = ' ..
+                'ctx._source.last_sample_timestamp <= timestamp ? ' ..
+                'metadata : ctx._source.metadata;' ..
+                'ctx._source.last_sample_timestamp = ' ..
+                'ctx._source.last_sample_timestamp < timestamp ?' ..
+                'timestamp : ctx._source.last_sample_timestamp;' ..
+                'ctx._source.first_sample_timestamp = ' ..
+                'ctx._source.first_sample_timestamp > timestamp ?' ..
+                'timestamp : ctx._source.first_sample_timestamp;',
+            params = {
+                meter = resource.meter,
+                metadata = resource.metadata,
+                timestamp = resource.timestamp,
+                user_id = resource.user_id or '',
+                project_id = resource.project_id or '',
+                source = resource.source or '',
+            },
+            upsert = {
+                first_sample_timestamp = resource.timestamp,
+                last_sample_timestamp = resource.timestamp,
+                project_id = resource.project_id or '',
+                user_id = resource.user_id or '',
+                source = resource.source or '',
+                metadata = resource.metadata,
+                meters = resource.meter
+            }
+        }
+        local bulk_body = string.format("%s\n%s\n", update, cjson.encode(body))
+        table.insert(payload, bulk_body)
+    end
+    return 0, table.concat(payload)
+end
+
+return ResourceEncoder
\ No newline at end of file
diff --git a/heka/files/lua/common/resources.lua b/heka/files/lua/common/resources.lua
new file mode 100644
index 0000000..465585b
--- /dev/null
+++ b/heka/files/lua/common/resources.lua
@@ -0,0 +1,121 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = cjson
+local string = string
+local table = table
+local math = math
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local pcall = pcall
+local type = type
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+-- Return the result of matching `uuid` against patt.Uuid.
+-- NOTE(review): this definition runs before the setfenv() call below, so the
+-- function is created in the chunk's original environment rather than in the
+-- ResourcesDecoder table; an identical function is defined again after
+-- setfenv(). Confirm whether this first copy is needed.
+function normalize_uuid(uuid)
+ return patt.Uuid:match(uuid)
+end
+
+local metadata_fields = {}
+
+local ResourcesDecoder = {}
+ResourcesDecoder.__index = ResourcesDecoder
+
+setfenv(1, ResourcesDecoder) -- Remove external access to contain everything in the module
+
+-- Return the result of matching `uuid` against patt.Uuid.
+-- Defined after setfenv(1, ResourcesDecoder), so it is stored in the
+-- ResourcesDecoder table.
+function normalize_uuid(uuid)
+ return patt.Uuid:match(uuid)
+end
+
+-- Mapping table defining transformation functions to be applied, keys are the
+-- attributes in the notification's payload and values are Lua functions
+-- NOTE(review): no code visible in this module reads this table; it looks
+-- like a leftover shared with the samples decoder — confirm before relying
+-- on it.
+local transform_functions = {
+ created_at = utils.format_datetime,
+ launched_at = utils.format_datetime,
+ deleted_at = utils.format_datetime,
+ terminated_at = utils.format_datetime,
+ user_id = normalize_uuid,
+ project_id = normalize_uuid,
+}
+
+-- Build a new table holding func(value) under every key of tbl.
+--
+-- func: function applied to each value
+-- tbl: table to traverse (every key, through pairs)
+-- returns the new table; tbl itself is left untouched
+function map(func, tbl)
+    local result = {}
+    for key, item in pairs(tbl) do
+        result[key] = func(item)
+    end
+    return result
+end
+
+-- Message template returned by ResourcesDecoder:decode(); the same table is
+-- reused for every call, with Timestamp and Payload overwritten each time.
+local resource_msg = {
+ Timestamp = nil,
+ Type = "ceilometer_resources",
+ Payload = nil
+}
+
+-- Record the resource described by a sample in payload, keyed by resource id.
+--
+-- sample: one decoded Ceilometer sample (table)
+-- payload: table of resource descriptions, updated in place
+--
+-- Samples without a usable resource_id or counter_name are skipped instead
+-- of raising an error. When several samples of one batch refer to the same
+-- resource, the latest sample provides the resource attributes and the
+-- meters seen so far are kept, so no meter of the batch is dropped.
+function add_resource_to_payload(sample, payload)
+    local id_type = type(sample.resource_id)
+    if id_type ~= 'string' and id_type ~= 'number' then
+        return
+    end
+    if type(sample.counter_name) ~= 'string' then
+        return
+    end
+
+    -- Dots in the meter name are replaced by backslashes (presumably because
+    -- dotted keys are not usable as Elasticsearch field names — confirm).
+    local counter_name = string.gsub(sample.counter_name, "%.", "\\")
+
+    local resource_data = {
+        timestamp = sample.timestamp,
+        resource_id = sample.resource_id,
+        source = sample.source or "",
+        metadata = sample.resource_metadata,
+        user_id = sample.user_id,
+        project_id = sample.project_id,
+        meter = {
+            [counter_name] = {
+                type = sample.counter_type,
+                unit = sample.counter_unit
+            }
+        }
+    }
+
+    -- Carry over the meters already collected for this resource.
+    local previous = payload[sample.resource_id]
+    if previous ~= nil and type(previous.meter) == 'table' then
+        for name, meter in pairs(previous.meter) do
+            if resource_data.meter[name] == nil then
+                resource_data.meter[name] = meter
+            end
+        end
+    end
+    payload[sample.resource_id] = resource_data
+end
+
+-- Create a new resource decoder.
+--
+-- returns an empty table whose metatable is ResourcesDecoder
+function ResourcesDecoder.new()
+    return setmetatable({}, ResourcesDecoder)
+end
+
+-- Decode Ceilometer samples to resource messages
+--
+-- data: oslo.messaging message with Ceilometer samples (JSON text whose
+--       "oslo.message" member is itself JSON text)
+-- returns 0 and the resource message, or -2 and an error message
+--
+-- Every decoded level is type-checked before it is indexed or iterated, so
+-- malformed input is reported through the -2 return code instead of raising
+-- a Lua error.
+function ResourcesDecoder:decode (data)
+    local ok, message = pcall(cjson.decode, data)
+    -- A JSON scalar decodes successfully but cannot be indexed below.
+    if not ok or type(message) ~= 'table' then
+        return -2, "Cannot decode Payload"
+    end
+    local body_ok, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not body_ok or type(message_body) ~= 'table' then
+        return -2, "Cannot decode Payload[oslo.message]"
+    end
+    local samples = message_body['payload']
+    -- Covers a missing payload as well as a JSON null or scalar payload,
+    -- which ipairs() would reject.
+    if type(samples) ~= 'table' then
+        return -2, "Empty message"
+    end
+    local resource_payload = {}
+    for _, sample in ipairs(samples) do
+        if type(sample) == 'table' then
+            add_resource_to_payload(sample, resource_payload)
+        end
+    end
+    resource_msg.Payload = cjson.encode(resource_payload)
+    -- The pattern can only be matched against a string; leave the timestamp
+    -- unset otherwise.
+    if type(message_body.timestamp) == 'string' then
+        resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
+    else
+        resource_msg.Timestamp = nil
+    end
+    return 0, resource_msg
+end
+
+return ResourcesDecoder
diff --git a/heka/files/lua/common/samples.lua b/heka/files/lua/common/samples.lua
new file mode 100644
index 0000000..0516e8f
--- /dev/null
+++ b/heka/files/lua/common/samples.lua
@@ -0,0 +1,194 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local cjson = cjson
+local string = string
+local table = table
+local math = math
+local setmetatable = setmetatable
+local ipairs = ipairs
+local pairs = pairs
+local pcall = pcall
+local type = type
+
+local patt = require 'patterns'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+-- Return the result of matching `uuid` against patt.Uuid.
+-- NOTE(review): this definition runs before the setfenv() call below, so the
+-- function is created in the chunk's original environment rather than in the
+-- SamplesDecoder table; an identical function is defined again after
+-- setfenv(). Confirm whether this first copy is needed.
+function normalize_uuid(uuid)
+ return patt.Uuid:match(uuid)
+end
+
+local metadata_fields = {}
+
+local SamplesDecoder = {}
+SamplesDecoder.__index = SamplesDecoder
+
+setfenv(1, SamplesDecoder) -- Remove external access to contain everything in the module
+
+-- Return the result of matching `uuid` against patt.Uuid.
+-- Defined after setfenv(1, SamplesDecoder), so it is stored in the
+-- SamplesDecoder table; used below as the transformation for user_id and
+-- project_id.
+function normalize_uuid(uuid)
+ return patt.Uuid:match(uuid)
+end
+
+-- Mapping table defining transformation functions to be applied, keys are the
+-- attributes in the notification's payload and values are Lua functions
+-- SamplesDecoder:inject_metadata() looks this table up by metadata field
+-- name and stores the transformed value instead of the raw one.
+local transform_functions = {
+ created_at = utils.format_datetime,
+ launched_at = utils.format_datetime,
+ deleted_at = utils.format_datetime,
+ terminated_at = utils.format_datetime,
+ user_id = normalize_uuid,
+ project_id = normalize_uuid,
+}
+
+-- Apply func to every value of tbl and collect the results under the same
+-- keys.
+--
+-- func: function called with each value
+-- tbl: source table, traversed with pairs
+-- returns a freshly created table; tbl is not modified
+function map(func, tbl)
+    local out = {}
+    for k, element in pairs(tbl) do
+        out[k] = func(element)
+    end
+    return out
+end
+
+-- Message template returned by SamplesDecoder:decode(); the same table is
+-- reused for every call, with Timestamp and Payload overwritten each time.
+local sample_msg = {
+ Timestamp = nil,
+ -- This message type has the same structure as 'bulk_metric'.
+ Type = "ceilometer_samples",
+ Payload = nil
+}
+
+-- Split a metadata field name on dots.
+--
+-- field: field name, e.g. "status" or "user_metadata.stack"
+-- returns the name unchanged when it holds no dot, otherwise the list of
+-- its dot-separated parts (empty parts included)
+function parse_metadata_field(field)
+    local dot = string.find(field, "%.")
+    if dot == nil then
+        return field
+    end
+
+    local parts = {}
+    local start = 1
+    repeat
+        table.insert(parts, string.sub(field, start, dot - 1))
+        start = dot + 1
+        dot = string.find(field, "%.", start)
+    until dot == nil
+    -- Whatever follows the last dot is the final part.
+    table.insert(parts, string.sub(field, start))
+    return parts
+end
+
+-- Parse a list of metadata field names.
+--
+-- fields: sequence of field names
+-- returns a table mapping each name to the result of parse_metadata_field
+-- for that name
+function parse_metadata_fields(fields)
+    local by_name = {}
+    for _, name in ipairs(fields) do
+        by_name[name] = parse_metadata_field(name)
+    end
+    return by_name
+end
+
+-- Look a field up in a sample's metadata.
+--
+-- field: either a plain key, or a list of keys describing a path through
+--        nested tables (as produced by parse_metadata_field)
+-- metadata: metadata table; any other type yields nil
+-- returns the value found, or nil when the key or any step of the path is
+-- missing
+--
+-- Only tables are indexed while walking a path: a number, boolean or
+-- userdata in the middle of the path would raise an error when indexed, and
+-- a string would be resolved through the string library (so a part such as
+-- "len" would yield a function).
+function get_field(field, metadata)
+    if type(metadata) ~= 'table' then
+        return nil
+    end
+    if type(field) ~= 'table' then
+        return metadata[field]
+    end
+
+    local value = metadata
+    for _, field_part in ipairs(field) do
+        if type(value) ~= 'table' then
+            return nil
+        end
+        value = value[field_part]
+    end
+    return value
+end
+
+
+-- Copy the configured metadata fields of a sample into its tags.
+--
+-- metadata: metadata table of the sample
+-- tags: tag table, updated in place under "metadata.<field name>" keys
+--
+-- Fields that are absent or hold a table are skipped. When
+-- transform_functions has an entry for the field name, the transformed
+-- value is stored instead of the raw one.
+function SamplesDecoder:inject_metadata(metadata, tags)
+    for field_name, field_path in pairs(self.metadata_fields) do
+        local found = get_field(field_path, metadata)
+        if found ~= nil and type(found) ~= 'table' then
+            local tag = "metadata." .. field_name
+            local transform = transform_functions[field_name]
+            if transform == nil then
+                tags[tag] = found
+            else
+                tags[tag] = transform(found)
+            end
+        end
+    end
+end
+
+-- Convert one Ceilometer sample into a data point and append it to payload.
+--
+-- sample: decoded Ceilometer sample (table)
+-- payload: list of data points, extended in place
+--
+-- The data point is named 'sample'; its timestamp is the result of matching
+-- sample.timestamp against patt.Timestamp, its values come from the sample's
+-- counter and message attributes, and its tags identify the meter, resource,
+-- project, user and source, plus the configured metadata fields.
+function SamplesDecoder:add_sample_to_payload(sample, payload)
+    local tags = {
+        meter = sample.counter_name,
+        resource_id = sample.resource_id,
+        project_id = sample.project_id ,
+        user_id = sample.user_id,
+        source = sample.source
+    }
+    self:inject_metadata(sample.resource_metadata or {}, tags)
+
+    table.insert(payload, {
+        name = 'sample',
+        timestamp = patt.Timestamp:match(sample.timestamp),
+        values = {
+            value = sample.counter_volume,
+            message_id = sample.message_id,
+            recorded_at = sample.recorded_at,
+            timestamp = sample.timestamp,
+            message_signature = sample.signature,
+            type = sample.counter_type,
+            unit = sample.counter_unit
+        },
+        tags = tags
+    })
+end
+
+-- Create a new Sample decoder
+--
+-- metadata_fields: list of the metadata field names to store from samples
+-- returns a table with SamplesDecoder as metatable and the parsed field
+-- names in its metadata_fields member
+function SamplesDecoder.new(metadata_fields)
+    local decoder = {
+        metadata_fields = parse_metadata_fields(metadata_fields)
+    }
+    return setmetatable(decoder, SamplesDecoder)
+end
+
+
+-- Decode Ceilometer samples
+--
+-- data: oslo.messaging message with Ceilometer samples (JSON text whose
+--       "oslo.message" member is itself JSON text)
+-- returns 0 and the sample message, or -2 and an error message
+--
+-- Every decoded level is type-checked before it is indexed or iterated, so
+-- malformed input is reported through the -2 return code instead of raising
+-- a Lua error.
+function SamplesDecoder:decode (data)
+    local ok, message = pcall(cjson.decode, data)
+    -- A JSON scalar decodes successfully but cannot be indexed below.
+    if not ok or type(message) ~= 'table' then
+        return -2, "Cannot decode Payload"
+    end
+    local body_ok, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not body_ok or type(message_body) ~= 'table' then
+        return -2, "Cannot decode Payload[oslo.message]"
+    end
+    local samples = message_body['payload']
+    -- Covers a missing payload as well as a JSON null or scalar payload,
+    -- which ipairs() would reject.
+    if type(samples) ~= 'table' then
+        return -2, "Empty message"
+    end
+    local sample_payload = {}
+    for _, sample in ipairs(samples) do
+        if type(sample) == 'table' then
+            self:add_sample_to_payload(sample, sample_payload)
+        end
+    end
+    sample_msg.Payload = cjson.encode(sample_payload)
+    -- The pattern can only be matched against a string; leave the timestamp
+    -- unset otherwise.
+    if type(message_body.timestamp) == 'string' then
+        sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
+    else
+        sample_msg.Timestamp = nil
+    end
+    return 0, sample_msg
+end
+
+return SamplesDecoder
diff --git a/heka/files/lua/decoders/ceilometer.lua b/heka/files/lua/decoders/ceilometer.lua
deleted file mode 100644
index 01564af..0000000
--- a/heka/files/lua/decoders/ceilometer.lua
+++ /dev/null
@@ -1,135 +0,0 @@
--- Copyright 2016 Mirantis, Inc.
---
--- Licensed under the Apache License, Version 2.0 (the "License");
--- you may not use this file except in compliance with the License.
--- You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-require "string"
-require "cjson"
-require 'table'
-require 'math'
-
-local patt = require 'patterns'
-local utils = require 'lma_utils'
-local l = require 'lpeg'
-l.locale(l)
-
-function normalize_uuid(uuid)
- return patt.Uuid:match(uuid)
-end
-
--- the metadata_fields parameter is a list of words separated by space
-local fields_grammar = l.Ct((l.C((l.P(1) - l.P" ")^1) * l.P" "^0)^0)
-local metadata_fields = fields_grammar:match(
- read_config("metadata_fields") or ""
-)
-
-local decode_resources = read_config('decode_resources') or false
-
-local sample_msg = {
- Timestamp = nil,
- -- This message type has the same structure than 'bulk_metric'.
- Type = "ceilometer_samples",
- Payload = nil
-}
-
-local resource_msg = {
- Timestamp = nil,
- Type = "ceilometer_resource",
- Fields = nil,
-}
-
-function inject_metadata(metadata, tags)
- local value
- for _, field in ipairs(metadata_fields) do
- value = metadata[field]
- if value ~= nil and type(value) ~= 'table' then
- tags["metadata." .. field] = value
- end
- end
-end
-
-function add_resource_to_payload(sample, payload)
-
- local resource_data = {
- timestamp = sample.timestamp,
- resource_id = sample.resource_id,
- source = sample.source or "",
- metadata = sample.resource_metadata,
- user_id = sample.user_id,
- project_id = sample.project_id,
- meter = {
- [sample.counter_name] = {
- type = sample.counter_type,
- unit = sample.counter_unit
- }
- }
- }
- payload[sample.resource_id] = resource_data
-end
-
-
-function add_sample_to_payload(sample, payload)
- local sample_data = {
- name='sample',
- timestamp = patt.Timestamp:match(sample.timestamp),
- values = {
- value = sample.counter_volume,
- message_id = sample.message_id,
- recorded_at = sample.recorded_at,
- timestamp = sample.timestamp,
- message_signature = sample.signature,
- type = sample.counter_type,
- unit = sample.counter_unit
- }
- }
- local tags = {
- meter = sample.counter_name,
- resource_id = sample.resource_id,
- project_id = sample.project_id ,
- user_id = sample.user_id,
- source = sample.source
- }
-
- inject_metadata(sample.resource_metadata or {}, tags)
- sample_data["tags"] = tags
- table.insert(payload, sample_data)
-end
-
-function process_message ()
- local data = read_message("Payload")
- local ok, message = pcall(cjson.decode, data)
- if not ok then
- return -1, "Cannot decode Payload"
- end
- local ok, message_body = pcall(cjson.decode, message["oslo.message"])
- if not ok then
- return -1, "Cannot decode Payload[oslo.message]"
- end
- local sample_payload = {}
- local resource_payload = {}
- for _, sample in ipairs(message_body["payload"]) do
- add_sample_to_payload(sample, sample_payload)
- if decode_resources then
- add_resource_to_payload(sample, resource_payload)
- end
- end
- sample_msg.Payload = cjson.encode(sample_payload)
- sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
- utils.safe_inject_message(sample_msg)
-
- if decode_resources then
- resource_msg.Payload = cjson.encode(resource_payload)
- resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
- utils.safe_inject_message(resource_msg)
- end
-
- return 0
-end
diff --git a/heka/files/lua/decoders/metering.lua b/heka/files/lua/decoders/metering.lua
new file mode 100644
index 0000000..d075730
--- /dev/null
+++ b/heka/files/lua/decoders/metering.lua
@@ -0,0 +1,35 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+local table = require 'table'
+local utils = require 'lma_utils'
+local l = require 'lpeg'
+l.locale(l)
+
+-- Name of the Lua module providing the decoding logic; the "decoder" setting
+-- is mandatory and loading fails without it.
+local decoder_module = read_config('decoder') or error("Decoder module should be defined")
+
+-- Decoding entry point exported by the configured module, called by
+-- process_message() with the raw payload.
+local inject = require(decoder_module).decode
+if not inject then
+    error(decoder_module .. " does not provide a decode function")
+end
+
+-- Sandbox entry point: hand the message payload to the configured decoder.
+--
+-- returns the code and message produced by the decoder's decode function
+function process_message ()
+    local raw_payload = read_message("Payload")
+    local status, result = inject(raw_payload)
+    return status, result
+end
diff --git a/heka/files/lua/encoders/es_ceilometer_resources.lua b/heka/files/lua/encoders/es_ceilometer_resources.lua
index 860ba03..61fb68c 100644
--- a/heka/files/lua/encoders/es_ceilometer_resources.lua
+++ b/heka/files/lua/encoders/es_ceilometer_resources.lua
@@ -13,54 +13,19 @@
-- limitations under the License.
require "string"
require "cjson"
-local elasticsearch = require "elasticsearch"
+local utils = require "lma_utils"
+local encoder_module = read_config("encoder") or error("Encoder should be defined")
-local index = read_config("index") or "index"
-local type_name = read_config("type_name") or "message"
+local encode = require(encoder_module).encode
+if not encode then
+ error("Encoder should implements 'encode' function")
+end
function process_message()
- local ns
- local resources = cjson.decode(read_message("Payload"))
- for resource_id, resource in pairs(resources) do
- local update = cjson.encode({update = {_index = index, _type = type_name,
- _id = resource_id}})
- local body = {
- script = 'ctx._source.meters += meter;' ..
- 'ctx._source.user_id = user_id;' ..
- 'ctx._source.project_id = project_id;' ..
- 'ctx._source.source = source; ' ..
- 'ctx._source.metadata = ' ..
- 'ctx._source.last_sample_timestamp <= timestamp ? ' ..
- 'metadata : ctx._source.metadata;' ..
- 'ctx._source.last_sample_timestamp = ' ..
- 'ctx._source.last_sample_timestamp < timestamp ?' ..
- 'timestamp : ctx._source.last_sample_timestamp;' ..
- 'ctx._source.first_sample_timestamp = ' ..
- 'ctx._source.first_sample_timestamp > timestamp ?' ..
- 'timestamp : ctx._source.first_sample_timestamp;',
- params = {
- meter = resource.meter,
- metadata = resource.metadata,
- timestamp = resource.timestamp,
- user_id = resource.user_id or '',
- project_id = resource.project_id or '',
- source = resource.source or '',
- },
- upsert = {
- first_sample_timestamp = resource.timestamp,
- last_sample_timestamp = resource.timestamp,
- project_id = resource.project_id or '',
- user_id = resource.user_id or '',
- source = resource.source or '',
- metadata = resource.metadata,
- meters = resource.meter
- }
- }
- body = cjson.encode(body)
-
- add_to_payload(update, "\n", body, "\n")
+ local code, payload = encode()
+ if code == 0 and payload then
+ return utils.safe_inject_payload('txt', 'elasticsearch', payload)
+ else
+ return code, payload
end
-
- inject_payload()
- return 0
end
diff --git a/heka/init.sls b/heka/init.sls
index 0c4f1fa..0668bb0 100644
--- a/heka/init.sls
+++ b/heka/init.sls
@@ -9,6 +9,9 @@
{%- if pillar.heka.remote_collector is defined %}
- heka.remote_collector
{%- endif %}
+{%- if pillar.heka.ceilometer_collector is defined %}
+- heka.ceilometer_collector
+{%- endif %}
{%- if pillar.heka.aggregator is defined %}
- heka.aggregator
{%- endif %}
diff --git a/heka/map.jinja b/heka/map.jinja
index 3995eb2..908c5df 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -39,6 +39,10 @@
{% set default_nagios_host_alarm_clusters = '00-clusters' %}
{% set default_automatic_starting = True %}
+{% set default_rabbit_port = 5672 %}
+{% set default_rabbit_vhost = '/openstack' %}
+{% set default_ceilometer_rabbit_queue = 'metering.sample' %}
+
{% set log_collector = salt['grains.filter_by']({
'default': {
'elasticsearch_port': default_elasticsearch_port,
@@ -81,3 +85,16 @@
'automatic_starting': default_automatic_starting,
}
}, merge=salt['pillar.get']('heka:aggregator')) %}
+
+{% set ceilometer_collector = salt['grains.filter_by']({
+ 'default': {
+ 'influxdb_port': default_influxdb_port,
+ 'influxdb_timeout': default_influxdb_timeout,
+ 'influxdb_time_precision': default_influxdb_time_precision,
+ 'elasticsearch_port': default_elasticsearch_port,
+ 'rabbit_port': default_rabbit_port,
+ 'rabbit_vhost': default_rabbit_vhost,
+ 'rabbit_queue': default_ceilometer_rabbit_queue,
+ 'resource_decoding': False,
+ }
+}, merge=salt['pillar.get']('heka:ceilometer_collector')) %}
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 55c7224..0f16d20 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -2,6 +2,8 @@
{%- from "heka/map.jinja" import metric_collector with context %}
{%- from "heka/map.jinja" import remote_collector with context %}
{%- from "heka/map.jinja" import aggregator with context %}
+{%- from "heka/map.jinja" import ceilometer_collector with context %}
+
log_collector:
filter:
@@ -434,3 +436,85 @@
max_file_size: 524288
full_action: drop
{%- endif %}
+ceilometer_collector:
+ decoder:
+ sample:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/decoders/metering.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ decoder: 'ceilometer'
+ decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
+ metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
+ input:
+{%- if ceilometer_collector.rabbit_host is defined %}
+ openstack_sample_amqp:
+ engine: amqp
+ user: {{ ceilometer_collector.rabbit_user }}
+ password: {{ ceilometer_collector.rabbit_password }}
+ port: {{ ceilometer_collector.rabbit_port }}
+ host: {{ ceilometer_collector.rabbit_host }}
+ vhost: {{ ceilometer_collector.rabbit_vhost }}
+ queue: {{ ceilometer_collector.rabbit_queue }}
+ routing_key: {{ ceilometer_collector.rabbit_queue }}
+ decoder: sample_decoder
+ splitter: NullSplitter
+ exchange: "ceilometer"
+ exchange_type: "topic"
+ exchange_auto_delete: false
+ queue_auto_delete: false
+{%- endif %}
+ filter:
+{%- if ceilometer_collector.influxdb_host is defined %}
+ ceilometer_influxdb_accumulator:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ preserve_data: false
+ message_matcher: "Type =~ /ceilometer_samples$/"
+ ticker_interval: 1
+ config:
+ time_precision: "{{ ceilometer_collector.influxdb_time_precision }}"
+ payload_name: 'sample_data'
+ flush_count: 500
+ bulk_metric_type_matcher: 'ceilometer_samples'
+{%- endif %}
+ encoder:
+{%- if ceilometer_collector.influxdb_host is defined %}
+ influxdb:
+ engine: payload
+ append_newlines: false
+ prefix_ts: false
+{%- endif %}
+{%- if ceilometer_collector.elasticsearch_host is defined %}
+ elasticsearch_resource:
+ engine: sandbox
+ module_file: /usr/share/lma_collector/encoders/es_ceilometer_resources.lua
+ module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+ config:
+ index: "ceilometer_resource"
+ type_name: "source"
+ encoder: "elasticsearch_resources"
+{%- endif %}
+{#- The output section is only emitted when at least one backend (InfluxDB or Elasticsearch) is configured. #}
+{%- if ceilometer_collector.influxdb_host is defined or ceilometer_collector.elasticsearch_host is defined %}
+ output:
+{%- if ceilometer_collector.influxdb_host is defined %}
+ samples_influxdb:
+ engine: http
+ address: "http://{{ ceilometer_collector.influxdb_host }}:{{ ceilometer_collector.influxdb_port }}/write?db={{ ceilometer_collector.influxdb_database }}&precision={{ ceilometer_collector.influxdb_time_precision }}"
+ {%- if ceilometer_collector.influxdb_username and ceilometer_collector.influxdb_password %}
+ username: "{{ ceilometer_collector.influxdb_username }}"
+ password: "{{ ceilometer_collector.influxdb_password }}"
+ {%- endif %}
+ message_matcher: "Fields[payload_type] == 'txt' && Fields[payload_name] == 'sample_data'"
+ encoder: influxdb_encoder
+ timeout: {{ ceilometer_collector.influxdb_timeout }}
+ method: "POST"
+{%- endif %}
+{%- if ceilometer_collector.elasticsearch_host is defined %}
+ elasticsearch_resource:
+ engine: elasticsearch
+ server: "http://{{ ceilometer_collector.elasticsearch_host }}:{{ ceilometer_collector.elasticsearch_port }}"
+ message_matcher: "Type == 'ceilometer_resources'"
+ encoder: elasticsearch_resource_encoder
+{%- endif %}
+{#- Closes the guard opened before "output:"; without it the template ends with an unterminated if block. #}
+{%- endif %}
diff --git a/metadata/service/ceilometer_collector/single.yml b/metadata/service/ceilometer_collector/single.yml
new file mode 100644
index 0000000..bfc6e75
--- /dev/null
+++ b/metadata/service/ceilometer_collector/single.yml
@@ -0,0 +1,8 @@
+applications:
+- heka
+classes:
+- service.heka.support
+parameters:
+ heka:
+ ceilometer_collector:
+ enabled: true