Add batching for overlimited input messages

Change-Id: If622c6fea8a85c137b2ebc5aa8a640431d5ac576
Closes-bug: PROD-13203
diff --git a/heka/files/lua/common/ceilometer.lua b/heka/files/lua/common/ceilometer.lua
index 47f3974..7e23946 100644
--- a/heka/files/lua/common/ceilometer.lua
+++ b/heka/files/lua/common/ceilometer.lua
@@ -11,12 +11,12 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-
-local table = require 'table'
+local cjson = cjson
+local ipairs = ipairs
+local pcall = pcall
 
 local samples = require 'samples'
 local resources = require 'resources'
-local patt = require 'patterns'
 local utils = require 'lma_utils'
 
 local l = require 'lpeg'
@@ -27,6 +27,7 @@
     read_config("metadata_fields") or ""
 )
 local decode_resources = read_config('decode_resources') or false
+local flush_count = read_config('flush_count') or 500
 
 
 local samples_decoder = samples.new(metadata_fields)
@@ -49,12 +50,47 @@
     end
 end
 
-function decode(data)
-    local code, msg = inject(samples_decoder:decode(data))
-    if resource_decoder then
-        code, msg = inject(resource_decoder:decode(data))
+function inject_batch(batch)
+    local code, msg = inject(samples_decoder:decode(batch))
+    if code == 0 and resource_decoder then
+        code, msg = inject(resource_decoder:decode(batch))
     end
     return code, msg
 end
 
+function decode(data)
+    local ok, message = pcall(cjson.decode, data)
+    if not ok then
+        return -1, "Cannot decode Payload"
+    end
+    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not ok then
+        return -1, "Cannot decode Payload[oslo.message]"
+    end
+
+    local code = 0
+    local msg = ''
+
+    local batch = {}
+    batch['payload'] = {}
+    batch['timestamp'] = message_body.timestamp
+    if message_body['payload'] then
+        for _, sample in ipairs(message_body["payload"]) do
+            batch['payload'][#batch['payload']+1] = sample
+            if #batch['payload'] >= flush_count then
+                code, msg = inject_batch(batch)
+                batch['payload'] = {}
+                if code == -1 then
+                    return code, msg
+                end
+            end
+        end
+        if #batch['payload'] > 0 then
+            code, msg = inject_batch(batch)
+        end
+        return code, msg
+    end
+    return -1, "Empty message"
+end
+
 return CeilometerDecoder
diff --git a/heka/files/lua/common/resources.lua b/heka/files/lua/common/resources.lua
index fcb8c94..0a32ac2 100644
--- a/heka/files/lua/common/resources.lua
+++ b/heka/files/lua/common/resources.lua
@@ -14,8 +14,6 @@
 
 local cjson = cjson
 local string = string
-local table = table
-local math = math
 local setmetatable = setmetatable
 local ipairs = ipairs
 local pairs = pairs
@@ -110,15 +108,7 @@
 
 -- data: oslo.messaging message with Ceilometer samples
 -- returns ok and resource or error message
-function ResourcesDecoder:decode (data)
-    local ok, message = pcall(cjson.decode, data)
-    if not ok then
-        return -1, "Cannot decode Payload"
-    end
-    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
-    if not ok then
-        return -1, "Cannot decode Payload[oslo.message]"
-    end
+function ResourcesDecoder:decode (message_body)
     local resource_payload = {}
     if message_body['payload'] then
         for _, sample in ipairs(message_body["payload"]) do
diff --git a/heka/files/lua/common/samples.lua b/heka/files/lua/common/samples.lua
index af9c6c9..95f704a 100644
--- a/heka/files/lua/common/samples.lua
+++ b/heka/files/lua/common/samples.lua
@@ -15,7 +15,6 @@
 local cjson = cjson
 local string = string
 local table = table
-local math = math
 local setmetatable = setmetatable
 local ipairs = ipairs
 local pairs = pairs
@@ -170,15 +169,7 @@
 
 -- data: oslo.messaging message with Ceilometer samples
 -- returns ok and sample or error message
-function SamplesDecoder:decode (data)
-    local ok, message = pcall(cjson.decode, data)
-    if not ok then
-        return -1, "Cannot decode Payload"
-    end
-    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
-    if not ok then
-        return -1, "Cannot decode Payload[oslo.message]"
-    end
+function SamplesDecoder:decode(message_body)
     local sample_payload = {}
     if message_body['payload'] then
         for _, sample in ipairs(message_body["payload"]) do
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 9f04a8b..e2e9bf3 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -678,6 +678,7 @@
       config:
         decoder: 'ceilometer'
         decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
+        flush_count = 500
         metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
 {%- endif %}
 {%- if ceilometer_collector.amqp_host is defined %}