Merge "Add collectd Zookeeper plugin"
diff --git a/collectd/files/plugin/collectd_zookeeper.py b/collectd/files/plugin/collectd_zookeeper.py
new file mode 100644
index 0000000..ec713a0
--- /dev/null
+++ b/collectd/files/plugin/collectd_zookeeper.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+"""Collectd plugin that reports ZooKeeper server health and 'mntr' stats.
+"""
+
+if __name__ == '__main__':
+ import collectd_fake as collectd
+else:
+ import collectd
+import collectd_base as base
+import socket
+
+# Value assigned to the `plugin` attribute of the plugin instance.
+NAME = 'zookeeper'
+# Default sampling interval, passed to collectd.register_read(); the collectd
+# Python API documents that argument as a number of seconds.
+INTERVAL = 50
+
+# Connection defaults used when the configuration gives no valid host/port.
+ZK_HOST = "localhost"
+ZK_PORT = 2181
+# Stats emitted with the type 'counter'; every other stat is a 'gauge'.
+COUNTERS = set(["zk_packets_received", "zk_packets_sent"])
+# Four-letter-word commands sent to the server, and the reply expected
+# from 'ruok' when the server is healthy.
+RUOK_CMD = "ruok"
+IMOK_RESP = "imok"
+MNTR_CMD = "mntr"
+
+class ZookeeperServer(object):
+
+ def __init__(self, logger, host='localhost', port='2181', timeout=1):
+ self._logger = logger
+ self._address = (host, int(port))
+ self._timeout = timeout
+
+ def get_stats(self):
+ """Get ZooKeeper server stats as a map."""
+ stats = {}
+ # methods for each four-letter cmd
+ stats.update(self._get_health_stat())
+ stats.update(self._get_mntr_stats())
+ return stats
+
+ def _create_socket(self):
+ return socket.socket()
+
+ def _send_cmd(self, cmd):
+ """Send a 4letter word command to the server."""
+ response = ""
+ s = self._create_socket()
+ try:
+ s.settimeout(self._timeout)
+ s.connect(self._address)
+ s.send(cmd)
+ response = s.recv(2048)
+ s.close()
+ except socket.timeout:
+ self._logger.error(('Service not healthy: '
+ 'timed out calling "%s"') % cmd)
+ except socket.error, e:
+ self._logger.error(('Service not healthy: '
+ 'error calling "%s": %s') % (cmd, e))
+ return response
+
+ def _get_health_stat(self):
+ """Send the 'ruok' 4letter word command and parse the output."""
+ response = self._send_cmd(RUOK_CMD)
+ return {
+ 'zk_service_health':
+ base.Base.OK if response == IMOK_RESP else base.Base.FAIL
+ }
+
+ def _get_mntr_stats(self):
+ """Send 'mntr' 4letter word command and parse the output."""
+ response = self._send_cmd(MNTR_CMD)
+ result = {}
+ for line in response.splitlines():
+ try:
+ key, value = self._parse_line(line)
+ if key == 'zk_server_state':
+ result['zk_is_leader'] = int(value != 'follower')
+ elif key == 'zk_version':
+ continue
+ else:
+ result[key] = value
+ except ValueError:
+ # Ignore broken lines.
+ pass
+ return result
+
+ def _parse_line(self, line):
+ try:
+ key, value = map(str.strip, line.split('\t'))
+ except ValueError:
+ raise ValueError('Found invalid line: %s' % line)
+ if not key:
+ raise ValueError('The key is mandatory and should not be empty')
+ try:
+ value = int(value)
+ except (TypeError, ValueError):
+ pass
+ return key, value
+
+
+class ZookeeperServerPlugin(base.Base):
+
+ def __init__(self, *args, **kwargs):
+ super(ZookeeperServerPlugin, self).__init__(*args, **kwargs)
+ self.plugin = NAME
+ self._config = {}
+
+ def itermetrics(self):
+ """Get stats for local Zookeeper server."""
+ host = self._config.get('host', False)
+ port = self._config.get('port', False)
+ if host and port:
+ zk = ZookeeperServer(self.logger, host, port)
+ stats = zk.get_stats()
+ for k, v in stats.items():
+ try:
+ yield {
+ 'type': 'counter' if k in COUNTERS else 'gauge',
+ 'type_instance': k.replace('zk_', ''),
+ 'values': v,
+ }
+ except (TypeError, ValueError):
+ self.logger.error(('error dispatching stat; host=%s, '
+ 'key=%s, val=%s') % (host, k, v))
+ pass
+ else:
+ self.logger.error('Missing host or port')
+
+ def config_callback(self, config):
+ """Received configuration information"""
+ super(ZookeeperServerPlugin, self).config_callback(config)
+ zk_host = ZK_HOST
+ zk_port = ZK_PORT
+ for node in config.children:
+ if node.key == 'Hosts':
+ if len(node.values[0]) > 0:
+ zk_host = node.values[0].strip()
+ else:
+ self.logger.error(('ERROR: Invalid Hosts string. '
+ 'Using default of %s') % zk_host)
+ elif node.key == 'Port':
+ if isinstance(node.values[0], float) and node.values[0] > 0:
+ try:
+ zk_port = int(node.values[0])
+ except:
+ self.logger.error(('ERROR: Converting Port number. '
+ 'Using default of %s') % zk_port)
+ else:
+ self.logger.error(('ERROR: Invalid Port number. '
+ 'Using default of %s') % zk_port)
+ else:
+ collectd.warning('zookeeper plugin: Unknown config key: %s.'
+ % node.key)
+ continue
+ self._config = {
+ 'host': zk_host,
+ 'port': zk_port,
+ }
+ self.logger.info('Configured with %s.' % self._config)
+
+
+# Module-level plugin instance used by the callback functions below; it is
+# built with whichever collectd module the import at the top selected.
+plugin = ZookeeperServerPlugin(collectd)
+
+
+def config_callback(conf):
+ """Pass the configuration object on to the module-level plugin instance."""
+ plugin.config_callback(conf)
+
+
+def read_callback():
+ """Delegate to the read_callback() of the module-level plugin instance."""
+ plugin.read_callback()
+
+# Run as a script: `collectd` is the collectd_fake module, which loads the
+# configuration into the plugin before a single read is triggered.
+# Imported as a module: register both callbacks with collectd instead.
+if __name__ == '__main__':
+ collectd.load_configuration(plugin)
+ plugin.read_callback()
+else:
+ collectd.register_config(config_callback)
+ # INTERVAL is handed to collectd as the interval for read_callback.
+ collectd.register_read(read_callback, INTERVAL)