Merge pull request #63 from simonpasquier/add-remote-collector-in-cluster-mode

Support remote_collector and aggregator in cluster
diff --git a/heka/files/lua/common/java_patterns.lua b/heka/files/lua/common/java_patterns.lua
index edbd130..ce81818 100644
--- a/heka/files/lua/common/java_patterns.lua
+++ b/heka/files/lua/common/java_patterns.lua
@@ -35,4 +35,8 @@
 -- 2016-11-21 08:52:14,070 - DEBUG - run_command: nodetool -h 127.0.0.1 info | grep ID | awk '{print $3}'
 CassandraLogGrammar = l.Ct(patt.JavaTimestamp * patt.sp * patt.dash * patt.sp * patt.JavaSeverity * patt.sp^1 * patt.dash * patt.sp * message)
 
+-- Ifmap
+-- 2016-11-24 10:11:32,457 - TRACE - [main]  - MetadataTypeRepository: new MetadataType http://www.trustedcomputinggroup.org/2010/IFMAP-METADATA/2:discovered-by - singleValue
+IfmapLogGrammar = CassandraLogGrammar
+
 return M
diff --git a/heka/files/lua/decoders/ifmap.lua b/heka/files/lua/decoders/ifmap.lua
new file mode 100644
index 0000000..b11c6b5
--- /dev/null
+++ b/heka/files/lua/decoders/ifmap.lua
@@ -0,0 +1,46 @@
+-- Copyright 2016 Mirantis, Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+local l      = require 'lpeg'
+local utils  = require 'lma_utils'
+l.locale(l)
+
+local java  = require 'java_patterns'
+
+local msg = {  -- single message table reused by every process_message() call
+    Timestamp   = nil,  -- set by process_message() from the grammar match
+    Type        = 'log',
+    Hostname    = nil,  -- NOTE(review): never assigned in this file; presumably filled in by the utils helpers -- confirm
+    Payload     = nil,  -- set by process_message() from the grammar match
+    Pid         = nil,  -- set by process_message() from the grammar match
+    Fields      = nil,  -- replaced with a new table on every process_message() call
+    Severity    = 6,  -- initial value; process_message() recomputes it and also falls back to 6
+}
+
+function process_message ()  -- decoder entry point: parses the message Payload with java.IfmapLogGrammar and injects the result as a 'log' message
+    local log = read_message("Payload")  -- text that the grammar is matched against
+    local logger = read_message("Logger")  -- only used to build the failure message below
+    local m = java.IfmapLogGrammar:match(log)
+    if not m then  -- no match: return -1 and an error string holding at most the first 64 bytes of the payload
+        return -1, string.format("Failed to parse %s log: %s", logger, string.sub(log, 1, 64))
+    end
+    msg.Timestamp = m.Timestamp
+    msg.Payload = m.Message
+    msg.Pid = m.Pid  -- NOTE(review): the grammar definition shows no obvious Pid capture, so this is presumably nil -- confirm
+    msg.Severity = utils.label_to_severity_map[m.SeverityLabel or 'INFO'] or 6  -- a missing label is looked up as 'INFO'; an unmapped label falls back to 6
+    msg.Fields = {}  -- new table on each call, so fields set for the previous message are not carried over
+    msg.Fields.severity_label = utils.severity_to_label_map[msg.Severity]
+    msg.Fields.programname = 'ifmap'
+    utils.inject_tags(msg)
+    return utils.safe_inject_message(msg)  -- the helper's return value is passed straight back to the caller
+end
diff --git a/heka/meta/heka.yml b/heka/meta/heka.yml
index 7ff0aad..55c7224 100644
--- a/heka/meta/heka.yml
+++ b/heka/meta/heka.yml
@@ -377,11 +377,17 @@
       module_file: /usr/share/lma_collector/filters/influxdb_accumulator.lua
       module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
       preserve_data: false
-      message_matcher: "Type == 'heka.sandbox.gse_metric'"
+      message_matcher: "Type == 'heka.sandbox.gse_metric' || Type == 'heka.sandbox.metric'"
       ticker_interval: 1
       config:
         tag_fields: "deployment_id environment_label tenant_id user_id"
         time_precision: "{{ aggregator.influxdb_time_precision }}"
+    influxdb_annotation:
+      engine: sandbox
+      module_file: /usr/share/lma_collector/filters/influxdb_annotation.lua
+      module_dir: /usr/share/lma_collector/common;/usr/share/heka/lua_modules
+      preserve_data: false
+      message_matcher: "Type == 'heka.sandbox.gse_metric'"
 {%- endif %}
   encoder:
 {%- if aggregator.influxdb_host is defined %}