new sensor manage code
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/__init__.py
diff --git a/sensors/__init__.py b/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sensors/__init__.py
diff --git a/sensors/api.py b/sensors/api.py
new file mode 100644
index 0000000..f78e6a9
--- /dev/null
+++ b/sensors/api.py
@@ -0,0 +1,47 @@
+import Queue
+import threading
+
+from contextlib import contextmanager
+
+from deploy_sensors import (deploy_and_start_sensors,
+ stop_and_remove_sensors)
+from protocol import create_protocol, Timeout
+
+
+Empty = Queue.Empty
+
+
+def recv_main(proto, data_q, cmd_q):
+ while True:
+ try:
+ data_q.put(proto.recv(0.1))
+ except Timeout:
+ pass
+ try:
+ val = cmd_q.get(False)
+
+ if val is None:
+ return
+
+ except Queue.Empty:
+ pass
+
+
+@contextmanager
+def start_monitoring(uri, config):
+ deploy_and_start_sensors(uri, config)
+ try:
+ data_q = Queue.Queue()
+ cmd_q = Queue.Queue()
+ proto = create_protocol(uri, receiver=True)
+ th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+ th.daemon = True
+ th.start()
+
+ try:
+ yield data_q
+ finally:
+ cmd_q.put(None)
+ th.join()
+ finally:
+ stop_and_remove_sensors(config)
diff --git a/sensors/config.yaml b/sensors/config.yaml
new file mode 100644
index 0000000..61f622b
--- /dev/null
+++ b/sensors/config.yaml
@@ -0,0 +1,23 @@
+127.0.0.1:
+ block-io:
+ allowed_prefixes: [sda1, rbd1]
+ net-io:
+ allowed_prefixes: [virbr2]
+
+# 192.168.152.39:
+# block-io:
+# allowed_prefixes: [sdb]
+# net-io:
+# allowed_prefixes: [eth0]
+
+# 192.168.152.40:
+# block-io:
+# allowed_prefixes: [sdb]
+# net-io:
+# allowed_prefixes: [eth0]
+
+# 192.168.152.41:
+# block-io:
+# allowed_prefixes: [sdb]
+# net-io:
+# allowed_prefixes: [eth0]
diff --git a/sensors/daemonize.py b/sensors/daemonize.py
new file mode 100644
index 0000000..1c3241b
--- /dev/null
+++ b/sensors/daemonize.py
@@ -0,0 +1,211 @@
+# #!/usr/bin/python
+
+import fcntl
+import os
+import pwd
+import grp
+import sys
+import signal
+import resource
+import logging
+import atexit
+from logging import handlers
+
+
+class Daemonize(object):
+ """ Daemonize object
+ Object constructor expects three arguments:
+ - app: contains the application name which will be sent to syslog.
+ - pid: path to the pidfile.
+ - action: your custom function which will be executed after daemonization.
+ - keep_fds: optional list of fds which should not be closed.
+ - auto_close_fds: optional parameter to not close opened fds.
+ - privileged_action: action that will be executed before drop privileges if user or
+ group parameter is provided.
+ If you want to transfer anything from privileged_action to action, such as
+ opened privileged file descriptor, you should return it from
+ privileged_action function and catch it inside action function.
+ - user: drop privileges to this user if provided.
+ - group: drop privileges to this group if provided.
+ - verbose: send debug messages to logger if provided.
+ - logger: use this logger object instead of creating new one, if provided.
+ """
+ def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True, privileged_action=None, user=None, group=None, verbose=False, logger=None):
+ self.app = app
+ self.pid = pid
+ self.action = action
+ self.keep_fds = keep_fds or []
+ self.privileged_action = privileged_action or (lambda: ())
+ self.user = user
+ self.group = group
+ self.logger = logger
+ self.verbose = verbose
+ self.auto_close_fds = auto_close_fds
+
+ def sigterm(self, signum, frame):
+ """ sigterm method
+ These actions will be done after SIGTERM.
+ """
+ self.logger.warn("Caught signal %s. Stopping daemon." % signum)
+ os.remove(self.pid)
+ sys.exit(0)
+
+ def exit(self):
+ """ exit method
+ Cleanup pid file at exit.
+ """
+ self.logger.warn("Stopping daemon.")
+ os.remove(self.pid)
+ sys.exit(0)
+
+ def start(self):
+ """ start method
+ Main daemonization process.
+ """
+ # If pidfile already exists, we should read pid from there; to overwrite it, if locking
+ # will fail, because locking attempt somehow purges the file contents.
+ if os.path.isfile(self.pid):
+ with open(self.pid, "r") as old_pidfile:
+ old_pid = old_pidfile.read()
+ # Create a lockfile so that only one instance of this daemon is running at any time.
+ try:
+ lockfile = open(self.pid, "w")
+ except IOError:
+ print("Unable to create the pidfile.")
+ sys.exit(1)
+ try:
+ # Try to get an exclusive lock on the file. This will fail if another process has the file
+ # locked.
+ fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ print("Unable to lock on the pidfile.")
+ # We need to overwrite the pidfile if we got here.
+ with open(self.pid, "w") as pidfile:
+ pidfile.write(old_pid)
+ sys.exit(1)
+
+ # Fork, creating a new process for the child.
+ process_id = os.fork()
+ if process_id < 0:
+ # Fork error. Exit badly.
+ sys.exit(1)
+ elif process_id != 0:
+ # This is the parent process. Exit.
+ sys.exit(0)
+ # This is the child process. Continue.
+
+ # Stop listening for signals that the parent process receives.
+ # This is done by getting a new process id.
+ # setpgrp() is an alternative to setsid().
+ # setsid puts the process in a new parent group and detaches its controlling terminal.
+ process_id = os.setsid()
+ if process_id == -1:
+ # Uh oh, there was a problem.
+ sys.exit(1)
+
+ # Add lockfile to self.keep_fds.
+ self.keep_fds.append(lockfile.fileno())
+
+ # Close all file descriptors, except the ones mentioned in self.keep_fds.
+ devnull = "/dev/null"
+ if hasattr(os, "devnull"):
+ # Python has set os.devnull on this system, use it instead as it might be different
+ # than /dev/null.
+ devnull = os.devnull
+
+ if self.auto_close_fds:
+ for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
+ if fd not in self.keep_fds:
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+
+ devnull_fd = os.open(devnull, os.O_RDWR)
+ os.dup2(devnull_fd, 0)
+ os.dup2(devnull_fd, 1)
+ os.dup2(devnull_fd, 2)
+
+ if self.logger is None:
+ # Initialize logging.
+ self.logger = logging.getLogger(self.app)
+ self.logger.setLevel(logging.DEBUG)
+ # Display log messages only on defined handlers.
+ self.logger.propagate = False
+
+ # Initialize syslog.
+ # It will correctly work on OS X, Linux and FreeBSD.
+ if sys.platform == "darwin":
+ syslog_address = "/var/run/syslog"
+ else:
+ syslog_address = "/dev/log"
+
+ # We will continue with syslog initialization only if actually have such capabilities
+ # on the machine we are running this.
+ if os.path.isfile(syslog_address):
+ syslog = handlers.SysLogHandler(syslog_address)
+ if self.verbose:
+ syslog.setLevel(logging.DEBUG)
+ else:
+ syslog.setLevel(logging.INFO)
+ # Try to mimic to normal syslog messages.
+ formatter = logging.Formatter("%(asctime)s %(name)s: %(message)s",
+ "%b %e %H:%M:%S")
+ syslog.setFormatter(formatter)
+
+ self.logger.addHandler(syslog)
+
+ # Set umask to default to safe file permissions when running as a root daemon. 027 is an
+ # octal number which we are typing as 0o27 for Python3 compatibility.
+ os.umask(0o27)
+
+ # Change to a known directory. If this isn't done, starting a daemon in a subdirectory that
+ # needs to be deleted results in "directory busy" errors.
+ os.chdir("/")
+
+ # Execute privileged action
+ privileged_action_result = self.privileged_action()
+ if not privileged_action_result:
+ privileged_action_result = []
+
+ # Change gid
+ if self.group:
+ try:
+ gid = grp.getgrnam(self.group).gr_gid
+ except KeyError:
+ self.logger.error("Group {0} not found".format(self.group))
+ sys.exit(1)
+ try:
+ os.setgid(gid)
+ except OSError:
+ self.logger.error("Unable to change gid.")
+ sys.exit(1)
+
+ # Change uid
+ if self.user:
+ try:
+ uid = pwd.getpwnam(self.user).pw_uid
+ except KeyError:
+ self.logger.error("User {0} not found.".format(self.user))
+ sys.exit(1)
+ try:
+ os.setuid(uid)
+ except OSError:
+ self.logger.error("Unable to change uid.")
+ sys.exit(1)
+
+ try:
+ lockfile.write("%s" % (os.getpid()))
+ lockfile.flush()
+ except IOError:
+ self.logger.error("Unable to write pid to the pidfile.")
+ print("Unable to write pid to the pidfile.")
+ sys.exit(1)
+
+ # Set custom action on SIGTERM.
+ signal.signal(signal.SIGTERM, self.sigterm)
+ atexit.register(self.exit)
+
+ self.logger.warn("Starting daemon.")
+
+ self.action(*privileged_action_result)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
new file mode 100644
index 0000000..b7e29b3
--- /dev/null
+++ b/sensors/deploy_sensors.py
@@ -0,0 +1,76 @@
+import time
+import json
+import os.path
+
+from disk_perf_test_tool.ssh_copy_directory import copy_paths
+from disk_perf_test_tool.ssh_runner import connect
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+
+def wait_all_ok(futures):
+ return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
+ paths = {os.path.dirname(__file__): remote_path}
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for uri, config in config.items():
+ futures.append(executor.submit(deploy_and_start_sensor,
+ paths, uri, monitor_uri,
+ config, remote_path))
+
+ if not wait_all_ok(futures):
+ raise RuntimeError("Sensor deployment fails on some nodes")
+
+
+def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
+ try:
+ conn = connect(uri)
+ copy_paths(conn, paths)
+ sftp = conn.open_sftp()
+
+ config_remote_path = os.path.join(remote_path, "conf.json")
+ main_remote_path = os.path.join(remote_path, "main.py")
+
+ with sftp.open(config_remote_path, "w") as fd:
+ fd.write(json.dumps(config))
+
+ cmd_templ = "python {0} -d start -u {1} {2}"
+ cmd = cmd_templ.format(main_remote_path,
+ monitor_uri,
+ config_remote_path)
+ conn.exec_command(cmd)
+ sftp.close()
+ conn.close()
+ except Exception as exc:
+ print exc
+ return False
+ return True
+
+
+def stop_and_remove_sensor(uri, remote_path):
+ conn = connect(uri)
+ main_remote_path = os.path.join(remote_path, "main.py")
+
+ cmd_templ = "python {0} -d stop"
+ conn.exec_command(cmd_templ.format(main_remote_path))
+ time.sleep(0.3)
+ # print out.read(), err.read()
+
+ conn.exec_command("rm -rf {0}".format(remote_path))
+
+ conn.close()
+
+
+def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for uri, config in config.items():
+ futures.append(executor.submit(stop_and_remove_sensor,
+ uri, remote_path))
+
+ wait(futures)
diff --git a/sensors/discover.py b/sensors/discover.py
new file mode 100644
index 0000000..f227043
--- /dev/null
+++ b/sensors/discover.py
@@ -0,0 +1,9 @@
+all_sensors = {}
+
+
+def provides(sensor_class_name):
+ def closure(func):
+ assert sensor_class_name not in all_sensors
+ all_sensors[sensor_class_name] = func
+ return func
+ return closure
diff --git a/sensors/grafana_template.js b/sensors/grafana_template.js
new file mode 100644
index 0000000..08f8f99
--- /dev/null
+++ b/sensors/grafana_template.js
@@ -0,0 +1,80 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (int ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessable variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Intialize a skeleton with nothing but a rows array and service object
+dashboard = {
+ rows : [],
+};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overriden in the url using from/to parameteres, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+ from: "now-5m",
+ to: "now"
+};
+
+var rows = 1;
+var seriesName = 'argName';
+
+if(!_.isUndefined(ARGS.rows)) {
+ rows = parseInt(ARGS.rows, 10);
+}
+
+if(!_.isUndefined(ARGS.name)) {
+ seriesName = ARGS.name;
+}
+
+for (var i = 0; i < rows; i++) {
+
+ dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: [
+ {
+ title: 'Events',
+ type: 'graph',
+ span: 12,
+ fill: 1,
+ linewidth: 2,
+ targets: [
+ {"target": "disk io",
+ "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc",
+ "interval": "",
+ "alias": "host io sw",
+ "rawQuery": true}
+ ],
+ tooltip: {
+ shared: true
+ }
+ }
+ ]
+ });
+}
+
+
+return dashboard;
diff --git a/sensors/host1_config.json b/sensors/host1_config.json
new file mode 100644
index 0000000..7e81125
--- /dev/null
+++ b/sensors/host1_config.json
@@ -0,0 +1,8 @@
+{
+ "block-io": {
+ "allowed_prefixes": ["sdb1"]
+ },
+ "net-io": {
+ "allowed_prefixes": ["eth0"]
+ }
+}
\ No newline at end of file
diff --git a/sensors/influx_exporter.py b/sensors/influx_exporter.py
new file mode 100644
index 0000000..34b3c0a
--- /dev/null
+++ b/sensors/influx_exporter.py
@@ -0,0 +1,31 @@
+from urlparse import urlparse
+from influxdb import InfluxDBClient
+
+
+def connect(url):
+ parsed_url = urlparse(url)
+ user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
+ user, passwd = user_passwd.split(":", 1)
+ host, port = host_port.split(":")
+ return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
+
+
+def add_data(conn, hostname, data):
+ per_sensor_data = {}
+ for serie in data:
+ serie = serie.copy()
+ gtime = serie.pop('time')
+ for key, val in serie.items():
+ dev, sensor = key.split('.')
+ data = per_sensor_data.setdefault(sensor, [])
+ data.append([gtime, hostname, dev, val])
+
+ infl_data = []
+ columns = ['time', 'host', 'device', 'value']
+ for sensor_name, points in per_sensor_data.items():
+ infl_data.append(
+ {'columns': columns,
+ 'name': sensor_name,
+ 'points': points})
+
+ conn.write_points(infl_data)
diff --git a/sensors/io_sensors.py b/sensors/io_sensors.py
new file mode 100644
index 0000000..2fdd24e
--- /dev/null
+++ b/sensors/io_sensors.py
@@ -0,0 +1,46 @@
+from discover import provides
+from utils import SensorInfo, is_dev_accepted
+
+# 1 - major number
+# 2 - minor mumber
+# 3 - device name
+# 4 - reads completed successfully
+# 5 - reads merged
+# 6 - sectors read
+# 7 - time spent reading (ms)
+# 8 - writes completed
+# 9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+ (3, 'reads_completed', True),
+ (5, 'sectors_read', True),
+ (6, 'rtime', True),
+ (7, 'writes_completed', True),
+ (9, 'sectors_written', True),
+ (10, 'wtime', True),
+ (11, 'io_queue', False),
+ (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+ results = {}
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if dev_ok:
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
diff --git a/sensors/main.py b/sensors/main.py
new file mode 100644
index 0000000..07a1dce
--- /dev/null
+++ b/sensors/main.py
@@ -0,0 +1,103 @@
+import os
+import sys
+import time
+import json
+import signal
+import os.path
+import argparse
+
+import io_sensors
+import net_sensors
+
+from utils import SensorInfo
+from daemonize import Daemonize
+from discover import all_sensors
+from protocol import create_protocol
+
+
+def get_values(required_sensors):
+ result = {}
+ for sensor_name, params in required_sensors:
+ if sensor_name in all_sensors:
+ result.update(all_sensors[sensor_name](**params))
+ else:
+ msg = "Sensor {0!r} isn't available".format(sensor_name)
+ raise ValueError(msg)
+ return time.time(), result
+
+
+def parse_args(args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-d', '--daemon',
+ choices=('start', 'stop', 'status'),
+ default=None)
+
+ parser.add_argument('-u', '--url', default='stdout://')
+ parser.add_argument('-t', '--timeout', type=float, default=1)
+ parser.add_argument('sensors_config', type=argparse.FileType('r'),
+ default=None, nargs='?')
+ return parser.parse_args(args[1:])
+
+
+def daemon_main(required_sensors, opts):
+ sender = create_protocol(opts.url)
+ prev = {}
+
+ while True:
+ gtime, data = get_values(required_sensors.items())
+ curr = {'time': SensorInfo(gtime, True)}
+ for name, val in data.items():
+ if val.is_accumulated:
+ if name in prev:
+ curr[name] = SensorInfo(val.value - prev[name], True)
+ prev[name] = val.value
+ else:
+ curr[name] = SensorInfo(val.value, False)
+ sender.send(curr)
+ time.sleep(opts.timeout)
+
+
+def main(argv):
+ opts = parse_args(argv)
+
+ if opts.daemon is not None:
+ pid_file = "/tmp/sensors.pid"
+ if opts.daemon == 'start':
+ required_sensors = json.loads(opts.sensors_config.read())
+
+ def root_func():
+ daemon_main(required_sensors, opts)
+
+ daemon = Daemonize(app="perfcollect_app",
+ pid=pid_file,
+ action=root_func)
+ daemon.start()
+ elif opts.daemon == 'stop':
+ if os.path.isfile(pid_file):
+ pid = int(open(pid_file).read())
+ if os.path.exists("/proc/" + str(pid)):
+ os.kill(pid, signal.SIGTERM)
+
+ time.sleep(0.1)
+
+ if os.path.exists("/proc/" + str(pid)):
+ os.kill(pid, signal.SIGKILL)
+
+ if os.path.isfile(pid_file):
+ os.unlink(pid_file)
+ elif opts.daemon == 'status':
+ if os.path.isfile(pid_file):
+ pid = int(open(pid_file).read())
+ if os.path.exists("/proc/" + str(pid)):
+ print "running"
+ return
+ print "stopped"
+ else:
+ raise ValueError("Unknown daemon operation {}".format(opts.daemon))
+ else:
+ required_sensors = json.loads(opts.sensors_config.read())
+ daemon_main(required_sensors, opts)
+ return 0
+
+if __name__ == "__main__":
+ exit(main(sys.argv))
diff --git a/sensors/net_sensors.py b/sensors/net_sensors.py
new file mode 100644
index 0000000..3a2d926
--- /dev/null
+++ b/sensors/net_sensors.py
@@ -0,0 +1,43 @@
+from discover import provides
+from utils import SensorInfo, is_dev_accepted
+
+# 1 - major number
+# 2 - minor mumber
+# 3 - device name
+# 4 - reads completed successfully
+# 5 - reads merged
+# 6 - sectors read
+# 7 - time spent reading (ms)
+# 8 - writes completed
+# 9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+net_values_pos = [
+ (0, 'recv_bytes', True),
+ (1, 'recv_packets', True),
+ (8, 'send_bytes', True),
+ (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+ results = {}
+
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ vals = stats.split()
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+ if dev_ok:
+ for pos, name, accum_val in net_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
diff --git a/sensors/protocol.py b/sensors/protocol.py
new file mode 100644
index 0000000..fa88927
--- /dev/null
+++ b/sensors/protocol.py
@@ -0,0 +1,120 @@
+import socket
+import select
+import cPickle as pickle
+from urlparse import urlparse
+
+
+class Timeout(Exception):
+ pass
+
+
+# ------------------------------------- Serializers --------------------------
+
+
+class ISensortResultsSerializer(object):
+ def pack(self, data):
+ pass
+
+ def unpack(self, data):
+ pass
+
+
+class PickleSerializer(ISensortResultsSerializer):
+ def pack(self, data):
+ ndata = {key: val.value for key, val in data.items()}
+ return pickle.dumps(ndata)
+
+ def unpack(self, data):
+ return pickle.loads(data)
+
+
+# ------------------------------------- Transports ---------------------------
+
+class ITransport(object):
+ def __init__(self, receiver):
+ pass
+
+ def send(self, data):
+ pass
+
+ def recv(self, timeout=None):
+ pass
+
+
+class StdoutTransport(ITransport):
+ MIN_COL_WIDTH = 10
+
+ def __init__(self, receiver, delta=True):
+ if receiver:
+ raise ValueError("StdoutTransport don't allows receiving")
+
+ self.headers = None
+ self.line_format = ""
+ self.prev = {}
+ self.delta = delta
+
+ def send(self, data):
+ if self.headers is None:
+ self.headers = sorted(data)
+
+ for pos, header in enumerate(self.headers):
+ self.line_format += "{%s:>%s}" % (pos,
+ max(len(header) + 1,
+ self.MIN_COL_WIDTH))
+
+ print self.line_format.format(*self.headers)
+
+ if self.delta:
+ vals = [data[header].value - self.prev.get(header, 0)
+ for header in self.headers]
+
+ self.prev.update({header: data[header].value
+ for header in self.headers})
+ else:
+ vals = [data[header].value for header in self.headers]
+
+ print self.line_format.format(*vals)
+
+ def recv(self, timeout=None):
+ raise ValueError("StdoutTransport don't allows receiving")
+
+
+class UDPTransport(ITransport):
+ def __init__(self, receiver, ip, port, packer_cls):
+ self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ if receiver:
+ self.port.bind((ip, port))
+ self.packer_cls = packer_cls
+ self.packers = {}
+ else:
+ self.packer = packer_cls()
+ self.dst = (ip, port)
+
+ def send(self, data):
+ raw_data = self.packer.pack(data)
+ self.port.sendto(raw_data, self.dst)
+
+ def recv(self, timeout=None):
+ r, _, _ = select.select([self.port], [], [], timeout)
+ if len(r) != 0:
+ raw_data, addr = self.port.recvfrom(10000)
+ packer = self.packers.setdefault(addr, self.packer_cls())
+ return addr, packer.unpack(raw_data)
+ else:
+ raise Timeout()
+
+
+# -------------------------- Factory function --------------------------------
+
+
+def create_protocol(uri, receiver=False):
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == 'stdout':
+ return StdoutTransport(receiver)
+ elif parsed_uri.scheme == 'udp':
+ ip, port = parsed_uri.netloc.split(":")
+ return UDPTransport(receiver, ip=ip, port=int(port),
+ packer_cls=PickleSerializer)
+ else:
+ templ = "Can't instantiate transport from {0!r}"
+ raise ValueError(templ.format(uri))
diff --git a/sensors/receiver.py b/sensors/receiver.py
new file mode 100644
index 0000000..369a65f
--- /dev/null
+++ b/sensors/receiver.py
@@ -0,0 +1,43 @@
+import yaml
+
+from api import start_monitoring, Empty
+from influx_exporter import connect, add_data
+
+monitor_config = yaml.load(open("config.yaml").read())
+
+uri = "udp://192.168.0.104:12001"
+infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+conn = connect(infldb_url)
+
+sw_per_ip = {}
+count = 4
+expected = ['192.168.0.104', '192.168.152.41',
+ '192.168.152.39', '192.168.152.40']
+
+with start_monitoring(uri, monitor_config) as queue:
+ while True:
+ try:
+ (ip, port), data = queue.get(True, 1)
+
+ if 'sda1.sectors_written' in data:
+ val = data['sda1.sectors_written']
+ elif 'sdb.sectors_written' in data:
+ val = data['sdb.sectors_written']
+ else:
+ val = 0
+
+ sw_per_ip[ip] = sw_per_ip.get(ip, 0) + val
+ count -= 1
+
+ if 0 == count:
+ try:
+ vals = [sw_per_ip[ip] for ip in expected]
+ print ("{:>6}" * 4).format(*vals)
+ sw_per_ip = {}
+ count = 4
+ except KeyError:
+ pass
+
+ add_data(conn, ip, [data])
+ except Empty:
+ pass
diff --git a/sensors/utils.py b/sensors/utils.py
new file mode 100644
index 0000000..5af0a2a
--- /dev/null
+++ b/sensors/utils.py
@@ -0,0 +1,44 @@
+from collections import namedtuple
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+ dev_ok = True
+
+ if disallowed_prefixes is not None:
+ dev_ok = all(not name.startswith(prefix)
+ for prefix in disallowed_prefixes)
+
+ if dev_ok and allowed_prefixes is not None:
+ dev_ok = any(name.startswith(prefix)
+ for prefix in allowed_prefixes)
+
+ return dev_ok
+
+
+def delta(func, only_upd=True):
+ prev = {}
+ while True:
+ for dev_name, vals in func():
+ if dev_name not in prev:
+ prev[dev_name] = {}
+ for name, (val, _) in vals.items():
+ prev[dev_name][name] = val
+ else:
+ dev_prev = prev[dev_name]
+ res = {}
+ for stat_name, (val, accum_val) in vals.items():
+ if accum_val:
+ if stat_name in dev_prev:
+ delta = int(val) - int(dev_prev[stat_name])
+ if not only_upd or 0 != delta:
+ res[stat_name] = str(delta)
+ dev_prev[stat_name] = val
+ elif not only_upd or '0' != val:
+ res[stat_name] = val
+
+ if only_upd and len(res) == 0:
+ continue
+ yield dev_name, res
+ yield None, None
diff --git a/ssh_runner.py b/ssh_runner.py
index 5cb26de..28996ab 100644
--- a/ssh_runner.py
+++ b/ssh_runner.py
@@ -55,8 +55,8 @@
def parse_ssh_uri(uri):
- # user:passwd@ip_host:port
- # user:passwd@ip_host
+ # user:passwd@@ip_host:port
+ # user:passwd@@ip_host
# user@ip_host:port
# user@ip_host
# ip_host:port
@@ -68,6 +68,8 @@
res = ConnCreds()
res.port = "22"
+ res.key_file = None
+ res.passwd = None
for rr in uri_reg_exprs:
rrm = re.match(rr, uri)
diff --git a/storage_api.py b/storage_api.py
index 28ec060..1dbf30c 100644
--- a/storage_api.py
+++ b/storage_api.py
@@ -6,6 +6,7 @@
from flask import url_for
import os
+
class Measurement(object):
def __init__(self):
self.build = ""
@@ -27,7 +28,7 @@
s = stdev(item[1])
build[item[0]] = [m, s]
-
+
def mean(l):
n = len(l)
diff --git a/utils.py b/utils.py
index 93c82e0..ca65409 100644
--- a/utils.py
+++ b/utils.py
@@ -1,5 +1,7 @@
import time
import socket
+import os.path
+import getpass
import logging
import threading
import contextlib
@@ -39,9 +41,14 @@
ssh.known_hosts = None
for i in range(retry_count):
try:
+ if creds.user is None:
+ user = getpass.getuser()
+ else:
+ user = creds.user
+
if creds.passwd is not None:
ssh.connect(creds.host,
- username=creds.user,
+ username=user,
password=creds.passwd,
port=creds.port,
allow_agent=False,
@@ -50,12 +57,20 @@
if creds.key_file is not None:
ssh.connect(creds.host,
- username=creds.user,
+ username=user,
key_filename=creds.key_file,
look_for_keys=False,
port=creds.port)
return ssh
- raise ValueError("Wrong credentials {0}".format(creds.__dict__))
+
+ key_file = os.path.expanduser('~/.ssh/id_rsa')
+ ssh.connect(creds.host,
+ username=user,
+ key_filename=key_file,
+ look_for_keys=False,
+ port=creds.port)
+ return ssh
+ # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
except paramiko.PasswordRequiredException:
raise
except socket.error: