Merge "Add counter for out of memory errors"
diff --git a/heka/_service.sls b/heka/_service.sls
index c421697..1a86b22 100644
--- a/heka/_service.sls
+++ b/heka/_service.sls
@@ -3,19 +3,17 @@
 heka_{{ service_name }}_conf_dir:
   file.directory:
   - name: {{ server.prefix_dir }}/etc/{{ service_name }}
-  - user: heka
+  - user: {{ server.owner }}
   - mode: 750
   - makedirs: true
 
-{%- if not server.container_mode %}
 heka_{{ service_name }}_cache_dir:
   file.directory:
   - name: /var/cache/{{ service_name }}
-  - user: heka
-  - group: heka
+  - user: {{ server.owner }}
+  - group: {{ server.owner }}
   - mode: 750
   - makedirs: true
-{% endif %}
 
 heka_{{ service_name }}_conf_dir_clean:
   file.directory:
@@ -54,7 +52,7 @@
 heka_{{ service_name }}_log_file:
   file.managed:
   - name: /var/log/{{ service_name }}.log
-  - user: heka
+  - user: {{ server.owner }}
   - mode: 644
   - replace: false
 
@@ -216,7 +214,7 @@
   - source: salt://heka/files/toml/global.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - defaults:
     service_name: {{ service_name }}
     poolsize: {{ server.poolsize }}
@@ -235,7 +233,7 @@
   - source: salt://heka/files/toml/decoder/{{ decoder.engine }}.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -253,7 +251,7 @@
   - source: salt://heka/files/toml/input/{{ input.engine }}.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -273,7 +271,7 @@
   - source: salt://heka/files/toml/filter/afd_alarm.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -291,7 +289,7 @@
   - source: salt://heka/files/lma_alarm.lua
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: {{ server.prefix_dir }}/usr/share/lma_collector
   - defaults:
@@ -310,7 +308,7 @@
   - source: salt://heka/files/gse_policies.lua
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: {{ server.prefix_dir }}/usr/share/lma_collector
   - defaults:
@@ -325,7 +323,7 @@
   - source: salt://heka/files/toml/filter/gse_alarm_cluster.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -339,7 +337,7 @@
   - source: salt://heka/files/gse_topology.lua
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: {{ server.prefix_dir }}/usr/share/lma_collector
   - defaults:
@@ -356,7 +354,7 @@
   - source: salt://heka/files/toml/filter/{{ filter.engine }}.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -374,7 +372,7 @@
   - source: salt://heka/files/toml/splitter/{{ splitter.engine }}.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -392,7 +390,7 @@
   - source: salt://heka/files/toml/encoder/{{ encoder.engine }}.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
@@ -410,7 +408,7 @@
   - source: salt://heka/files/toml/output/{{ output.engine }}.toml
   - template: jinja
   - mode: 640
-  - group: heka
+  - group: {{ server.owner }}
   - require:
     - file: heka_{{ service_name }}_conf_dir
   - require_in:
diff --git a/heka/files/lua/common/ceilometer.lua b/heka/files/lua/common/ceilometer.lua
index 47f3974..7e23946 100644
--- a/heka/files/lua/common/ceilometer.lua
+++ b/heka/files/lua/common/ceilometer.lua
@@ -11,12 +11,12 @@
 -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
-
-local table = require 'table'
+local cjson = cjson
+local ipairs = ipairs
+local pcall = pcall
 
 local samples = require 'samples'
 local resources = require 'resources'
-local patt = require 'patterns'
 local utils = require 'lma_utils'
 
 local l = require 'lpeg'
@@ -27,6 +27,7 @@
     read_config("metadata_fields") or ""
 )
 local decode_resources = read_config('decode_resources') or false
+local flush_count = read_config('flush_count') or 500
 
 
 local samples_decoder = samples.new(metadata_fields)
@@ -49,12 +50,47 @@
     end
 end
 
-function decode(data)
-    local code, msg = inject(samples_decoder:decode(data))
-    if resource_decoder then
-        code, msg = inject(resource_decoder:decode(data))
+function inject_batch(batch)
+    local code, msg = inject(samples_decoder:decode(batch))
+    if code == 0 and resource_decoder then
+        code, msg = inject(resource_decoder:decode(batch))
     end
     return code, msg
 end
 
