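Adapt GSE Lua code

Simplify the GSE cluster metric injection and the cluster filter config:

* gse.inject_cluster_metric() now takes only (cluster_name, to_alerting).
  The message Type is always 'gse_metric' and the metric name is always
  'cluster_status'; the cluster is reported in the 'member' field (also the
  only tag field) instead of 'cluster_name'. The 'interval' and 'source'
  fields are no longer emitted.
* gse_cluster_filter.lua no longer reads the output_message_type,
  member_field, output_metric_name, source and policies_file settings.
  Policies are loaded from the 'gse_policies' module, the member name is
  read from the 'member' field, 'interval' defaults to 10 when unset, and
  missing cluster hints default to an empty table.
* tests/lua/test_gse.lua is updated for the new signature and field names.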
diff --git a/heka/files/lua/common/gse.lua b/heka/files/lua/common/gse.lua
index 16bb950..2e86695 100644
--- a/heka/files/lua/common/gse.lua
+++ b/heka/files/lua/common/gse.lua
@@ -125,7 +125,7 @@
-- compute the cluster metric and inject it into the Heka pipeline
-- the metric's value is computed using the status of its members
-function inject_cluster_metric(msg_type, cluster_name, metric_name, interval, source, to_alerting)
+function inject_cluster_metric(cluster_name, to_alerting)
local payload
local status, alarms = resolve_status(cluster_name)
@@ -145,18 +145,17 @@
end
local msg = {
- Type = msg_type,
+ Type = 'gse_metric',
Payload = payload,
Fields = {
- name=metric_name,
- value=status,
- cluster_name=cluster_name,
- tag_fields={'cluster_name'},
- interval=interval,
- source=source,
- no_alerting=no_alerting,
+ name = 'cluster_status',
+ value = status,
+ member = cluster_name,
+ tag_fields = {'member'},
+ no_alerting = no_alerting,
}
}
+
lma.inject_tags(msg)
lma.safe_inject_message(msg)
end
diff --git a/heka/files/lua/filters/gse_cluster_filter.lua b/heka/files/lua/filters/gse_cluster_filter.lua
index 6c8415f..c099046 100644
--- a/heka/files/lua/filters/gse_cluster_filter.lua
+++ b/heka/files/lua/filters/gse_cluster_filter.lua
@@ -17,15 +17,11 @@
local afd = require 'afd'
local gse = require 'gse'
local lma = require 'lma_utils'
+local policies = require('gse_policies')
-local output_message_type = read_config('output_message_type') or error('output_message_type must be specified!')
-local cluster_field = read_config('cluster_field')
-local member_field = read_config('member_field') or error('member_field must be specified!')
-local output_metric_name = read_config('output_metric_name') or error('output_metric_name must be specified!')
-local source = read_config('source') or error('source must be specified!')
+local cluster_field = read_config('cluster_field') -- FIXME(elemoine) remove??
local topology_file = read_config('topology_file') or error('topology_file must be specified!')
-local policies_file = read_config('policies_file') or error('policies_file must be specified!')
-local interval = (read_config('interval') or error('interval must be specified!')) + 0
+local interval = (read_config('interval') or 10) + 0
local interval_in_ns = interval * 1e9
local max_inject = (read_config('max_inject') or 10) + 0
local warm_up_period = ((read_config('warm_up_period') or 0) + 0) * 1e9
@@ -37,14 +33,14 @@
local last_index = nil
local topology = require(topology_file)
-local policies = require(policies_file)
for cluster_name, attributes in pairs(topology.clusters) do
local policy = policies.find(attributes.policy)
if not policy then
error('Cannot find ' .. attributes.policy .. ' policy!')
end
- gse.add_cluster(cluster_name, attributes.members, attributes.hints, attributes.group_by, policy)
+ gse.add_cluster(cluster_name, attributes.members, attributes.hints or {},
+ attributes.group_by, policy)
end
function process_message()
@@ -63,7 +59,7 @@
return 0
end
- local member_id = afd.get_entity_name(member_field)
+ local member_id = afd.get_entity_name('member')
if not member_id then
return -1, "Cannot find entity's name in the AFD/GSE message"
end
@@ -79,7 +75,7 @@
end
local cluster_ids
- if cluster_field then
+ if cluster_field then -- FIXME(elemoine) Remove??
local cluster_id = afd.get_entity_name(cluster_field)
if not cluster_id then
return -1, "Cannot find the cluster's name in the AFD/GSE message"
@@ -118,14 +114,7 @@
local injected = 0
for i, cluster_name in ipairs(gse.get_ordered_clusters()) do
if last_index == nil or i > last_index then
- gse.inject_cluster_metric(
- output_message_type,
- cluster_name,
- output_metric_name,
- interval,
- source,
- activate_alerting
- )
+ gse.inject_cluster_metric(cluster_name, activate_alerting)
last_index = i
injected = injected + 1
diff --git a/tests/lua/test_gse.lua b/tests/lua/test_gse.lua
index 98dfb89..899e15e 100644
--- a/tests/lua/test_gse.lua
+++ b/tests/lua/test_gse.lua
@@ -184,54 +184,33 @@
end
function TestGse:test_inject_cluster_metric_for_nova()
- gse.inject_cluster_metric(
- 'gse_service_cluster_metric',
- 'nova',
- 'service_cluster_status',
- 10,
- 'gse_service_cluster_plugin'
- )
+ gse.inject_cluster_metric('nova', true)
local metric = last_injected_msg
- assertEquals(metric.Type, 'gse_service_cluster_metric')
- assertEquals(metric.Fields.cluster_name, 'nova')
- assertEquals(metric.Fields.name, 'service_cluster_status')
+ assertEquals(metric.Type, 'gse_metric')
+ assertEquals(metric.Fields.member, 'nova')
+ assertEquals(metric.Fields.name, 'cluster_status')
assertEquals(metric.Fields.value, consts.OKAY)
- assertEquals(metric.Fields.interval, 10)
assertEquals(metric.Payload, '{"alarms":[]}')
end
function TestGse:test_inject_cluster_metric_for_glance()
- gse.inject_cluster_metric(
- 'gse_service_cluster_metric',
- 'glance',
- 'service_cluster_status',
- 10,
- 'gse_service_cluster_plugin'
- )
+ gse.inject_cluster_metric('glance', true)
local metric = last_injected_msg
- assertEquals(metric.Type, 'gse_service_cluster_metric')
- assertEquals(metric.Fields.cluster_name, 'glance')
- assertEquals(metric.Fields.name, 'service_cluster_status')
+ assertEquals(metric.Type, 'gse_metric')
+ assertEquals(metric.Fields.member, 'glance')
+ assertEquals(metric.Fields.name, 'cluster_status')
assertEquals(metric.Fields.value, consts.DOWN)
- assertEquals(metric.Fields.interval, 10)
assert(metric.Payload:match("glance%-registry endpoints are down"))
assert(metric.Payload:match("glance%-api endpoint is down on node%-1"))
end
function TestGse:test_inject_cluster_metric_for_heat()
- gse.inject_cluster_metric(
- 'gse_service_cluster_metric',
- 'heat',
- 'service_cluster_status',
- 10,
- 'gse_service_cluster_plugin'
- )
+ gse.inject_cluster_metric('heat', true)
local metric = last_injected_msg
- assertEquals(metric.Type, 'gse_service_cluster_metric')
- assertEquals(metric.Fields.cluster_name, 'heat')
- assertEquals(metric.Fields.name, 'service_cluster_status')
+ assertEquals(metric.Type, 'gse_metric')
+ assertEquals(metric.Fields.member, 'heat')
+ assertEquals(metric.Fields.name, 'cluster_status')
assertEquals(metric.Fields.value, consts.WARN)
- assertEquals(metric.Fields.interval, 10)
assert(metric.Payload:match("5xx errors detected"))
assert(metric.Payload:match("1 RabbitMQ node out of 3 is down"))
end