very large refactoring
diff --git a/wally/sensors/__init__.py b/wally/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/__init__.py
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
new file mode 100644
index 0000000..f66bb36
--- /dev/null
+++ b/wally/sensors/api.py
@@ -0,0 +1,58 @@
+import Queue
+import threading
+from contextlib import contextmanager
+
+from .deploy_sensors import (deploy_and_start_sensors,
+ stop_and_remove_sensors)
+from .protocol import create_protocol, Timeout
+
+
+__all__ = ['Empty', 'recv_main', 'start_monitoring',
+ 'deploy_and_start_sensors', 'SensorConfig']
+
+
+Empty = Queue.Empty
+
+
+class SensorConfig(object):
+ def __init__(self, conn, url, sensors):
+ self.conn = conn
+ self.url = url
+ self.sensors = sensors
+
+
+def recv_main(proto, data_q, cmd_q):
+ while True:
+ try:
+ data_q.put(proto.recv(0.1))
+ except Timeout:
+ pass
+
+ try:
+ val = cmd_q.get(False)
+
+ if val is None:
+ return
+
+ except Queue.Empty:
+ pass
+
+
+@contextmanager
+def start_monitoring(uri, configs):
+ deploy_and_start_sensors(uri, configs)
+ try:
+ data_q = Queue.Queue()
+ cmd_q = Queue.Queue()
+ proto = create_protocol(uri, receiver=True)
+ th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+ th.daemon = True
+ th.start()
+
+ try:
+ yield data_q
+ finally:
+ cmd_q.put(None)
+ th.join()
+ finally:
+ stop_and_remove_sensors(configs)
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
new file mode 100644
index 0000000..4e96afe
--- /dev/null
+++ b/wally/sensors/cp_protocol.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+""" Protocol class """
+
+import re
+import zlib
+import json
+import logging
+import binascii
+
+
+logger = logging.getLogger("wally")
+
+
+# protocol contains 2 type of packet:
+# 1 - header, which contains template schema of counters
+# 2 - body, which contains only values in order as in template
+# it uses msgpack (or provided packer) for optimization
+#
+# packet has format:
+# begin_data_prefixSIZE\n\nDATAend_data_postfix
+# packet part has format:
+# SIZE\n\rDATA
+#
+# DATA use archivation
+
+
+class PacketException(Exception):
+ """ Exceptions from Packet"""
+ pass
+
+
+class Packet(object):
+ """ Class proceed packet by protocol"""
+
+ prefix = "begin_data_prefix"
+ postfix = "end_data_postfix"
+ header_prefix = "template"
+ # other fields
+ # is_begin
+ # is_end
+ # crc
+ # data
+ # data_len
+
+ def __init__(self, packer):
+ # preinit
+ self.is_begin = False
+ self.is_end = False
+ self.crc = None
+ self.data = ""
+ self.data_len = None
+ self.srv_template = None
+ self.clt_template = None
+ self.tmpl_size = 0
+ self.packer = packer
+
+ def new_packet(self, part):
+ """ New packet adding """
+ # proceed packet
+ try:
+ # get size
+ local_size_s, _, part = part.partition("\n\r")
+ local_size = int(local_size_s)
+
+ # find prefix
+ begin = part.find(self.prefix)
+ if begin != -1:
+ # divide data if something before begin and prefix
+ from_i = begin + len(self.prefix)
+ part = part[from_i:]
+ # reset flags
+ self.is_begin = True
+ self.is_end = False
+ self.data = ""
+ # get size
+ data_len_s, _, part = part.partition("\n\r")
+ self.data_len = int(data_len_s)
+ # get crc
+ crc_s, _, part = part.partition("\n\r")
+ self.crc = int(crc_s)
+
+ # bad size?
+ if local_size != self.data_len:
+ raise PacketException("Part size error")
+
+ # find postfix
+ end = part.find(self.postfix)
+ if end != -1:
+ # divide postfix
+ part = part[:end]
+ self.is_end = True
+
+ self.data += part
+ # check if it is end
+ if self.is_end:
+ self.data = zlib.decompress(self.data)
+ if self.data_len != len(self.data):
+ raise PacketException("Total size error")
+ if binascii.crc32(self.data) != self.crc:
+ raise PacketException("CRC error")
+
+ # check, if it is template
+ if self.data.startswith(self.header_prefix):
+ self.srv_template = self.data
+ # template is for internal use
+ return None
+
+ # decode values list
+ vals = self.packer.unpack(self.data)
+ dump = self.srv_template % tuple(vals)
+ return dump
+ else:
+ return None
+
+ except PacketException as e:
+ # if something wrong - skip packet
+ logger.warning("Packet skipped: %s", e)
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ except TypeError:
+ # if something wrong - skip packet
+ logger.warning("Packet skipped: doesn't match schema")
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ except:
+ # if something at all wrong - skip packet
+ logger.warning("Packet skipped: something is wrong")
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ @staticmethod
+ def create_packet(data, part_size):
+ """ Create packet divided by parts with part_size from data
+ No compression here """
+ # prepare data
+ data_len = "%i\n\r" % len(data)
+ header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
+ compact_data = zlib.compress(data)
+ packet = "%s%s%s" % (header, compact_data, Packet.postfix)
+
+ partheader_len = len(data_len)
+
+ beg = 0
+ end = part_size - partheader_len
+
+ result = []
+ while beg < len(packet):
+ block = packet[beg:beg+end]
+ result.append(data_len + block)
+ beg += end
+
+ return result
+
+ def create_packet_v2(self, data, part_size):
+ """ Create packet divided by parts with part_size from data
+ Compressed """
+ result = []
+ # create and add to result template header
+ if self.srv_template is None:
+ perf_string = json.dumps(data)
+ self.create_answer_template(perf_string)
+ template = self.header_prefix + self.srv_template
+ header = Packet.create_packet(template, part_size)
+ result.extend(header)
+
+ vals = self.get_matching_value_list(data)
+ body = self.packer.pack(vals)
+ parts = Packet.create_packet(body, part_size)
+ result.extend(parts)
+ return result
+
+ def get_matching_value_list(self, data):
+ """ Get values in order server expect"""
+ vals = range(0, self.tmpl_size)
+
+ try:
+ for node, groups in self.clt_template.items():
+ for group, counters in groups.items():
+ for counter, index in counters.items():
+ if not isinstance(index, dict):
+ vals[index] = data[node][group][counter]
+ else:
+ for k, i in index.items():
+ vals[i] = data[node][group][counter][k]
+
+ return vals
+
+ except (IndexError, KeyError):
+ logger = logging.getLogger(__name__)
+ logger.error("Data don't match last schema")
+ raise PacketException("Data don't match last schema")
+
+ def create_answer_template(self, perf_string):
+ """ Create template for server to insert counter values
+ Return tuple of server and clien templates + number of replaces"""
+ replacer = re.compile(": [0-9]+\.?[0-9]*")
+ # replace all values by %s
+ finditer = replacer.finditer(perf_string)
+ # server not need know positions
+ self.srv_template = ""
+ # client need positions
+ clt_template = ""
+ beg = 0
+ k = 0
+ # this could be done better?
+ for match in finditer:
+ # define input place in server template
+ self.srv_template += perf_string[beg:match.start()]
+ self.srv_template += ": %s"
+ # define match number in client template
+ clt_template += perf_string[beg:match.start()]
+ clt_template += ": %i" % k
+
+ beg = match.end()
+ k += 1
+
+ # add tail
+ self.srv_template += perf_string[beg:]
+ clt_template += perf_string[beg:]
+
+ self.tmpl_size = k
+ self.clt_template = json.loads(clt_template)
diff --git a/wally/sensors/cp_transport.py b/wally/sensors/cp_transport.py
new file mode 100644
index 0000000..2e00e80
--- /dev/null
+++ b/wally/sensors/cp_transport.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+""" UDP sender class """
+
+import socket
+import urlparse
+
+from cp_protocol import Packet
+
+try:
+ from disk_perf_test_tool.logger import define_logger
+ logger = define_logger(__name__)
+except ImportError:
+ class Logger(object):
+ def debug(self, *dt):
+ pass
+ logger = Logger()
+
+
+class SenderException(Exception):
+ """ Exceptions in Sender class """
+ pass
+
+
+class Timeout(Exception):
+ """ Exceptions in Sender class """
+ pass
+
+
+class Sender(object):
+ """ UDP sender class """
+
+ def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
+ """ Create connection object from input udp string or params"""
+
+ # test input
+ if url is None and port is None:
+ raise SenderException("Bad initialization")
+ if url is not None:
+ data = urlparse.urlparse(url)
+ # check schema
+ if data.scheme != "udp":
+ mes = "Bad protocol type: %s instead of UDP" % data.scheme
+ logger.error(mes)
+ raise SenderException("Bad protocol type")
+ # try to get port
+ try:
+ int_port = int(data.port)
+ except ValueError:
+ logger.error("Bad UDP port")
+ raise SenderException("Bad UDP port")
+ # save paths
+ self.sendto = (data.hostname, int_port)
+ self.bindto = (data.hostname, int_port)
+ # try to get size
+ try:
+ self.size = int(data.path.strip("/"))
+ except ValueError:
+ logger.error("Bad packet part size")
+ raise SenderException("Bad packet part size")
+ else:
+ # url is None - use size and port
+ self.sendto = (host, port)
+ self.bindto = ("0.0.0.0", port)
+ self.size = size
+
+ self.packer = packer
+
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.binded = False
+ self.all_data = {}
+ self.send_packer = None
+
+ def bind(self):
+ """ Prepare for listening """
+ self.sock.bind(self.bindto)
+ self.sock.settimeout(0.5)
+ self.binded = True
+
+ def send(self, data):
+ """ Send data to udp socket"""
+ if self.sock.sendto(data, self.sendto) != len(data):
+ mes = "Cannot send data to %s:%s" % self.sendto
+ logger.error(mes)
+ raise SenderException("Cannot send data")
+
+ def send_by_protocol(self, data):
+ """ Send data by Packet protocol
+ data = dict"""
+ if self.send_packer is None:
+ self.send_packer = Packet(self.packer())
+ parts = self.send_packer.create_packet_v2(data, self.size)
+ for part in parts:
+ self.send(part)
+
+ def recv(self):
+ """ Receive data from udp socket"""
+ # check for binding
+ if not self.binded:
+ self.bind()
+ # try to recv
+ try:
+ data, (remote_ip, _) = self.sock.recvfrom(self.size)
+ return data, remote_ip
+ except socket.timeout:
+ raise Timeout()
+
+ def recv_by_protocol(self):
+ """ Receive data from udp socket by Packet protocol"""
+ data, remote_ip = self.recv()
+
+ if remote_ip not in self.all_data:
+ self.all_data[remote_ip] = Packet(self.packer())
+
+ return self.all_data[remote_ip].new_packet(data)
+
+ def recv_with_answer(self, stop_event=None):
+ """ Receive data from udp socket and send 'ok' back
+ Command port = local port + 1
+ Answer port = local port
+ Waiting for command is blocking """
+ # create command socket
+ command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ command_port = self.bindto[1]+1
+ command_sock.bind(("0.0.0.0", command_port))
+ command_sock.settimeout(1)
+ # try to recv
+ while True:
+ try:
+ data, (remote_ip, _) = command_sock.recvfrom(self.size)
+ self.send("ok")
+ return data, remote_ip
+ except socket.timeout:
+ if stop_event is not None and stop_event.is_set():
+ # return None if we are interrupted
+ return None
+
+ def verified_send(self, send_host, message, max_repeat=20):
+ """ Send and verify it by answer not more then max_repeat
+ Send port = local port + 1
+ Answer port = local port
+ Return True if send is verified """
+ # create send socket
+ send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ send_port = self.sendto[1]+1
+ for repeat in range(0, max_repeat):
+ send_sock.sendto(message, (send_host, send_port))
+ try:
+ data, remote_ip = self.recv()
+ if remote_ip == send_host and data == "ok":
+ return True
+ else:
+ logger.warning("No answer from %s, try %i",
+ send_host, repeat)
+ except Timeout:
+ logger.warning("No answer from %s, try %i", send_host, repeat)
+
+ return False
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
new file mode 100644
index 0000000..a4fa157
--- /dev/null
+++ b/wally/sensors/daemonize.py
@@ -0,0 +1,226 @@
+# #!/usr/bin/python
+
+import fcntl
+import os
+import pwd
+import grp
+import sys
+import signal
+import resource
+import logging
+import atexit
+from logging import handlers
+
+
+class Daemonize(object):
+ """ Daemonize object
+ Object constructor expects three arguments:
+ - app: contains the application name which will be sent to syslog.
+ - pid: path to the pidfile.
+ - action: your custom function which will be executed after daemonization.
+ - keep_fds: optional list of fds which should not be closed.
+ - auto_close_fds: optional parameter to not close opened fds.
+ - privileged_action: action that will be executed before
+ drop privileges if user or
+ group parameter is provided.
+ If you want to transfer anything from privileged
+ action to action, such as opened privileged file
+ descriptor, you should return it from
+ privileged_action function and catch it inside action
+ function.
+ - user: drop privileges to this user if provided.
+ - group: drop privileges to this group if provided.
+ - verbose: send debug messages to logger if provided.
+ - logger: use this logger object instead of creating new one, if provided.
+ """
+ def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
+ privileged_action=None, user=None, group=None, verbose=False,
+ logger=None):
+ self.app = app
+ self.pid = pid
+ self.action = action
+ self.keep_fds = keep_fds or []
+ self.privileged_action = privileged_action or (lambda: ())
+ self.user = user
+ self.group = group
+ self.logger = logger
+ self.verbose = verbose
+ self.auto_close_fds = auto_close_fds
+
+ def sigterm(self, signum, frame):
+ """ sigterm method
+ These actions will be done after SIGTERM.
+ """
+ self.logger.warn("Caught signal %s. Stopping daemon." % signum)
+ os.remove(self.pid)
+ sys.exit(0)
+
+ def exit(self):
+ """ exit method
+ Cleanup pid file at exit.
+ """
+ self.logger.warn("Stopping daemon.")
+ os.remove(self.pid)
+ sys.exit(0)
+
+ def start(self):
+ """ start method
+ Main daemonization process.
+ """
+ # If pidfile already exists, we should read pid from there;
+ # to overwrite it, if locking
+ # will fail, because locking attempt somehow purges the file contents.
+ if os.path.isfile(self.pid):
+ with open(self.pid, "r") as old_pidfile:
+ old_pid = old_pidfile.read()
+ # Create a lockfile so that only one instance of this daemon is
+ # running at any time.
+ try:
+ lockfile = open(self.pid, "w")
+ except IOError:
+ print("Unable to create the pidfile.")
+ sys.exit(1)
+ try:
+ # Try to get an exclusive lock on the file. This will fail if
+ # another process has the file
+ # locked.
+ fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ print("Unable to lock on the pidfile.")
+ # We need to overwrite the pidfile if we got here.
+ with open(self.pid, "w") as pidfile:
+ pidfile.write(old_pid)
+ sys.exit(1)
+
+ # Fork, creating a new process for the child.
+ process_id = os.fork()
+ if process_id < 0:
+ # Fork error. Exit badly.
+ sys.exit(1)
+ elif process_id != 0:
+ # This is the parent process. Exit.
+ sys.exit(0)
+ # This is the child process. Continue.
+
+ # Stop listening for signals that the parent process receives.
+ # This is done by getting a new process id.
+ # setpgrp() is an alternative to setsid().
+ # setsid puts the process in a new parent group and detaches
+ # its controlling terminal.
+ process_id = os.setsid()
+ if process_id == -1:
+ # Uh oh, there was a problem.
+ sys.exit(1)
+
+ # Add lockfile to self.keep_fds.
+ self.keep_fds.append(lockfile.fileno())
+
+ # Close all file descriptors, except the ones mentioned in
+ # self.keep_fds.
+ devnull = "/dev/null"
+ if hasattr(os, "devnull"):
+ # Python has set os.devnull on this system, use it instead as it
+ # might be different
+ # than /dev/null.
+ devnull = os.devnull
+
+ if self.auto_close_fds:
+ for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
+ if fd not in self.keep_fds:
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+
+ devnull_fd = os.open(devnull, os.O_RDWR)
+ os.dup2(devnull_fd, 0)
+ os.dup2(devnull_fd, 1)
+ os.dup2(devnull_fd, 2)
+
+ if self.logger is None:
+ # Initialize logging.
+ self.logger = logging.getLogger(self.app)
+ self.logger.setLevel(logging.DEBUG)
+ # Display log messages only on defined handlers.
+ self.logger.propagate = False
+
+ # Initialize syslog.
+ # It will correctly work on OS X, Linux and FreeBSD.
+ if sys.platform == "darwin":
+ syslog_address = "/var/run/syslog"
+ else:
+ syslog_address = "/dev/log"
+
+ # We will continue with syslog initialization only if
+ # actually have such capabilities
+ # on the machine we are running this.
+ if os.path.isfile(syslog_address):
+ syslog = handlers.SysLogHandler(syslog_address)
+ if self.verbose:
+ syslog.setLevel(logging.DEBUG)
+ else:
+ syslog.setLevel(logging.INFO)
+ # Try to mimic to normal syslog messages.
+ format_t = "%(asctime)s %(name)s: %(message)s"
+ formatter = logging.Formatter(format_t,
+ "%b %e %H:%M:%S")
+ syslog.setFormatter(formatter)
+
+ self.logger.addHandler(syslog)
+
+ # Set umask to default to safe file permissions when running
+ # as a root daemon. 027 is an
+ # octal number which we are typing as 0o27 for Python3 compatibility.
+ os.umask(0o27)
+
+ # Change to a known directory. If this isn't done, starting a daemon
+ # in a subdirectory that
+ # needs to be deleted results in "directory busy" errors.
+ os.chdir("/")
+
+ # Execute privileged action
+ privileged_action_result = self.privileged_action()
+ if not privileged_action_result:
+ privileged_action_result = []
+
+ # Change gid
+ if self.group:
+ try:
+ gid = grp.getgrnam(self.group).gr_gid
+ except KeyError:
+ self.logger.error("Group {0} not found".format(self.group))
+ sys.exit(1)
+ try:
+ os.setgid(gid)
+ except OSError:
+ self.logger.error("Unable to change gid.")
+ sys.exit(1)
+
+ # Change uid
+ if self.user:
+ try:
+ uid = pwd.getpwnam(self.user).pw_uid
+ except KeyError:
+ self.logger.error("User {0} not found.".format(self.user))
+ sys.exit(1)
+ try:
+ os.setuid(uid)
+ except OSError:
+ self.logger.error("Unable to change uid.")
+ sys.exit(1)
+
+ try:
+ lockfile.write("%s" % (os.getpid()))
+ lockfile.flush()
+ except IOError:
+ self.logger.error("Unable to write pid to the pidfile.")
+ print("Unable to write pid to the pidfile.")
+ sys.exit(1)
+
+ # Set custom action on SIGTERM.
+ signal.signal(signal.SIGTERM, self.sigterm)
+ atexit.register(self.exit)
+
+ self.logger.warn("Starting daemon.")
+
+ self.action(*privileged_action_result)
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
new file mode 100644
index 0000000..249adfb
--- /dev/null
+++ b/wally/sensors/deploy_sensors.py
@@ -0,0 +1,87 @@
+import time
+import json
+import os.path
+import logging
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from wally.ssh_utils import copy_paths, run_over_ssh
+
+logger = logging.getLogger('wally')
+
+
+def wait_all_ok(futures):
+ return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, sensor_configs,
+ remote_path='/tmp/sensors/sensors'):
+
+ paths = {os.path.dirname(__file__): remote_path}
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for node_sensor_config in sensor_configs:
+ futures.append(executor.submit(deploy_and_start_sensor,
+ paths,
+ node_sensor_config,
+ monitor_uri,
+ remote_path))
+
+ if not wait_all_ok(futures):
+ raise RuntimeError("Sensor deployment fails on some nodes")
+
+
+def deploy_and_start_sensor(paths, node_sensor_config,
+ monitor_uri, remote_path):
+ try:
+ copy_paths(node_sensor_config.conn, paths)
+ sftp = node_sensor_config.conn.open_sftp()
+
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ with sftp.open(config_remote_path, "w") as fd:
+ fd.write(json.dumps(node_sensor_config.sensors))
+
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
+
+ cmd = cmd_templ.format(os.path.dirname(remote_path),
+ monitor_uri,
+ config_remote_path)
+
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+ sftp.close()
+
+ except:
+ msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+ logger.exception(msg)
+ return False
+ return True
+
+
+def stop_and_remove_sensor(conn, url, remote_path):
+ cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+
+ run_over_ssh(conn, cmd.format(remote_path), node=url)
+
+ # some magic
+ time.sleep(0.3)
+
+ conn.exec_command("rm -rf {0}".format(remote_path))
+
+ logger.debug("Sensors stopped and removed")
+
+
+def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for node_sensor_config in configs:
+ futures.append(executor.submit(stop_and_remove_sensor,
+ node_sensor_config.conn,
+ node_sensor_config.url,
+ remote_path))
+
+ wait(futures)
diff --git a/wally/sensors/discover.py b/wally/sensors/discover.py
new file mode 100644
index 0000000..f227043
--- /dev/null
+++ b/wally/sensors/discover.py
@@ -0,0 +1,9 @@
+all_sensors = {}
+
+
+def provides(sensor_class_name):
+ def closure(func):
+ assert sensor_class_name not in all_sensors
+ all_sensors[sensor_class_name] = func
+ return func
+ return closure
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
new file mode 100644
index 0000000..3753e7c
--- /dev/null
+++ b/wally/sensors/main.py
@@ -0,0 +1,118 @@
+import os
+import sys
+import time
+import json
+import glob
+import signal
+import os.path
+import argparse
+
+from .sensors.utils import SensorInfo
+from .daemonize import Daemonize
+from .discover import all_sensors
+from .protocol import create_protocol
+
+
+# load all sensors
+from . import sensors
+sensors_dir = os.path.dirname(sensors.__file__)
+for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
+ mod_name = os.path.basename(fname[:-3])
+ __import__("sensors.sensors." + mod_name)
+
+
+def get_values(required_sensors):
+ result = {}
+ for sensor_name, params in required_sensors:
+ if sensor_name in all_sensors:
+ result.update(all_sensors[sensor_name](**params))
+ else:
+ msg = "Sensor {0!r} isn't available".format(sensor_name)
+ raise ValueError(msg)
+ return time.time(), result
+
+
+def parse_args(args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-d', '--daemon',
+ choices=('start', 'stop', 'status'),
+ default=None)
+
+ parser.add_argument('-u', '--url', default='stdout://')
+ parser.add_argument('-t', '--timeout', type=float, default=1)
+ parser.add_argument('-l', '--list-sensors', action='store_true')
+ parser.add_argument('sensors_config', type=argparse.FileType('r'),
+ default=None, nargs='?')
+ return parser.parse_args(args[1:])
+
+
+def daemon_main(required_sensors, opts):
+ sender = create_protocol(opts.url)
+ prev = {}
+
+ while True:
+ gtime, data = get_values(required_sensors.items())
+ curr = {'time': SensorInfo(gtime, True)}
+ for name, val in data.items():
+ if val.is_accumulated:
+ if name in prev:
+ curr[name] = SensorInfo(val.value - prev[name], True)
+ prev[name] = val.value
+ else:
+ curr[name] = SensorInfo(val.value, False)
+ sender.send(curr)
+ time.sleep(opts.timeout)
+
+
+def pid_running(pid):
+ return os.path.exists("/proc/" + str(pid))
+
+
+def main(argv):
+ opts = parse_args(argv)
+
+ if opts.list_sensors:
+ print "\n".join(sorted(all_sensors))
+ return 0
+
+ if opts.daemon is not None:
+ pid_file = "/tmp/sensors.pid"
+ if opts.daemon == 'start':
+ required_sensors = json.loads(opts.sensors_config.read())
+
+ def root_func():
+ daemon_main(required_sensors, opts)
+
+ daemon = Daemonize(app="perfcollect_app",
+ pid=pid_file,
+ action=root_func)
+ daemon.start()
+ elif opts.daemon == 'stop':
+ if os.path.isfile(pid_file):
+ pid = int(open(pid_file).read())
+ if pid_running(pid):
+ os.kill(pid, signal.SIGTERM)
+
+ time.sleep(0.1)
+
+ if pid_running(pid):
+ os.kill(pid, signal.SIGKILL)
+
+ if os.path.isfile(pid_file):
+ os.unlink(pid_file)
+ elif opts.daemon == 'status':
+ if os.path.isfile(pid_file):
+ pid = int(open(pid_file).read())
+ if pid_running(pid):
+ print "running"
+ return
+ print "stopped"
+ else:
+ raise ValueError("Unknown daemon operation {}".format(opts.daemon))
+ else:
+ required_sensors = json.loads(opts.sensors_config.read())
+ daemon_main(required_sensors, opts)
+ return 0
+
+if __name__ == "__main__":
+ exit(main(sys.argv))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
new file mode 100644
index 0000000..c2ace01
--- /dev/null
+++ b/wally/sensors/protocol.py
@@ -0,0 +1,192 @@
+import sys
+import time
+import socket
+import select
+import cPickle as pickle
+from urlparse import urlparse
+
+from . import cp_transport
+
+
+class Timeout(Exception):
+ pass
+
+
+# ------------------------------------- Serializers --------------------------
+
+
+class ISensortResultsSerializer(object):
+ def pack(self, data):
+ pass
+
+ def unpack(self, data):
+ pass
+
+
+class PickleSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ ndata = {key: val.value for key, val in data.items()}
+ return pickle.dumps(ndata)
+
+ def unpack(self, data):
+ return pickle.loads(data)
+
+try:
+ # try to use full-function lib
+ import msgpack
+
+ class mgspackSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ return msgpack.packb(data)
+
+ def unpack(self, data):
+ return msgpack.unpackb(data)
+
+ MSGPackSerializer = mgspackSerializer
+except ImportError:
+ # use local lib, if failed import
+ import umsgpack
+
+ class umsgspackSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ return umsgpack.packb(data)
+
+ def unpack(self, data):
+ return umsgpack.unpackb(data)
+
+ MSGPackSerializer = umsgspackSerializer
+
+# ------------------------------------- Transports ---------------------------
+
+
+class ITransport(object):
+ def __init__(self, receiver):
+ pass
+
+ def send(self, data):
+ pass
+
+ def recv(self, timeout=None):
+ pass
+
+
+class StdoutTransport(ITransport):
+ MIN_COL_WIDTH = 10
+
+ def __init__(self, receiver, delta=True):
+ if receiver:
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
+
+ self.headers = None
+ self.line_format = ""
+ self.prev = {}
+ self.delta = delta
+ self.fd = sys.stdout
+
+ def send(self, data):
+ if self.headers is None:
+ self.headers = sorted(data)
+
+ for pos, header in enumerate(self.headers):
+ self.line_format += "{%s:>%s}" % (pos,
+ max(len(header) + 1,
+ self.MIN_COL_WIDTH))
+
+ print self.line_format.format(*self.headers)
+
+ if self.delta:
+ vals = [data[header].value - self.prev.get(header, 0)
+ for header in self.headers]
+
+ self.prev.update({header: data[header].value
+ for header in self.headers})
+ else:
+ vals = [data[header].value for header in self.headers]
+
+ self.fd.write(self.line_format.format(*vals) + "\n")
+
+ def recv(self, timeout=None):
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+ def __init__(self, receiver, fname, delta=True):
+ StdoutTransport.__init__(self, receiver, delta)
+ self.fd = open(fname, "w")
+
+
+class UDPTransport(ITransport):
+ def __init__(self, receiver, ip, port, packer_cls):
+ self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ if receiver:
+ self.port.bind((ip, port))
+ self.packer_cls = packer_cls
+ self.packers = {}
+ else:
+ self.packer = packer_cls()
+ self.dst = (ip, port)
+
+ def send(self, data):
+ raw_data = self.packer.pack(data)
+ self.port.sendto(raw_data, self.dst)
+
+ def recv(self, timeout=None):
+ r, _, _ = select.select([self.port], [], [], timeout)
+ if len(r) != 0:
+ raw_data, addr = self.port.recvfrom(10000)
+ packer = self.packers.setdefault(addr, self.packer_cls())
+ return addr, packer.unpack(raw_data)
+ else:
+ raise Timeout()
+
+
+class HugeUDPTransport(ITransport, cp_transport.Sender):
+ def __init__(self, receiver, ip, port, packer_cls):
+ cp_transport.Sender.__init__(self, port=port, host=ip)
+ if receiver:
+ self.bind()
+
+ def send(self, data):
+ self.send_by_protocol(data)
+
+ def recv(self, timeout=None):
+ begin = time.time()
+
+ while True:
+
+ try:
+ # return not None, if packet is ready
+ ready = self.recv_by_protocol()
+ # if data ready - return it
+ if ready is not None:
+ return ready
+ # if data not ready - check if it's time to die
+ if time.time() - begin >= timeout:
+ break
+
+ except cp_transport.Timeout:
+ # no answer yet - check, if timeout end
+ if time.time() - begin >= timeout:
+ break
+# -------------------------- Factory function --------------------------------
+
+
+def create_protocol(uri, receiver=False):
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == 'stdout':
+ return StdoutTransport(receiver)
+ elif parsed_uri.scheme == 'udp':
+ ip, port = parsed_uri.netloc.split(":")
+ return UDPTransport(receiver, ip=ip, port=int(port),
+ packer_cls=PickleSerializer)
+ elif parsed_uri.scheme == 'file':
+ return FileTransport(receiver, parsed_uri.path)
+ elif parsed_uri.scheme == 'hugeudp':
+ ip, port = parsed_uri.netloc.split(":")
+ return HugeUDPTransport(receiver, ip=ip, port=int(port),
+ packer_cls=MSGPackSerializer)
+ else:
+ templ = "Can't instantiate transport from {0!r}"
+ raise ValueError(templ.format(uri))
diff --git a/wally/sensors/receiver.py b/wally/sensors/receiver.py
new file mode 100644
index 0000000..ff0f223
--- /dev/null
+++ b/wally/sensors/receiver.py
@@ -0,0 +1,19 @@
+from .api import start_monitoring, Empty
+# from influx_exporter import connect, add_data
+
+uri = "udp://192.168.0.104:12001"
+# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+# conn = connect(infldb_url)
+
+monitor_config = {'127.0.0.1':
+ {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
+ "net-io": {"allowed_prefixes": ["virbr2"]}}}
+
+with start_monitoring(uri, monitor_config) as queue:
+ while True:
+ try:
+ (ip, port), data = queue.get(True, 1)
+ print (ip, port), data
+ # add_data(conn, ip, [data])
+ except Empty:
+ pass
diff --git a/wally/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/sensors/__init__.py
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
new file mode 100644
index 0000000..c9ff340
--- /dev/null
+++ b/wally/sensors/sensors/io_sensors.py
@@ -0,0 +1,72 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 1 - major number
+# 2 - minor mumber
+# 3 - device name
+# 4 - reads completed successfully
+# 5 - reads merged
+# 6 - sectors read
+# 7 - time spent reading (ms)
+# 8 - writes completed
+# 9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+ (3, 'reads_completed', True),
+ (5, 'sectors_read', True),
+ (6, 'rtime', True),
+ (7, 'writes_completed', True),
+ (9, 'sectors_written', True),
+ (10, 'wtime', True),
+ (11, 'io_queue', False),
+ (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+ results = {}
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
+
+
+def get_latency(stat1, stat2):
+ disks = set(i.split('.', 1)[0] for i in stat1)
+ results = {}
+
+ for disk in disks:
+ rdc = disk + '.reads_completed'
+ wrc = disk + '.writes_completed'
+ rdt = disk + '.rtime'
+ wrt = disk + '.wtime'
+ lat = 0.0
+
+ io_ops1 = stat1[rdc].value + stat1[wrc].value
+ io_ops2 = stat2[rdc].value + stat2[wrc].value
+
+ diops = io_ops2 - io_ops1
+
+ if diops != 0:
+ io1 = stat1[rdt].value + stat1[wrt].value
+ io2 = stat2[rdt].value + stat2[wrt].value
+ lat = abs(float(io1 - io2)) / diops
+
+ results[disk + '.latence'] = SensorInfo(lat, False)
+
+ return results
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
new file mode 100644
index 0000000..4a4e477
--- /dev/null
+++ b/wally/sensors/sensors/net_sensors.py
@@ -0,0 +1,43 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 1 - major number
+# 2 - minor mumber
+# 3 - device name
+# 4 - reads completed successfully
+# 5 - reads merged
+# 6 - sectors read
+# 7 - time spent reading (ms)
+# 8 - writes completed
+# 9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+net_values_pos = [
+ (0, 'recv_bytes', True),
+ (1, 'recv_packets', True),
+ (8, 'send_bytes', True),
+ (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+ results = {}
+
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ vals = stats.split()
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+ if dev_ok:
+ for pos, name, accum_val in net_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
diff --git a/wally/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
new file mode 100644
index 0000000..cffdb71
--- /dev/null
+++ b/wally/sensors/sensors/pscpu_sensors.py
@@ -0,0 +1,37 @@
+import os
+
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ pid_stat1 = pid_stat(pid)
+
+ sensor_name = "{0}.{1}".format(dev_name, pid)
+ results[sensor_name] = SensorInfo(pid_stat1, True)
+ except IOError:
+ # may be, proc has already terminated, skip it
+ continue
+ return results
+
+
+def pid_stat(pid):
+ """ Return total cpu usage time from process"""
+ # read /proc/pid/stat
+ with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
+ proctimes = pidfile.readline().split()
+ # get utime from /proc/<pid>/stat, 14 item
+ utime = proctimes[13]
+ # get stime from proc/<pid>/stat, 15 item
+ stime = proctimes[14]
+ # count total process used time
+ proctotal = int(utime) + int(stime)
+ return float(proctotal)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
new file mode 100644
index 0000000..cbd85e6
--- /dev/null
+++ b/wally/sensors/sensors/psram_sensors.py
@@ -0,0 +1,76 @@
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author: P@draigBrady.com
+# Source: http://www.pixelbeat.org/scripts/ps_mem.py
+# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+ """ Return memory data of pid in format (private, shared) """
+
+ fname = '/proc/{0}/{1}'.format(pid, "smaps")
+ lines = open(fname).readlines()
+
+ shared = 0
+ private = 0
+ pss = 0
+
+ # add 0.5KiB as this avg error due to trunctation
+ pss_adjust = 0.5
+
+ for line in lines:
+ if line.startswith("Shared"):
+ shared += int(line.split()[1])
+
+ if line.startswith("Private"):
+ private += int(line.split()[1])
+
+ if line.startswith("Pss"):
+ pss += float(line.split()[1]) + pss_adjust
+
+ # Note Shared + Private = Rss above
+ # The Rss in smaps includes video card mem etc.
+
+ if pss != 0:
+ shared = int(pss - private)
+
+ return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+ print pid_list
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ private, shared = get_mem_stats(pid)
+ total = private + shared
+ sys_total = get_ram_size()
+ usage = float(total) / float(sys_total)
+
+ sensor_name = "{0}.{1}".format(dev_name, pid)
+
+ results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+ results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+ results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+ name = sensor_name + ".mem_usage_percent"
+ results[name] = SensorInfo(usage * 100, False)
+ except IOError:
+ # permission denied or proc die
+ continue
+ return results
+
+
+def get_ram_size():
+ """ Return RAM size in Kb"""
+ with open("/proc/meminfo") as proc:
+ mem_total = proc.readline().split()
+ return mem_total[1]
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
new file mode 100644
index 0000000..d3da02b
--- /dev/null
+++ b/wally/sensors/sensors/syscpu_sensors.py
@@ -0,0 +1,40 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 0 - cpu name
+# 1 - user: normal processes executing in user mode
+# 2 - nice: niced processes executing in user mode
+# 3 - system: processes executing in kernel mode
+# 4 - idle: twiddling thumbs
+# 5 - iowait: waiting for I/O to complete
+# 6 - irq: servicing interrupts
+# 7 - softirq: servicing softirqs
+
+io_values_pos = [
+ (1, 'user_processes', True),
+ (2, 'nice_processes', True),
+ (3, 'system_processes', True),
+ (4, 'idle_time', True),
+]
+
+
+@provides("system-cpu")
+def syscpu_stat(disallowed_prefixes=('intr', 'ctxt', 'btime', 'processes',
+ 'procs_running', 'procs_blocked', 'softirq'),
+ allowed_prefixes=None):
+ results = {}
+
+ for line in open('/proc/stat'):
+ vals = line.split()
+ dev_name = vals[0]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
+
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
new file mode 100644
index 0000000..c78eddd
--- /dev/null
+++ b/wally/sensors/sensors/sysram_sensors.py
@@ -0,0 +1,34 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+
+# return this values or setted in allowed
+ram_fields = [
+ 'MemTotal',
+ 'MemFree',
+ 'Buffers',
+ 'Cached',
+ 'SwapCached',
+ 'Dirty',
+ 'Writeback',
+ 'SwapTotal',
+ 'SwapFree'
+]
+
+
+@provides("system-ram")
+def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ if allowed_prefixes is None:
+ allowed_prefixes = ram_fields
+ results = {}
+ for line in open('/proc/meminfo'):
+ vals = line.split()
+ dev_name = vals[0]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ results[dev_name] = SensorInfo(int(vals[1]), False)
+ return results
diff --git a/wally/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
new file mode 100644
index 0000000..ad08676
--- /dev/null
+++ b/wally/sensors/sensors/utils.py
@@ -0,0 +1,83 @@
+import os
+
+from collections import namedtuple
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+ dev_ok = True
+
+ if disallowed_prefixes is not None:
+ dev_ok = all(not name.startswith(prefix)
+ for prefix in disallowed_prefixes)
+
+ if dev_ok and allowed_prefixes is not None:
+ dev_ok = any(name.startswith(prefix)
+ for prefix in allowed_prefixes)
+
+ return dev_ok
+
+
+def get_pid_list(disallowed_prefixes, allowed_prefixes):
+ """ Return pid list from list of pids and names """
+ # exceptions
+ but = disallowed_prefixes if disallowed_prefixes is not None else []
+ if allowed_prefixes is None:
+ # if nothing setted - all ps will be returned except setted
+ result = [pid
+ for pid in os.listdir('/proc')
+ if pid.isdigit() and pid not in but]
+ else:
+ result = []
+ for pid in os.listdir('/proc'):
+ if pid.isdigit() and pid not in but:
+ name = get_pid_name(pid)
+ if pid in allowed_prefixes or \
+ any(name.startswith(val) for val in allowed_prefixes):
+ print name
+ # this is allowed pid?
+ result.append(pid)
+ return result
+
+
+def get_pid_name(pid):
+ """ Return name by pid """
+ try:
+ with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
+ try:
+ cmd = pidfile.readline().split()[0]
+ return os.path.basename(cmd).rstrip('\x00')
+ except IndexError:
+ # no cmd returned
+ return "<NO NAME>"
+ except IOError:
+ # upstream wait any string, no matter if we couldn't read proc
+ return "no_such_process"
+
+
+def delta(func, only_upd=True):
+ prev = {}
+ while True:
+ for dev_name, vals in func():
+ if dev_name not in prev:
+ prev[dev_name] = {}
+ for name, (val, _) in vals.items():
+ prev[dev_name][name] = val
+ else:
+ dev_prev = prev[dev_name]
+ res = {}
+ for stat_name, (val, accum_val) in vals.items():
+ if accum_val:
+ if stat_name in dev_prev:
+ delta = int(val) - int(dev_prev[stat_name])
+ if not only_upd or 0 != delta:
+ res[stat_name] = str(delta)
+ dev_prev[stat_name] = val
+ elif not only_upd or '0' != val:
+ res[stat_name] = val
+
+ if only_upd and len(res) == 0:
+ continue
+ yield dev_name, res
+ yield None, None
diff --git a/wally/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
new file mode 100644
index 0000000..d7bf6aa
--- /dev/null
+++ b/wally/sensors/storage/__init__.py
@@ -0,0 +1,104 @@
+import struct
+
+
+def pack(val, tp=True):
+ if isinstance(val, int):
+ assert 0 <= val < 2 ** 16
+
+ if tp:
+ res = 'i'
+ else:
+ res = ""
+
+ res += struct.pack("!U", val)
+ elif isinstance(val, dict):
+ assert len(val) < 2 ** 16
+ if tp:
+ res = "d"
+ else:
+ res = ""
+
+ res += struct.pack("!U", len(val))
+ for k, v in dict.items():
+ assert 0 <= k < 2 ** 16
+ assert 0 <= v < 2 ** 32
+ res += struct.pack("!UI", k, v)
+ elif isinstance(val, str):
+ assert len(val) < 256
+ if tp:
+ res = "s"
+ else:
+ res = ""
+ res += chr(len(val)) + val
+ else:
+ raise ValueError()
+
+ return res
+
+
+def unpack(fd, tp=None):
+ if tp is None:
+ tp = fd.read(1)
+
+ if tp == 'i':
+ return struct.unpack("!U", fd.read(2))
+ elif tp == 'd':
+ res = {}
+ val_len = struct.unpack("!U", fd.read(2))
+ for _ in range(val_len):
+ k, v = struct.unpack("!UI", fd.read(6))
+ res[k] = v
+ return res
+ elif tp == 's':
+ val_len = struct.unpack("!U", fd.read(2))
+ return fd.read(val_len)
+
+ raise ValueError()
+
+
+class LocalStorage(object):
+ NEW_DATA = 0
+ NEW_SENSOR = 1
+ NEW_SOURCE = 2
+
+ def __init__(self, fd):
+ self.fd = fd
+ self.sensor_ids = {}
+ self.sources_ids = {}
+ self.max_source_id = 0
+ self.max_sensor_id = 0
+
+ def add_data(self, source, sensor_values):
+ source_id = self.sources_ids.get(source)
+ if source_id is None:
+ source_id = self.max_source_id
+ self.sources_ids[source] = source_id
+ self.emit(self.NEW_SOURCE, source_id, source)
+ self.max_source_id += 1
+
+ new_sensor_values = {}
+
+ for name, val in sensor_values.items():
+ sensor_id = self.sensor_ids.get(name)
+ if sensor_id is None:
+ sensor_id = self.max_sensor_id
+ self.sensor_ids[name] = sensor_id
+ self.emit(self.NEW_SENSOR, sensor_id, name)
+ self.max_sensor_id += 1
+ new_sensor_values[sensor_id] = val
+
+ self.emit(self.NEW_DATA, source_id, new_sensor_values)
+
+ def emit(self, tp, v1, v2):
+ self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
+
+ def readall(self):
+ tp = self.fd.read(1)
+ if ord(tp) == self.NEW_DATA:
+ pass
+ elif ord(tp) == self.NEW_SENSOR:
+ pass
+ elif ord(tp) == self.NEW_SOURCE:
+ pass
+ else:
+ raise ValueError()
diff --git a/wally/sensors/storage/grafana.py b/wally/sensors/storage/grafana.py
new file mode 100644
index 0000000..9823fac
--- /dev/null
+++ b/wally/sensors/storage/grafana.py
@@ -0,0 +1,47 @@
+import json
+
+
+query = """
+select value from "{series}"
+where $timeFilter and
+host='{host}' and device='{device}'
+order asc
+"""
+
+
+def make_dashboard_file(config):
+ series = ['writes_completed', 'sectors_written']
+ dashboards = []
+
+ for serie in series:
+ dashboard = dict(title=serie, type='graph',
+ span=12, fill=1, linewidth=2,
+ tooltip={'shared': True})
+
+ targets = []
+
+ for ip, devs in config.items():
+ for device in devs:
+ params = {
+ 'series': serie,
+ 'host': ip,
+ 'device': device
+ }
+
+ target = dict(
+ target="disk io",
+ query=query.replace("\n", " ").format(**params).strip(),
+ interval="",
+ alias="{0} io {1}".format(ip, device),
+ rawQuery=True
+ )
+ targets.append(target)
+
+ dashboard['targets'] = targets
+ dashboards.append(dashboard)
+
+ fc = open("grafana_template.js").read()
+ return fc % (json.dumps(dashboards),)
+
+
+print make_dashboard_file({'192.168.0.104': ['sda1', 'rbd1']})
diff --git a/wally/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
new file mode 100644
index 0000000..7c57924
--- /dev/null
+++ b/wally/sensors/storage/grafana_template.js
@@ -0,0 +1,46 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (int ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessable variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Intialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overriden in the url using from/to parameteres, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+ from: "now-5m",
+ to: "now"
+};
+
+dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: %s
+});
+
+
+return dashboard;
diff --git a/wally/sensors/storage/influx_exporter.py b/wally/sensors/storage/influx_exporter.py
new file mode 100644
index 0000000..34b3c0a
--- /dev/null
+++ b/wally/sensors/storage/influx_exporter.py
@@ -0,0 +1,31 @@
+from urlparse import urlparse
+from influxdb import InfluxDBClient
+
+
+def connect(url):
+ parsed_url = urlparse(url)
+ user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
+ user, passwd = user_passwd.split(":", 1)
+ host, port = host_port.split(":")
+ return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
+
+
+def add_data(conn, hostname, data):
+ per_sensor_data = {}
+ for serie in data:
+ serie = serie.copy()
+ gtime = serie.pop('time')
+ for key, val in serie.items():
+ dev, sensor = key.split('.')
+ data = per_sensor_data.setdefault(sensor, [])
+ data.append([gtime, hostname, dev, val])
+
+ infl_data = []
+ columns = ['time', 'host', 'device', 'value']
+ for sensor_name, points in per_sensor_data.items():
+ infl_data.append(
+ {'columns': columns,
+ 'name': sensor_name,
+ 'points': points})
+
+ conn.write_points(infl_data)
diff --git a/wally/sensors/storage/koder.js b/wally/sensors/storage/koder.js
new file mode 100644
index 0000000..a65a454
--- /dev/null
+++ b/wally/sensors/storage/koder.js
@@ -0,0 +1,47 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (int ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessable variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Intialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overriden in the url using from/to parameteres, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+ from: "now-5m",
+ to: "now"
+};
+
+dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: [{"span": 12, "title": "writes_completed", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}, {"span": 12, "title": "sectors_written", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}]
+});
+
+
+return dashboard;
+
diff --git a/wally/sensors/umsgpack.py b/wally/sensors/umsgpack.py
new file mode 100644
index 0000000..0cdc83e
--- /dev/null
+++ b/wally/sensors/umsgpack.py
@@ -0,0 +1,879 @@
+# u-msgpack-python v2.0 - vsergeev at gmail
+# https://github.com/vsergeev/u-msgpack-python
+#
+# u-msgpack-python is a lightweight MessagePack serializer and deserializer
+# module, compatible with both Python 2 and 3, as well CPython and PyPy
+# implementations of Python. u-msgpack-python is fully compliant with the
+# latest MessagePack specification.com/msgpack/msgpack/blob/master/spec.md). In
+# particular, it supports the new binary, UTF-8 string, and application ext
+# types.
+#
+# MIT License
+#
+# Copyright (c) 2013-2014 Ivan A. Sergeev
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+"""
+u-msgpack-python v2.0 - vsergeev at gmail
+https://github.com/vsergeev/u-msgpack-python
+
+u-msgpack-python is a lightweight MessagePack serializer and deserializer
+module, compatible with both Python 2 and 3, as well CPython and PyPy
+implementations of Python. u-msgpack-python is fully compliant with the
+latest MessagePack specification.com/msgpack/msgpack/blob/master/spec.md). In
+particular, it supports the new binary, UTF-8 string, and application ext
+types.
+
+License: MIT
+"""
+
+__version__ = "2.0"
+"Module version string"
+
+version = (2,0)
+"Module version tuple"
+
+import struct
+import collections
+import sys
+import io
+
+################################################################################
+### Ext Class
+################################################################################
+
+# Extension type for application-defined types and data
+class Ext:
+ """
+ The Ext class facilitates creating a serializable extension object to store
+ an application-defined type and data byte array.
+ """
+
+ def __init__(self, type, data):
+ """
+ Construct a new Ext object.
+
+ Args:
+ type: application-defined type integer from 0 to 127
+ data: application-defined data byte array
+
+ Raises:
+ TypeError:
+ Specified ext type is outside of 0 to 127 range.
+
+ Example:
+ >>> foo = umsgpack.Ext(0x05, b"\x01\x02\x03")
+ >>> umsgpack.packb({u"special stuff": foo, u"awesome": True})
+ '\x82\xa7awesome\xc3\xadspecial stuff\xc7\x03\x05\x01\x02\x03'
+ >>> bar = umsgpack.unpackb(_)
+ >>> print(bar["special stuff"])
+ Ext Object (Type: 0x05, Data: 01 02 03)
+ >>>
+ """
+ # Application ext type should be 0 <= type <= 127
+ if not isinstance(type, int) or not (type >= 0 and type <= 127):
+ raise TypeError("ext type out of range")
+ # Check data is type bytes
+ elif sys.version_info[0] == 3 and not isinstance(data, bytes):
+ raise TypeError("ext data is not type \'bytes\'")
+ elif sys.version_info[0] == 2 and not isinstance(data, str):
+ raise TypeError("ext data is not type \'str\'")
+ self.type = type
+ self.data = data
+
+ def __eq__(self, other):
+ """
+ Compare this Ext object with another for equality.
+ """
+ return (isinstance(other, self.__class__) and
+ self.type == other.type and
+ self.data == other.data)
+
+ def __ne__(self, other):
+ """
+ Compare this Ext object with another for inequality.
+ """
+ return not self.__eq__(other)
+
+ def __str__(self):
+ """
+ String representation of this Ext object.
+ """
+ s = "Ext Object (Type: 0x%02x, Data: " % self.type
+ for i in range(min(len(self.data), 8)):
+ if i > 0:
+ s += " "
+ if isinstance(self.data[i], int):
+ s += "%02x" % (self.data[i])
+ else:
+ s += "%02x" % ord(self.data[i])
+ if len(self.data) > 8:
+ s += " ..."
+ s += ")"
+ return s
+
+################################################################################
+### Exceptions
+################################################################################
+
+# Base Exception classes
+class PackException(Exception):
+ "Base class for exceptions encountered during packing."
+ pass
+class UnpackException(Exception):
+ "Base class for exceptions encountered during unpacking."
+ pass
+
+# Packing error
+class UnsupportedTypeException(PackException):
+ "Object type not supported for packing."
+ pass
+
+# Unpacking error
+class InsufficientDataException(UnpackException):
+ "Insufficient data to unpack the encoded object."
+ pass
+class InvalidStringException(UnpackException):
+ "Invalid UTF-8 string encountered during unpacking."
+ pass
+class ReservedCodeException(UnpackException):
+ "Reserved code encountered during unpacking."
+ pass
+class UnhashableKeyException(UnpackException):
+ """
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ """
+ pass
+class DuplicateKeyException(UnpackException):
+ "Duplicate key encountered during map unpacking."
+ pass
+
+# Backwards compatibility
+KeyNotPrimitiveException = UnhashableKeyException
+KeyDuplicateException = DuplicateKeyException
+
+################################################################################
+### Exported Functions and Globals
+################################################################################
+
+# Exported functions and variables, set up in __init()
+pack = None
+packb = None
+unpack = None
+unpackb = None
+dump = None
+dumps = None
+load = None
+loads = None
+
+compatibility = False
+"""
+Compatibility mode boolean.
+
+When compatibility mode is enabled, u-msgpack-python will serialize both
+unicode strings and bytes into the old "raw" msgpack type, and deserialize the
+"raw" msgpack type into bytes. This provides backwards compatibility with the
+old MessagePack specification.
+
+Example:
+>>> umsgpack.compatibility = True
+>>>
+>>> umsgpack.packb([u"some string", b"some bytes"])
+b'\x92\xabsome string\xaasome bytes'
+>>> umsgpack.unpackb(_)
+[b'some string', b'some bytes']
+>>>
+"""
+
+################################################################################
+### Packing
+################################################################################
+
+# You may notice struct.pack("B", obj) instead of the simpler chr(obj) in the
+# code below. This is to allow for seamless Python 2 and 3 compatibility, as
+# chr(obj) has a str return type instead of bytes in Python 3, and
+# struct.pack(...) has the right return type in both versions.
+
+def _pack_integer(obj, fp):
+ if obj < 0:
+ if obj >= -32:
+ fp.write(struct.pack("b", obj))
+ elif obj >= -2**(8-1):
+ fp.write(b"\xd0" + struct.pack("b", obj))
+ elif obj >= -2**(16-1):
+ fp.write(b"\xd1" + struct.pack(">h", obj))
+ elif obj >= -2**(32-1):
+ fp.write(b"\xd2" + struct.pack(">i", obj))
+ elif obj >= -2**(64-1):
+ fp.write(b"\xd3" + struct.pack(">q", obj))
+ else:
+ raise UnsupportedTypeException("huge signed int")
+ else:
+ if obj <= 127:
+ fp.write(struct.pack("B", obj))
+ elif obj <= 2**8-1:
+ fp.write(b"\xcc" + struct.pack("B", obj))
+ elif obj <= 2**16-1:
+ fp.write(b"\xcd" + struct.pack(">H", obj))
+ elif obj <= 2**32-1:
+ fp.write(b"\xce" + struct.pack(">I", obj))
+ elif obj <= 2**64-1:
+ fp.write(b"\xcf" + struct.pack(">Q", obj))
+ else:
+ raise UnsupportedTypeException("huge unsigned int")
+
+def _pack_nil(obj, fp):
+ fp.write(b"\xc0")
+
+def _pack_boolean(obj, fp):
+ fp.write(b"\xc3" if obj else b"\xc2")
+
+def _pack_float(obj, fp):
+ if _float_size == 64:
+ fp.write(b"\xcb" + struct.pack(">d", obj))
+ else:
+ fp.write(b"\xca" + struct.pack(">f", obj))
+
+def _pack_string(obj, fp):
+ obj = obj.encode('utf-8')
+ if len(obj) <= 31:
+ fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+ elif len(obj) <= 2**8-1:
+ fp.write(b"\xd9" + struct.pack("B", len(obj)) + obj)
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+ else:
+ raise UnsupportedTypeException("huge string")
+
+def _pack_binary(obj, fp):
+ if len(obj) <= 2**8-1:
+ fp.write(b"\xc4" + struct.pack("B", len(obj)) + obj)
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xc5" + struct.pack(">H", len(obj)) + obj)
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xc6" + struct.pack(">I", len(obj)) + obj)
+ else:
+ raise UnsupportedTypeException("huge binary string")
+
+def _pack_oldspec_raw(obj, fp):
+ if len(obj) <= 31:
+ fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+ else:
+ raise UnsupportedTypeException("huge raw string")
+
+def _pack_ext(obj, fp):
+ if len(obj.data) == 1:
+ fp.write(b"\xd4" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 2:
+ fp.write(b"\xd5" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 4:
+ fp.write(b"\xd6" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 8:
+ fp.write(b"\xd7" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) == 16:
+ fp.write(b"\xd8" + struct.pack("B", obj.type & 0xff) + obj.data)
+ elif len(obj.data) <= 2**8-1:
+ fp.write(b"\xc7" + struct.pack("BB", len(obj.data), obj.type & 0xff) + obj.data)
+ elif len(obj.data) <= 2**16-1:
+ fp.write(b"\xc8" + struct.pack(">HB", len(obj.data), obj.type & 0xff) + obj.data)
+ elif len(obj.data) <= 2**32-1:
+ fp.write(b"\xc9" + struct.pack(">IB", len(obj.data), obj.type & 0xff) + obj.data)
+ else:
+ raise UnsupportedTypeException("huge ext data")
+
+def _pack_array(obj, fp):
+ if len(obj) <= 15:
+ fp.write(struct.pack("B", 0x90 | len(obj)))
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xdc" + struct.pack(">H", len(obj)))
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdd" + struct.pack(">I", len(obj)))
+ else:
+ raise UnsupportedTypeException("huge array")
+
+ for e in obj:
+ pack(e, fp)
+
+def _pack_map(obj, fp):
+ if len(obj) <= 15:
+ fp.write(struct.pack("B", 0x80 | len(obj)))
+ elif len(obj) <= 2**16-1:
+ fp.write(b"\xde" + struct.pack(">H", len(obj)))
+ elif len(obj) <= 2**32-1:
+ fp.write(b"\xdf" + struct.pack(">I", len(obj)))
+ else:
+ raise UnsupportedTypeException("huge array")
+
+ for k,v in obj.items():
+ pack(k, fp)
+ pack(v, fp)
+
+########################################
+
+# Pack for Python 2, with 'unicode' type, 'str' type, and 'long' type
+def _pack2(obj, fp):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+ fp: a .write()-supporting file-like object
+
+ Returns:
+ None.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> f = open('test.bin', 'w')
+ >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+ >>>
+ """
+
+ global compatibility
+
+ if obj is None:
+ _pack_nil(obj, fp)
+ elif isinstance(obj, bool):
+ _pack_boolean(obj, fp)
+ elif isinstance(obj, int) or isinstance(obj, long):
+ _pack_integer(obj, fp)
+ elif isinstance(obj, float):
+ _pack_float(obj, fp)
+ elif compatibility and isinstance(obj, unicode):
+ _pack_oldspec_raw(bytes(obj), fp)
+ elif compatibility and isinstance(obj, bytes):
+ _pack_oldspec_raw(obj, fp)
+ elif isinstance(obj, unicode):
+ _pack_string(obj, fp)
+ elif isinstance(obj, str):
+ _pack_binary(obj, fp)
+ elif isinstance(obj, list) or isinstance(obj, tuple):
+ _pack_array(obj, fp)
+ elif isinstance(obj, dict):
+ _pack_map(obj, fp)
+ elif isinstance(obj, Ext):
+ _pack_ext(obj, fp)
+ else:
+ raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
+def _pack3(obj, fp):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+ fp: a .write()-supporting file-like object
+
+ Returns:
+ None.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> f = open('test.bin', 'w')
+ >>> umsgpack.pack({u"compact": True, u"schema": 0}, fp)
+ >>>
+ """
+ global compatibility
+
+ if obj is None:
+ _pack_nil(obj, fp)
+ elif isinstance(obj, bool):
+ _pack_boolean(obj, fp)
+ elif isinstance(obj, int):
+ _pack_integer(obj, fp)
+ elif isinstance(obj, float):
+ _pack_float(obj, fp)
+ elif compatibility and isinstance(obj, str):
+ _pack_oldspec_raw(obj.encode('utf-8'), fp)
+ elif compatibility and isinstance(obj, bytes):
+ _pack_oldspec_raw(obj, fp)
+ elif isinstance(obj, str):
+ _pack_string(obj, fp)
+ elif isinstance(obj, bytes):
+ _pack_binary(obj, fp)
+ elif isinstance(obj, list) or isinstance(obj, tuple):
+ _pack_array(obj, fp)
+ elif isinstance(obj, dict):
+ _pack_map(obj, fp)
+ elif isinstance(obj, Ext):
+ _pack_ext(obj, fp)
+ else:
+ raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+def _packb2(obj):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+
+ Returns:
+ A 'str' containing serialized MessagePack bytes.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> umsgpack.packb({u"compact": True, u"schema": 0})
+ '\x82\xa7compact\xc3\xa6schema\x00'
+ >>>
+ """
+ fp = io.BytesIO()
+ _pack2(obj, fp)
+ return fp.getvalue()
+
+def _packb3(obj):
+ """
+ Serialize a Python object into MessagePack bytes.
+
+ Args:
+ obj: a Python object
+
+ Returns:
+ A 'bytes' containing serialized MessagePack bytes.
+
+ Raises:
+ UnsupportedType(PackException):
+ Object type not supported for packing.
+
+ Example:
+ >>> umsgpack.packb({u"compact": True, u"schema": 0})
+ b'\x82\xa7compact\xc3\xa6schema\x00'
+ >>>
+ """
+ fp = io.BytesIO()
+ _pack3(obj, fp)
+ return fp.getvalue()
+
+################################################################################
+### Unpacking
+################################################################################
+
+def _read_except(fp, n):
+ data = fp.read(n)
+ if len(data) < n:
+ raise InsufficientDataException()
+ return data
+
+def _unpack_integer(code, fp):
+ if (ord(code) & 0xe0) == 0xe0:
+ return struct.unpack("b", code)[0]
+ elif code == b'\xd0':
+ return struct.unpack("b", _read_except(fp, 1))[0]
+ elif code == b'\xd1':
+ return struct.unpack(">h", _read_except(fp, 2))[0]
+ elif code == b'\xd2':
+ return struct.unpack(">i", _read_except(fp, 4))[0]
+ elif code == b'\xd3':
+ return struct.unpack(">q", _read_except(fp, 8))[0]
+ elif (ord(code) & 0x80) == 0x00:
+ return struct.unpack("B", code)[0]
+ elif code == b'\xcc':
+ return struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xcd':
+ return struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xce':
+ return struct.unpack(">I", _read_except(fp, 4))[0]
+ elif code == b'\xcf':
+ return struct.unpack(">Q", _read_except(fp, 8))[0]
+ raise Exception("logic error, not int: 0x%02x" % ord(code))
+
+def _unpack_reserved(code, fp):
+ if code == b'\xc1':
+ raise ReservedCodeException("encountered reserved code: 0x%02x" % ord(code))
+ raise Exception("logic error, not reserved code: 0x%02x" % ord(code))
+
+def _unpack_nil(code, fp):
+ if code == b'\xc0':
+ return None
+ raise Exception("logic error, not nil: 0x%02x" % ord(code))
+
+def _unpack_boolean(code, fp):
+ if code == b'\xc2':
+ return False
+ elif code == b'\xc3':
+ return True
+ raise Exception("logic error, not boolean: 0x%02x" % ord(code))
+
+def _unpack_float(code, fp):
+ if code == b'\xca':
+ return struct.unpack(">f", _read_except(fp, 4))[0]
+ elif code == b'\xcb':
+ return struct.unpack(">d", _read_except(fp, 8))[0]
+ raise Exception("logic error, not float: 0x%02x" % ord(code))
+
+def _unpack_string(code, fp):
+ if (ord(code) & 0xe0) == 0xa0:
+ length = ord(code) & ~0xe0
+ elif code == b'\xd9':
+ length = struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xda':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xdb':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not string: 0x%02x" % ord(code))
+
+ # Always return raw bytes in compatibility mode
+ global compatibility
+ if compatibility:
+ return _read_except(fp, length)
+
+ try:
+ return bytes.decode(_read_except(fp, length), 'utf-8')
+ except UnicodeDecodeError:
+ raise InvalidStringException("unpacked string is not utf-8")
+
+def _unpack_binary(code, fp):
+ if code == b'\xc4':
+ length = struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xc5':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xc6':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not binary: 0x%02x" % ord(code))
+
+ return _read_except(fp, length)
+
+def _unpack_ext(code, fp):
+ if code == b'\xd4':
+ length = 1
+ elif code == b'\xd5':
+ length = 2
+ elif code == b'\xd6':
+ length = 4
+ elif code == b'\xd7':
+ length = 8
+ elif code == b'\xd8':
+ length = 16
+ elif code == b'\xc7':
+ length = struct.unpack("B", _read_except(fp, 1))[0]
+ elif code == b'\xc8':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xc9':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not ext: 0x%02x" % ord(code))
+
+ return Ext(ord(_read_except(fp, 1)), _read_except(fp, length))
+
+def _unpack_array(code, fp):
+ if (ord(code) & 0xf0) == 0x90:
+ length = (ord(code) & ~0xf0)
+ elif code == b'\xdc':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xdd':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not array: 0x%02x" % ord(code))
+
+ return [_unpack(fp) for i in range(length)]
+
+def _deep_list_to_tuple(obj):
+ if isinstance(obj, list):
+ return tuple([_deep_list_to_tuple(e) for e in obj])
+ return obj
+
+def _unpack_map(code, fp):
+ if (ord(code) & 0xf0) == 0x80:
+ length = (ord(code) & ~0xf0)
+ elif code == b'\xde':
+ length = struct.unpack(">H", _read_except(fp, 2))[0]
+ elif code == b'\xdf':
+ length = struct.unpack(">I", _read_except(fp, 4))[0]
+ else:
+ raise Exception("logic error, not map: 0x%02x" % ord(code))
+
+ d = {}
+ for i in range(length):
+ # Unpack key
+ k = _unpack(fp)
+
+ if isinstance(k, list):
+ # Attempt to convert list into a hashable tuple
+ k = _deep_list_to_tuple(k)
+ elif not isinstance(k, collections.Hashable):
+ raise UnhashableKeyException("encountered unhashable key: %s, %s" % (str(k), str(type(k))))
+ elif k in d:
+ raise DuplicateKeyException("encountered duplicate key: %s, %s" % (str(k), str(type(k))))
+
+ # Unpack value
+ v = _unpack(fp)
+
+ try:
+ d[k] = v
+ except TypeError:
+ raise UnhashableKeyException("encountered unhashable key: %s" % str(k))
+ return d
+
+def _unpack(fp):
+ code = _read_except(fp, 1)
+ return _unpack_dispatch_table[code](code, fp)
+
+########################################
+
+def _unpack2(fp):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ fp: a .read()-supporting file-like object
+
+ Returns:
+ A Python object.
+
+ Raises:
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> f = open("test.bin")
+ >>> umsgpack.unpackb(f)
+ {u'compact': True, u'schema': 0}
+ >>>
+ """
+ return _unpack(fp)
+
+def _unpack3(fp):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ fp: a .read()-supporting file-like object
+
+ Returns:
+ A Python object.
+
+ Raises:
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> f = open("test.bin")
+ >>> umsgpack.unpackb(f)
+ {'compact': True, 'schema': 0}
+ >>>
+ """
+ return _unpack(fp)
+
+# For Python 2, expects a str object
+def _unpackb2(s):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ s: a 'str' containing serialized MessagePack bytes
+
+ Returns:
+ A Python object.
+
+ Raises:
+ TypeError:
+ Packed data is not type 'str'.
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+ {u'compact': True, u'schema': 0}
+ >>>
+ """
+ if not isinstance(s, str):
+ raise TypeError("packed data is not type 'str'")
+ return _unpack(io.BytesIO(s))
+
+# For Python 3, expects a bytes object
+def _unpackb3(s):
+ """
+ Deserialize MessagePack bytes into a Python object.
+
+ Args:
+ s: a 'bytes' containing serialized MessagePack bytes
+
+ Returns:
+ A Python object.
+
+ Raises:
+ TypeError:
+ Packed data is not type 'bytes'.
+ InsufficientDataException(UnpackException):
+ Insufficient data to unpack the encoded object.
+ InvalidStringException(UnpackException):
+ Invalid UTF-8 string encountered during unpacking.
+ ReservedCodeException(UnpackException):
+ Reserved code encountered during unpacking.
+ UnhashableKeyException(UnpackException):
+ Unhashable key encountered during map unpacking.
+ The serialized map cannot be deserialized into a Python dictionary.
+ DuplicateKeyException(UnpackException):
+ Duplicate key encountered during map unpacking.
+
+ Example:
+ >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+ {'compact': True, 'schema': 0}
+ >>>
+ """
+ if not isinstance(s, bytes):
+ raise TypeError("packed data is not type 'bytes'")
+ return _unpack(io.BytesIO(s))
+
+################################################################################
+### Module Initialization
+################################################################################
+
+def __init():
+ global pack
+ global packb
+ global unpack
+ global unpackb
+ global dump
+ global dumps
+ global load
+ global loads
+ global compatibility
+ global _float_size
+ global _unpack_dispatch_table
+
+ # Compatibility mode for handling strings/bytes with the old specification
+ compatibility = False
+
+ # Auto-detect system float precision
+ if sys.float_info.mant_dig == 53:
+ _float_size = 64
+ else:
+ _float_size = 32
+
+ # Map packb and unpackb to the appropriate version
+ if sys.version_info[0] == 3:
+ pack = _pack3
+ packb = _packb3
+ dump = _pack3
+ dumps = _packb3
+ unpack = _unpack3
+ unpackb = _unpackb3
+ load = _unpack3
+ loads = _unpackb3
+ else:
+ pack = _pack2
+ packb = _packb2
+ dump = _pack2
+ dumps = _packb2
+ unpack = _unpack2
+ unpackb = _unpackb2
+ load = _unpack2
+ loads = _unpackb2
+
+ # Build a dispatch table for fast lookup of unpacking function
+
+ _unpack_dispatch_table = {}
+ # Fix uint
+ for code in range(0, 0x7f+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+ # Fix map
+ for code in range(0x80, 0x8f+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_map
+ # Fix array
+ for code in range(0x90, 0x9f+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_array
+ # Fix str
+ for code in range(0xa0, 0xbf+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+ # Nil
+ _unpack_dispatch_table[b'\xc0'] = _unpack_nil
+ # Reserved
+ _unpack_dispatch_table[b'\xc1'] = _unpack_reserved
+ # Boolean
+ _unpack_dispatch_table[b'\xc2'] = _unpack_boolean
+ _unpack_dispatch_table[b'\xc3'] = _unpack_boolean
+ # Bin
+ for code in range(0xc4, 0xc6+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_binary
+ # Ext
+ for code in range(0xc7, 0xc9+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+ # Float
+ _unpack_dispatch_table[b'\xca'] = _unpack_float
+ _unpack_dispatch_table[b'\xcb'] = _unpack_float
+ # Uint
+ for code in range(0xcc, 0xcf+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+ # Int
+ for code in range(0xd0, 0xd3+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+ # Fixext
+ for code in range(0xd4, 0xd8+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_ext
+ # String
+ for code in range(0xd9, 0xdb+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_string
+ # Array
+ _unpack_dispatch_table[b'\xdc'] = _unpack_array
+ _unpack_dispatch_table[b'\xdd'] = _unpack_array
+ # Map
+ _unpack_dispatch_table[b'\xde'] = _unpack_map
+ _unpack_dispatch_table[b'\xdf'] = _unpack_map
+ # Negative fixint
+ for code in range(0xe0, 0xff+1):
+ _unpack_dispatch_table[struct.pack("B", code)] = _unpack_integer
+
+__init()