+function decode(data)
+    local ok, message = pcall(cjson.decode, data)
+    if not ok then
+        return -1, "Cannot decode Payload"
+    end
+    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
+    if not ok then
+        return -1, "Cannot decode Payload[oslo.message]"
+    end
+
+    local code = 0
+    local msg = ''
+
+    local batch = {}
+    batch['payload'] = {}
+    batch['timestamp'] = message_body.timestamp
+    if message_body['payload'] then
+        for _, sample in ipairs(message_body["payload"]) do
+            batch['payload'][#batch['payload']+1] = sample
+            if #batch['payload'] >= flush_count then
+                code, msg = inject_batch(batch)
+                batch['payload'] = {}
+                if code == -1 then
+                    return code, msg
+                end
+            end
+        end
+        if #batch['payload'] > 0 then
+            code, msg = inject_batch(batch)
+        end
+        return code, msg
+    end
+    return -1, "Empty message"
+end
+
 return CeilometerDecoder
diff --git a/heka/files/lua/common/resources.lua b/heka/files/lua/common/resources.lua
index fcb8c94..0a32ac2 100644
--- a/heka/files/lua/common/resources.lua
+++ b/heka/files/lua/common/resources.lua
@@ -14,8 +14,6 @@
 
 local cjson = cjson
 local string = string
-local table = table
-local math = math
 local setmetatable = setmetatable
 local ipairs = ipairs
 local pairs = pairs
@@ -110,15 +108,7 @@
 
 -- data: oslo.messaging message with Ceilometer samples
 -- returns ok and resource or error message
-function ResourcesDecoder:decode (data)
-    local ok, message = pcall(cjson.decode, data)
-    if not ok then
-        return -1, "Cannot decode Payload"
-    end
-    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
-    if not ok then
-        return -1, "Cannot decode Payload[oslo.message]"
-    end
+function ResourcesDecoder:decode (message_body)
     local resource_payload = {}
     if message_body['payload'] then
         for _, sample in ipairs(message_body["payload"]) do
diff --git a/heka/files/lua/common/samples.lua b/heka/files/lua/common/samples.lua
index af9c6c9..95f704a 100644
--- a/heka/files/lua/common/samples.lua
+++ b/heka/files/lua/common/samples.lua
@@ -15,7 +15,6 @@
 local cjson = cjson
 local string = string
 local table = table
-local math = math
 local setmetatable = setmetatable
 local ipairs = ipairs
 local pairs = pairs
@@ -170,15 +169,7 @@
 
 -- data: oslo.messaging message with Ceilometer samples
 -- returns ok and sample or error message
-function SamplesDecoder:decode (data)
-    local ok, message = pcall(cjson.decode, data)
-    if not ok then
-        return -1, "Cannot decode Payload"
-    end
-    local ok, message_body = pcall(cjson.decode, message["oslo.message"])
-    if not ok then
-        return -1, "Cannot decode Payload[oslo.message]"
-    end
+function SamplesDecoder:decode(message_body)
     local sample_payload = {}
     if message_body['payload'] then
         for _, sample in ipairs(message_body["payload"]) do
diff --git a/heka/map.jinja b/heka/map.jinja
index 07e39a2..e362fce 100644
--- a/heka/map.jinja
+++ b/heka/map.jinja
@@ -50,6 +50,7 @@
 {% set log_collector = salt['grains.filter_by']({
   'default': {
     'container_mode': False,
+    'owner': 'heka',
     'alarms_enabled': True,
     'emit_rates': True,
     'prefix_dir': default_prefix_dir,
@@ -69,6 +70,7 @@
 {% set metric_collector = salt['grains.filter_by']({
   'default': {
     'container_mode': False,
+    'owner': 'heka',
     'alarms_enabled': True,
     'prefix_dir': default_prefix_dir,
     'influxdb_port': default_influxdb_port,
@@ -90,6 +92,7 @@
 {% set remote_collector = salt['grains.filter_by']({
   'default': {
     'container_mode': False,
+    'owner': 'heka',
     'alarms_enabled': True,
     'emit_rates': True,
     'prefix_dir': default_prefix_dir,
@@ -114,6 +117,7 @@
 {% set aggregator = salt['grains.filter_by']({
   'default': {
     'container_mode': False,
+    'owner': 'heka',
     'alarms_enabled': True,
     'prefix_dir': default_prefix_dir,
     'influxdb_port': default_influxdb_port,
@@ -135,6 +139,7 @@
 {% set ceilometer_collector = salt['grains.filter_by']({
   'default': {
     'container_mode': False,
+    'owner': 'heka',
     'alarms_enabled': True,
     'prefix_dir': default_prefix_dir,
     'influxdb_port': default_influxdb_port,
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 74e6cdf..da2037f 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -690,6 +690,7 @@
       config:
         decoder: 'ceilometer'
         decode_resources: {{ ceilometer_collector.resource_decoding|lower }}
+        flush_count = 500
         metadata_fields: "status deleted container_format min_ram updated_at min_disk is_public size checksum created_at disk_format protected instance_host host display_name instance_id instance_type status state user_metadata.stack"
 {%- endif %}
 {%- if ceilometer_collector.amqp_host is defined %}
diff --git a/metadata/service/remote_collector/container.yml b/metadata/service/remote_collector/container.yml
index eefddda..52ac6b4 100644
--- a/metadata/service/remote_collector/container.yml
+++ b/metadata/service/remote_collector/container.yml
@@ -7,3 +7,4 @@
     remote_collector:
       enabled: true
       container_mode: true
+      owner: root