Merge "Add generic Apache decoder for OpenStack"
diff --git a/heka/files/lua/common/accumulator.lua b/heka/files/lua/common/accumulator.lua
index 9b01a8f..ac86a0e 100644
--- a/heka/files/lua/common/accumulator.lua
+++ b/heka/files/lua/common/accumulator.lua
@@ -47,7 +47,7 @@
-- ns: the current timestamp in nanosecond (optional)
function Accumulator:flush(ns)
local now = ns or time() * 1e9
- if #self.buffer > self.flush_count or now - self.last_flush > self.flush_interval then
+ if #self.buffer > self.flush_count or now - self.last_flush > self.flush_interval * 1e9 then
self.flush_cb(self.buffer)
self.buffer = {}
self.last_flush = now
diff --git a/heka/files/lua/common/resources.lua b/heka/files/lua/common/resources.lua
index dc18329..fcb8c94 100644
--- a/heka/files/lua/common/resources.lua
+++ b/heka/files/lua/common/resources.lua
@@ -124,7 +124,11 @@
for _, sample in ipairs(message_body["payload"]) do
add_resource_to_payload(sample, resource_payload)
end
- resource_msg.Payload = cjson.encode(resource_payload)
+ local ok, payload = pcall(cjson.encode, resource_payload)
+ if not ok then
+ return -1, "Cannot encode resource_payload"
+ end
+ resource_msg.Payload = payload
resource_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
return 0, resource_msg
end
diff --git a/heka/files/lua/common/samples.lua b/heka/files/lua/common/samples.lua
index ad93873..af9c6c9 100644
--- a/heka/files/lua/common/samples.lua
+++ b/heka/files/lua/common/samples.lua
@@ -184,7 +184,11 @@
for _, sample in ipairs(message_body["payload"]) do
self:add_sample_to_payload(sample, sample_payload)
end
- sample_msg.Payload = cjson.encode(sample_payload)
+ local ok, payload = pcall(cjson.encode, sample_payload)
+ if not ok then
+ return -1, "Cannot encode sample_payload"
+ end
+ sample_msg.Payload = payload
sample_msg.Timestamp = patt.Timestamp:match(message_body.timestamp)
return 0, sample_msg
end
diff --git a/heka/files/toml/filter/sandbox.toml b/heka/files/toml/filter/sandbox.toml
index 9608e52..17de658 100644
--- a/heka/files/toml/filter/sandbox.toml
+++ b/heka/files/toml/filter/sandbox.toml
@@ -16,6 +16,9 @@
{%- if filter.hostname is defined %}
hostname = "{{ filter.hostname }}"
{%- endif %}
+{%- if filter.output_limit is defined %}
+output_limit = {{ filter.output_limit|int }}
+{%- endif %}
{%- if filter.config is defined %}
[{{ filter_name }}_filter.config]
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 157d8f8..9f04a8b 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -708,10 +708,12 @@
preserve_data: false
message_matcher: "Type =~ /ceilometer_samples$/"
ticker_interval: 1
+ # The default output_limit value (64k) is too low to deal with Ceilometer samples
+ output_limit: 524288
config:
time_precision: "{{ ceilometer_collector.influxdb_time_precision }}"
payload_name: 'sample_data'
- flush_count: 500
+ flush_count: 200
bulk_metric_type_matcher: 'ceilometer_samples'
{%- endif %}
{%- if ceilometer_collector.sensu_host is defined %}
diff --git a/heka/meta/salt.yml b/heka/meta/salt.yml
index f7c7af4..cfa8f38 100644
--- a/heka/meta/salt.yml
+++ b/heka/meta/salt.yml
@@ -59,9 +59,11 @@
{%- set service_grains = salt['grains.filter_by']({'default': service_grains}, merge=heka_yaml) %}
{%- endif %}
{%- endfor %}
+ {%- if 'heka_alarming' in salt.keys() %}
heka:
heka:
{{ salt['heka_alarming.grains_for_mine'](service_grains)|yaml(False)|indent(6) }}
+ {%- endif %}
{#-
vim: syntax=jinja
diff --git a/tests/lua/test_accumulator.lua b/tests/lua/test_accumulator.lua
index 837ffd5..2e2bef6 100644
--- a/tests/lua/test_accumulator.lua
+++ b/tests/lua/test_accumulator.lua
@@ -57,8 +57,10 @@
assertEquals(#items, 0)
sentinel = true
end
- local accum = accumulator.new(20, 1, test_cb)
+ local accum = accumulator.new(20, 4, test_cb)
accum:flush((now + 2) * 1e9)
+ assertEquals(sentinel, false)
+ accum:flush((now + 5) * 1e9)
assertEquals(sentinel, true)
end