Add batching for overlimited input messages
Change-Id: If622c6fea8a85c137b2ebc5aa8a640431d5ac576
Closes-bug: PROD-13203
diff --git a/heka/files/lua/common/ceilometer.lua b/heka/files/lua/common/ceilometer.lua
index 47f3974..7e23946 100644
--- a/heka/files/lua/common/ceilometer.lua
+++ b/heka/files/lua/common/ceilometer.lua
@@ -11,12 +11,12 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-
-local table = require 'table'
+local cjson = cjson
+local ipairs = ipairs
+local pcall = pcall
local samples = require 'samples'
local resources = require 'resources'
-local patt = require 'patterns'
local utils = require 'lma_utils'
local l = require 'lpeg'
@@ -27,6 +27,7 @@
read_config("metadata_fields") or ""
)
local decode_resources = read_config('decode_resources') or false
+local flush_count = read_config('flush_count') or 500
local samples_decoder = samples.new(metadata_fields)
@@ -49,12 +50,47 @@
end
end
-function decode(data)
- local code, msg = inject(samples_decoder:decode(data))
- if resource_decoder then
- code, msg = inject(resource_decoder:decode(data))
+function inject_batch(batch)
+ local code, msg = inject(samples_decoder:decode(batch))
+ if code == 0 and resource_decoder then
+ code, msg = inject(resource_decoder:decode(batch))
end
return code, msg
end
+function decode(data)
+ local ok, message = pcall(cjson.decode, data)
+ if not ok then
+ return -1, "Cannot decode Payload"
+ end
+ local ok, message_body = pcall(cjson.decode, message["oslo.message"])
+ if not ok then
+ return -1, "Cannot decode Payload[oslo.message]"
+ end
+
+ local code = 0
+ local msg = ''
+
+ local batch = {}
+ batch['payload'] = {}
+ batch['timestamp'] = message_body.timestamp
+ if message_body['payload'] then
+ for _, sample in ipairs(message_body["payload"]) do
+ batch['payload'][#batch['payload']+1] = sample
+ if #batch['payload'] >= flush_count then
+ code, msg = inject_batch(batch)
+ batch['payload'] = {}
+ if code == -1 then
+ return code, msg
+ end
+ end
+ end
+ if #batch['payload'] > 0 then
+ code, msg = inject_batch(batch)
+ end
+ return code, msg
+ end
+ return -1, "Empty message"
+end
+
return CeilometerDecoder
diff --git a/heka/files/lua/common/resources.lua b/heka/files/lua/common/resources.lua
index fcb8c94..0a32ac2 100644
--- a/heka/files/lua/common/resources.lua
+++ b/heka/files/lua/common/resources.lua
@@ -14,8 +14,6 @@
local cjson = cjson
local string = string
-local table = table
-local math = math
local setmetatable = setmetatable
local ipairs = ipairs
local pairs = pairs
@@ -110,15 +108,7 @@
-- data: oslo.messaging message with Ceilometer samples
-- returns ok and resource or error message
-function ResourcesDecoder:decode (data)
- local ok, message = pcall(cjson.decode, data)
- if not ok then
- return -1, "Cannot decode Payload"
- end
- local ok, message_body = pcall(cjson.decode, message["oslo.message"])
- if not ok then
- return -1, "Cannot decode Payload[oslo.message]"
- end
+function ResourcesDecoder:decode (message_body)
local resource_payload = {}
if message_body['payload'] then
for _, sample in ipairs(message_body["payload"]) do
diff --git a/heka/files/lua/common/samples.lua b/heka/files/lua/common/samples.lua
index af9c6c9..95f704a 100644
--- a/heka/files/lua/common/samples.lua
+++ b/heka/files/lua/common/samples.lua
@@ -15,7 +15,6 @@
local cjson = cjson
local string = string
local table = table
-local math = math
local setmetatable = setmetatable
local ipairs = ipairs
local pairs = pairs
@@ -170,15 +169,7 @@
-- data: oslo.messaging message with Ceilometer samples
-- returns ok and sample or error message
-function SamplesDecoder:decode (data)
- local ok, message = pcall(cjson.decode, data)
- if not ok then
- return -1, "Cannot decode Payload"
- end
- local ok, message_body = pcall(cjson.decode, message["oslo.message"])
- if not ok then
- return -1, "Cannot decode Payload[oslo.message]"
- end
+function SamplesDecoder:decode(message_body)
local sample_payload = {}
if message_body['payload'] then
for _, sample in ipairs(message_body["payload"]) do
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 9f04a8b..e2e9bf3 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -678,6 +678,7 @@
config:
decoder: 'ceilometer'
decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
+ flush_count = 500
metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
{%- endif %}
{%- if ceilometer_collector.amqp_host is defined %}