2.0 refactoring:
* Add type annotations to most functions
* Remove old fio run code; move it to RPC/pluggable
* Remove most of the sensors code; it will be moved to RPC later
* Other refactoring
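
A rough sketch of how the new per-node API introduced by this refactoring
is meant to be used (illustrative only, not part of the patch; the ssh URL
and role below are placeholders, and the RPC path is still a stub here):

    from wally.inode import NodeInfo
    from wally.node import Node

    # describe a node: ssh connection url plus a set of roles
    info = NodeInfo("ssh://root@192.168.1.10:22", {"ceph-osd"})

    # wrap it into a Node, open the ssh connection and run a command
    node = Node(info)
    node.connect_ssh()
    print(node.run("uname -a", nolog=True))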
diff --git a/wally/config.py b/wally/config.py
index 332dc5e..c5d1db0 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,6 +1,5 @@
import os
import uuid
-import logging
import functools
import yaml
@@ -11,19 +10,26 @@
def pet_generate(x, y):
return str(uuid.uuid4())
-import pretty_yaml
+from . import pretty_yaml
-class NoData(object):
+class NoData:
@classmethod
def get(cls, name, x):
return cls
-class Config(object):
+class Config:
def __init__(self, val=None):
if val is not None:
self.update(val)
+ self.results_dir = None
+ self.run_uuid = None
+ self.settings = {}
+ self.run_params_file = None
+ self.default_test_local_folder = None
+ self.hwinfo_directory = None
+ self.hwreport_fname = None
def get(self, name, defval=None):
obj = self.__dict__
@@ -65,7 +71,6 @@
file_name = os.path.abspath(file_name)
defaults = dict(
- sensors_remote_path='/tmp/sensors',
testnode_log_root='/tmp/wally',
settings={}
)
@@ -85,7 +90,6 @@
cfg.update(raw_cfg)
results_storage = cfg.settings.get('results_storage', '/tmp')
- print results_storage
results_storage = os.path.abspath(results_storage)
existing = file_name.startswith(results_storage)
@@ -135,84 +139,3 @@
return dict(run_uuid=dt['run_uuid'],
comment=dt.get('comment'))
-
-
-def color_me(color):
- RESET_SEQ = "\033[0m"
- COLOR_SEQ = "\033[1;%dm"
-
- color_seq = COLOR_SEQ % (30 + color)
-
- def closure(msg):
- return color_seq + msg + RESET_SEQ
- return closure
-
-
-class ColoredFormatter(logging.Formatter):
- BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
-
- colors = {
- 'WARNING': color_me(YELLOW),
- 'DEBUG': color_me(BLUE),
- 'CRITICAL': color_me(YELLOW),
- 'ERROR': color_me(RED)
- }
-
- def __init__(self, msg, use_color=True, datefmt=None):
- logging.Formatter.__init__(self, msg, datefmt=datefmt)
- self.use_color = use_color
-
- def format(self, record):
- orig = record.__dict__
- record.__dict__ = record.__dict__.copy()
- levelname = record.levelname
-
- prn_name = levelname + ' ' * (8 - len(levelname))
- if levelname in self.colors:
- record.levelname = self.colors[levelname](prn_name)
- else:
- record.levelname = prn_name
-
- # super doesn't work here in 2.6 O_o
- res = logging.Formatter.format(self, record)
-
- # res = super(ColoredFormatter, self).format(record)
-
- # restore record, as it will be used by other formatters
- record.__dict__ = orig
- return res
-
-
-def setup_loggers(def_level=logging.DEBUG, log_fname=None):
- logger = logging.getLogger('wally')
- logger.setLevel(logging.DEBUG)
- sh = logging.StreamHandler()
- sh.setLevel(def_level)
-
- log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
- colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
-
- sh.setFormatter(colored_formatter)
- logger.addHandler(sh)
-
- logger_api = logging.getLogger("wally.fuel_api")
-
- if log_fname is not None:
- fh = logging.FileHandler(log_fname)
- log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
- formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
- fh.setFormatter(formatter)
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
- logger_api.addHandler(fh)
- else:
- fh = None
-
- logger_api.addHandler(sh)
- logger_api.setLevel(logging.WARNING)
-
- logger = logging.getLogger('paramiko')
- logger.setLevel(logging.WARNING)
- # logger.addHandler(sh)
- if fh is not None:
- logger.addHandler(fh)
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
index 70a6edf..ae06cbc 100644
--- a/wally/discover/ceph.py
+++ b/wally/discover/ceph.py
@@ -1,39 +1,22 @@
""" Collect data about ceph nodes"""
import json
import logging
-import subprocess
+from typing import Iterable
-from .node import Node
-from wally.ssh_utils import connect
+from ..node import NodeInfo, Node
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally.discover")
-def local_execute(cmd):
- return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
-
-
-def ssh_execute(ssh):
- def closure(cmd):
- _, chan, _ = ssh.exec_command(cmd)
- return chan.read()
- return closure
-
-
-def discover_ceph_nodes(ip):
- """ Return list of ceph's nodes ips """
+def discover_ceph_nodes(node: Node) -> Iterable[NodeInfo]:
+ """Return list of ceph's nodes NodeInfo"""
ips = {}
- if ip != 'local':
- executor = ssh_execute(connect(ip))
- else:
- executor = local_execute
-
- osd_ips = get_osds_ips(executor, get_osds_list(executor))
- mon_ips = get_mons_or_mds_ips(executor, "mon")
- mds_ips = get_mons_or_mds_ips(executor, "mds")
+ osd_ips = get_osds_ips(node, get_osds_list(node))
+ mon_ips = get_mons_or_mds_ips(node, "mon")
+ mds_ips = get_mons_or_mds_ips(node, "mds")
for ip in osd_ips:
url = "ssh://%s" % ip
@@ -47,43 +30,35 @@
url = "ssh://%s" % ip
ips.setdefault(url, []).append("ceph-mds")
- return [Node(url, list(roles)) for url, roles in ips.items()]
+ return [NodeInfo(url, set(roles)) for url, roles in ips.items()]
-def get_osds_list(executor):
- """ Get list of osds id"""
- return filter(None, executor("ceph osd ls").split("\n"))
+def get_osds_list(node: Node) -> Iterable[str]:
+ """Get list of osd's id"""
+ return filter(None, node.run("ceph osd ls").split("\n"))
-def get_mons_or_mds_ips(executor, who):
- """ Return mon ip list
- :param who - "mon" or "mds" """
- if who not in ("mon", "mds"):
- raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
- "of mon/mds") % who)
+def get_mons_or_mds_ips(node: Node, who: str) -> Iterable[str]:
+ """Return mon ip list. who - mon/mds"""
+ assert who in ("mon", "mds"), \
+ "{!r} in get_mons_or_mds_ips instead of mon/mds".format(who)
- line_res = executor("ceph {0} dump".format(who)).split("\n")
+ line_res = node.run("ceph {0} dump".format(who)).split("\n")
ips = set()
for line in line_res:
fields = line.split()
-
- # what does fields[1], fields[2] means?
- # make this code looks like:
- # SOME_MENINGFULL_VAR1, SOME_MENINGFULL_VAR2 = line.split()[1:3]
-
if len(fields) > 2 and who in fields[2]:
ips.add(fields[1].split(":")[0])
return ips
-def get_osds_ips(executor, osd_list):
- """ Get osd's ips
- :param osd_list - list of osd names from osd ls command"""
+def get_osds_ips(node: Node, osd_list: Iterable[str]) -> Iterable[str]:
+ """Get osd's ips. osd_list - list of osd names from osd ls command"""
ips = set()
for osd_id in osd_list:
- out = executor("ceph osd find {0}".format(osd_id))
+ out = node.run("ceph osd find {0}".format(osd_id))
ip = json.loads(out)["ip"]
ips.add(str(ip.split(":")[0]))
return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 5b9d3fe..233650e 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -1,10 +1,14 @@
import os.path
import logging
+from paramiko import AuthenticationException
+
from . import ceph
from . import fuel
from . import openstack
-from wally.utils import parse_creds
+from ..utils import parse_creds, StopTestError
+from ..test_run_class import TestRun
+from ..node import Node, NodeInfo
logger = logging.getLogger("wally.discover")
@@ -28,12 +32,12 @@
"""
-def discover(ctx, discover, clusters_info, var_dir, discover_nodes=True):
+def discover(testrun: TestRun, discover_cfg, clusters_info, var_dir, discover_nodes=True):
+ """Discover nodes in clusters"""
nodes_to_run = []
clean_data = None
- ctx.fuel_openstack_creds = None
- for cluster in discover:
+ for cluster in discover_cfg:
if cluster == "openstack" and not discover_nodes:
logger.warning("Skip openstack cluster discovery")
elif cluster == "openstack" and discover_nodes:
@@ -64,19 +68,30 @@
if cluster == "fuel_openrc_only":
discover_nodes = False
- res = fuel.discover_fuel_nodes(clusters_info['fuel'],
- var_dir,
+ ssh_creds = clusters_info['fuel']['ssh_creds']
+ fuel_node = Node(NodeInfo(ssh_creds, {'fuel_master'}))
+
+ try:
+ fuel_node.connect_ssh()
+ except AuthenticationException:
+ raise StopTestError("Wrong fuel credentials")
+ except Exception:
+                logger.exception("Error while connecting to FUEL")
+ raise StopTestError("Failed to connect to FUEL")
+
+ fuel_node.connect_rpc()
+
+ res = fuel.discover_fuel_nodes(fuel_node,
+ clusters_info['fuel'],
discover_nodes)
nodes, clean_data, openrc_dict, version = res
- if openrc_dict is None:
- ctx.fuel_openstack_creds = None
- else:
+ if openrc_dict:
if version >= [8, 0] and openrc_dict['os_auth_url'].startswith("https://"):
logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
openrc_dict['os_auth_url'] = "http" + openrc_dict['os_auth_url'][5:]
- ctx.fuel_openstack_creds = {
+ testrun.fuel_openstack_creds = {
'name': openrc_dict['username'],
'passwd': openrc_dict['password'],
'tenant': openrc_dict['tenant_name'],
@@ -91,11 +106,11 @@
fuel_openrc_fname = os.path.join(var_dir,
env_f_name + "_openrc")
- if ctx.fuel_openstack_creds is not None:
+ if testrun.fuel_openstack_creds is not None:
with open(fuel_openrc_fname, "w") as fd:
- fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
- msg = "Openrc for cluster {0} saves into {1}"
- logger.info(msg.format(env_name, fuel_openrc_fname))
+ fd.write(openrc_templ.format(**testrun.fuel_openstack_creds))
+                msg = "Openrc for cluster {0} saved into {1}"
+ logger.info(msg.format(env_name, fuel_openrc_fname))
nodes_to_run.extend(nodes)
elif cluster == "ceph":
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 4f34298..1443a9f 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,44 +1,41 @@
-import re
import socket
import logging
-from urlparse import urlparse
-
-import sshtunnel
-from paramiko import AuthenticationException
+from typing import Dict, Any, Tuple, List
+from urllib.parse import urlparse
-from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
- reflect_cluster, FuelInfo)
-from wally.utils import (parse_creds, check_input_param, StopTestError,
- clean_resource, get_ip_for_target)
-from wally.ssh_utils import (run_over_ssh, connect, set_key_for_node,
- read_from_remote)
-
-from .node import Node
+from .. import fuel_rest_api
+from ..utils import parse_creds, check_input_param
+from ..node import NodeInfo, Node, FuelNodeInfo
logger = logging.getLogger("wally.discover")
-BASE_PF_PORT = 44006
-def discover_fuel_nodes(fuel_data, var_dir, discover_nodes=True):
+def discover_fuel_nodes(fuel_master_node: Node,
+ fuel_data: Dict[str, Any],
+ discover_nodes: bool=True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
+ """Discover nodes in fuel cluster, get openrc for selected cluster"""
+
+ # parse FUEL REST credentials
username, tenant_name, password = parse_creds(fuel_data['creds'])
creds = {"username": username,
"tenant_name": tenant_name,
"password": password}
- conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
-
+ # connect to FUEL
+ conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
msg = "openstack_env should be provided in fuel config"
check_input_param('openstack_env' in fuel_data, msg)
- cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
- cluster = reflect_cluster(conn, cluster_id)
- version = FuelInfo(conn).get_version()
+ # get cluster information from REST API
+ cluster_id = fuel_rest_api.get_cluster_id(conn, fuel_data['openstack_env'])
+ cluster = fuel_rest_api.reflect_cluster(conn, cluster_id)
+ version = fuel_rest_api.FuelInfo(conn).get_version()
if not discover_nodes:
logger.warning("Skip fuel cluster discovery")
- return ([], None, cluster.get_openrc(), version)
+ return [], FuelNodeInfo(version, None, cluster.get_openrc())
fuel_nodes = list(cluster.get_nodes())
@@ -46,85 +43,27 @@
network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
- ssh_creds = fuel_data['ssh_creds']
-
fuel_host = urlparse(fuel_data['url']).hostname
fuel_ip = socket.gethostbyname(fuel_host)
+ fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
- try:
- ssh_conn = connect("{0}@{1}".format(ssh_creds, fuel_host))
- except AuthenticationException:
- raise StopTestError("Wrong fuel credentials")
- except Exception:
- logger.exception("While connection to FUEL")
- raise StopTestError("Failed to connect to FUEL")
-
- fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
-
+ # get FUEL master key to connect to cluster nodes via ssh
logger.debug("Downloading fuel master key")
- fuel_key = download_master_key(ssh_conn)
+ fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
+
+ # forward ports of cluster nodes to FUEL master
+ logger.info("Forwarding ssh ports from FUEL nodes to localhost")
+ ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
+ port_fw = [fuel_master_node.forward_port(ip, 22) for ip in ips]
+ listen_ip = fuel_master_node.get_ip()
nodes = []
- ips_ports = []
-
- logger.info("Forwarding ssh ports from FUEL nodes to localhost")
- fuel_usr, fuel_passwd = ssh_creds.split(":", 1)
- ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
- port_fw = forward_ssh_ports(fuel_host, fuel_usr, fuel_passwd, ips)
- listen_ip = get_ip_for_target(fuel_host)
-
for port, fuel_node, ip in zip(port_fw, fuel_nodes, ips):
- logger.debug(
- "SSH port forwarding {0} => localhost:{1}".format(ip, port))
+ logger.debug("SSH port forwarding {} => {}:{}".format(ip, listen_ip, port))
+ conn_url = "ssh://root@{}:{}".format(listen_ip, port)
+ nodes.append(NodeInfo(conn_url, fuel_node['roles'], listen_ip, fuel_key))
- conn_url = "ssh://root@127.0.0.1:{0}".format(port)
- set_key_for_node(('127.0.0.1', port), fuel_key)
+ logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
- node = Node(conn_url, fuel_node['roles'])
- node.monitor_ip = listen_ip
- nodes.append(node)
- ips_ports.append((ip, port))
+ return nodes, FuelNodeInfo(version, fuel_ext_iface, cluster.get_openrc())
- logger.debug("Found %s fuel nodes for env %r" %
- (len(nodes), fuel_data['openstack_env']))
-
- return (nodes,
- (ssh_conn, fuel_ext_iface, ips_ports),
- cluster.get_openrc(),
- version)
-
-
-def download_master_key(conn):
- # download master key
- with conn.open_sftp() as sftp:
- return read_from_remote(sftp, '/root/.ssh/id_rsa')
-
-
-def get_external_interface(conn, ip):
- data = run_over_ssh(conn, "ip a", node='fuel-master', nolog=True)
- curr_iface = None
- for line in data.split("\n"):
-
- match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
- if match1 is not None:
- curr_iface = match1.group('name')
-
- match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
- if match2 is not None:
- if match2.group('ip') == ip:
- assert curr_iface is not None
- return curr_iface
- raise KeyError("Can't found interface for ip {0}".format(ip))
-
-
-def forward_ssh_ports(proxy_ip, proxy_user, proxy_passwd, ips):
- for ip in ips:
- tunnel = sshtunnel.open(
- (proxy_ip, 22),
- ssh_username=proxy_user,
- ssh_password=proxy_passwd,
- threaded=True,
- remote_bind_address=(ip, 22))
- tunnel.__enter__()
- clean_resource(tunnel.__exit__, None, None, None)
- yield tunnel.local_bind_port
diff --git a/wally/discover/node.py b/wally/discover/node.py
deleted file mode 100644
index a3d58f9..0000000
--- a/wally/discover/node.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import getpass
-
-from wally.ssh_utils import parse_ssh_uri
-
-
-class Node(object):
-
- def __init__(self, conn_url, roles):
- self.roles = roles
- self.conn_url = conn_url
- self.connection = None
- self.monitor_ip = None
- self.os_vm_id = None
-
- def get_ip(self):
- if self.conn_url == 'local':
- return '127.0.0.1'
-
- assert self.conn_url.startswith("ssh://")
- return parse_ssh_uri(self.conn_url[6:]).host
-
- def get_conn_id(self):
- if self.conn_url == 'local':
- return '127.0.0.1'
-
- assert self.conn_url.startswith("ssh://")
- creds = parse_ssh_uri(self.conn_url[6:])
- return "{0.host}:{0.port}".format(creds)
-
- def get_user(self):
- if self.conn_url == 'local':
- return getpass.getuser()
-
- assert self.conn_url.startswith("ssh://")
- creds = parse_ssh_uri(self.conn_url[6:])
- return creds.user
-
- def __str__(self):
- templ = "<Node: url={conn_url!r} roles={roles}" + \
- " connected={is_connected}>"
- return templ.format(conn_url=self.conn_url,
- roles=", ".join(self.roles),
- is_connected=self.connection is not None)
-
- def __repr__(self):
- return str(self)
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
index 32b4629..bb128c3 100644
--- a/wally/discover/openstack.py
+++ b/wally/discover/openstack.py
@@ -1,42 +1,63 @@
import socket
import logging
+from typing import Iterable, Dict, Any
from novaclient.client import Client
-from .node import Node
+from ..node import NodeInfo
from wally.utils import parse_creds
-logger = logging.getLogger("io-perf-tool.discover")
+logger = logging.getLogger("wally.discover")
-def get_floating_ip(vm):
- addrs = vm.addresses
- for net_name, ifaces in addrs.items():
+def get_floating_ip(vm) -> str:
+ """Get VM floating IP address"""
+
+ for net_name, ifaces in vm.addresses.items():
for iface in ifaces:
if iface.get('OS-EXT-IPS:type') == "floating":
return iface['addr']
+ raise ValueError("VM {} has no floating ip".format(vm))
-def discover_vms(client, search_opts):
+
+def get_ssh_url(user: str, password: str, ip: str, key: str) -> str:
+ """Get ssh connection URL from parts"""
+
+ if password is not None:
+ assert key is None, "Both key and password provided"
+ return "ssh://{}:{}@{}".format(user, password, ip)
+ else:
+        assert key is not None, "Neither key nor password provided"
+ return "ssh://{}@{}::{}".format(user, ip, key)
+
+
+def discover_vms(client: Client, search_opts) -> Iterable[NodeInfo]:
+ """Discover virtual machines"""
user, password, key = parse_creds(search_opts.pop('auth'))
servers = client.servers.list(search_opts=search_opts)
logger.debug("Found %s openstack vms" % len(servers))
- return [Node(get_floating_ip(server), ["test_vm"], username=user,
- password=password, key_path=key)
- for server in servers if get_floating_ip(server)]
+
+ nodes = []
+ for server in servers:
+ ip = get_floating_ip(server)
+ nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), ["test_vm"]))
+
+ return nodes
-def discover_services(client, opts):
+def discover_services(client: Client, opts: Dict[str, Any]) -> Iterable[NodeInfo]:
+ """Discover openstack services for given cluster"""
user, password, key = parse_creds(opts.pop('auth'))
services = []
if opts['service'] == "all":
services = client.services.list()
else:
- if isinstance(opts['service'], basestring):
+ if isinstance(opts['service'], str):
opts['service'] = [opts['service']]
for s in opts['service']:
@@ -50,21 +71,25 @@
logger.debug("Found %s openstack service nodes" %
len(host_services_mapping))
- return [Node(host, services, username=user,
- password=password, key_path=key) for host, services in
- host_services_mapping.items()]
+ nodes = []
+ for host, services in host_services_mapping.items():
+ ssh_url = get_ssh_url(user, password, host, key)
+ nodes.append(NodeInfo(ssh_url, services))
+
+ return nodes
-def discover_openstack_nodes(conn_details, conf):
+def discover_openstack_nodes(conn_details: Dict[str, str], conf: Dict[str, Any]) -> Iterable[NodeInfo]:
"""Discover vms running in openstack
- :param conn_details - dict with openstack connection details -
+ conn_details - dict with openstack connection details -
auth_url, api_key (password), username
+ conf - test configuration object
"""
client = Client(version='1.1', **conn_details)
- nodes = []
+
if conf.get('discover'):
services_to_discover = conf['discover'].get('nodes')
if services_to_discover:
- nodes.extend(discover_services(client, services_to_discover))
+ return discover_services(client, services_to_discover)
- return nodes
+ return []
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 728ac94..8799ed2 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -2,8 +2,9 @@
import json
import time
import logging
-import urllib2
-import urlparse
+import urllib.request
+import urllib.parse
+from typing import Dict, Any
from functools import partial, wraps
import netaddr
@@ -15,14 +16,14 @@
logger = logging.getLogger("wally.fuel_api")
-class Urllib2HTTP(object):
+class Urllib2HTTP:
"""
class for making HTTP requests
"""
allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
- def __init__(self, root_url, headers=None):
+ def __init__(self, root_url: str, headers: Dict[str, str]=None):
"""
"""
if root_url.endswith('/'):
@@ -32,10 +33,10 @@
self.headers = headers if headers is not None else {}
- def host(self):
+ def host(self) -> str:
return self.root_url.split('/')[2]
- def do(self, method, path, params=None):
+ def do(self, method: str, path: str, params: Dict[Any, Any]=None):
if path.startswith('/'):
url = self.root_url + path
else:
@@ -49,14 +50,14 @@
logger.debug("HTTP: {0} {1}".format(method.upper(), url))
- request = urllib2.Request(url,
- data=data_json,
- headers=self.headers)
+ request = urllib.request.Request(url,
+ data=data_json,
+ headers=self.headers)
if data_json is not None:
request.add_header('Content-Type', 'application/json')
request.get_method = lambda: method.upper()
- response = urllib2.urlopen(request)
+ response = urllib.request.urlopen(request)
logger.debug("HTTP Responce: {0}".format(response.code))
@@ -70,16 +71,16 @@
return json.loads(content)
- def __getattr__(self, name):
+ def __getattr__(self, name: str):
if name in self.allowed_methods:
return partial(self.do, name)
raise AttributeError(name)
class KeystoneAuth(Urllib2HTTP):
- def __init__(self, root_url, creds, headers=None):
+ def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None):
super(KeystoneAuth, self).__init__(root_url, headers)
- admin_node_ip = urlparse.urlparse(root_url).hostname
+ admin_node_ip = urllib.parse.urlparse(root_url).hostname
self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
self.keystone = keystoneclient(
auth_url=self.keystone_url, **creds)
@@ -95,11 +96,11 @@
'Cant establish connection to keystone with url %s',
self.keystone_url)
- def do(self, method, path, params=None):
+ def do(self, method: str, path: str, params: Dict[str, str]=None):
"""Do request. If gets 401 refresh token"""
try:
return super(KeystoneAuth, self).do(method, path, params)
- except urllib2.HTTPError as e:
+ except urllib.request.HTTPError as e:
if e.code == 401:
logger.warning(
'Authorization failure: {0}'.format(e.read()))
@@ -109,13 +110,13 @@
raise
-def get_inline_param_list(url):
+def get_inline_param_list(url: str):
format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
for match in format_param_rr.finditer(url):
yield match.group(1)
-class RestObj(object):
+class RestObj:
name = None
id = None
@@ -136,7 +137,7 @@
return getattr(self, item)
-def make_call(method, url):
+def make_call(method: str, url: str):
def closure(obj, entire_obj=None, **data):
inline_params_vals = {}
for name in get_inline_param_list(url):
@@ -226,40 +227,6 @@
get_info = GET('/api/nodes/{id}')
get_interfaces = GET('/api/nodes/{id}/interfaces')
- update_interfaces = PUT('/api/nodes/{id}/interfaces')
-
- def set_network_assigment(self, mapping):
- """Assings networks to interfaces
- :param mapping: list (dict) interfaces info
- """
-
- curr_interfaces = self.get_interfaces()
-
- network_ids = {}
- for interface in curr_interfaces:
- for net in interface['assigned_networks']:
- network_ids[net['name']] = net['id']
-
- # transform mappings
- new_assigned_networks = {}
-
- for dev_name, networks in mapping.items():
- new_assigned_networks[dev_name] = []
- for net_name in networks:
- nnet = {'name': net_name, 'id': network_ids[net_name]}
- new_assigned_networks[dev_name].append(nnet)
-
- # update by ref
- for dev_descr in curr_interfaces:
- if dev_descr['name'] in new_assigned_networks:
- nass = new_assigned_networks[dev_descr['name']]
- dev_descr['assigned_networks'] = nass
-
- self.update_interfaces(curr_interfaces, id=self.id)
-
- def set_node_name(self, name):
- """Update node name"""
- self.__connection__.put('api/nodes', [{'id': self.id, 'name': name}])
def get_network_data(self):
"""Returns node network data"""
@@ -308,23 +275,9 @@
class Cluster(RestObj):
"""Class represents Cluster in Fuel"""
- add_node_call = PUT('api/nodes')
- start_deploy = PUT('api/clusters/{id}/changes')
get_status = GET('api/clusters/{id}')
- delete = DELETE('api/clusters/{id}')
- get_tasks_status = GET("api/tasks?cluster_id={id}")
- get_networks = GET(
- 'api/clusters/{id}/network_configuration/neutron')
-
- get_attributes = GET(
- 'api/clusters/{id}/attributes')
-
- set_attributes = PUT(
- 'api/clusters/{id}/attributes')
-
- configure_networks = PUT(
- 'api/clusters/{id}/network_configuration/{net_provider}')
-
+ get_networks = GET('api/clusters/{id}/network_configuration/neutron')
+ get_attributes = GET('api/clusters/{id}/attributes')
_get_nodes = GET('api/nodes?cluster_id={id}')
def __init__(self, *dt, **mp):
@@ -338,17 +291,16 @@
try:
self.get_status()
return True
- except urllib2.HTTPError as err:
+ except urllib.request.HTTPError as err:
if err.code == 404:
return False
raise
def get_openrc(self):
access = self.get_attributes()['editable']['access']
- creds = {}
- creds['username'] = access['user']['value']
- creds['password'] = access['password']['value']
- creds['tenant_name'] = access['tenant']['value']
+ creds = {'username': access['user']['value'],
+ 'password': access['password']['value'],
+ 'tenant_name': access['tenant']['value']}
version = FuelInfo(self.__connection__).get_version()
# only HTTPS since 7.0
@@ -365,78 +317,6 @@
for node_descr in self._get_nodes():
yield Node(self.__connection__, **node_descr)
- def add_node(self, node, roles, interfaces=None):
- """Add node to cluster
-
- :param node: Node object
- :param roles: roles to assign
- :param interfaces: mapping iface name to networks
- """
- data = {}
- data['pending_roles'] = roles
- data['cluster_id'] = self.id
- data['id'] = node.id
- data['pending_addition'] = True
-
- logger.debug("Adding node %s to cluster..." % node.id)
-
- self.add_node_call([data])
- self.nodes.append(node)
-
- if interfaces is not None:
- networks = {}
- for iface_name, params in interfaces.items():
- networks[iface_name] = params['networks']
-
- node.set_network_assigment(networks)
-
- def wait_operational(self, timeout):
- """Wait until cluster status operational"""
- def wo():
- status = self.get_status()['status']
- if status == "error":
- raise Exception("Cluster deploy failed")
- return self.get_status()['status'] == 'operational'
- with_timeout(timeout, "deploy cluster")(wo)()
-
- def deploy(self, timeout):
- """Start deploy and wait until all tasks finished"""
- logger.debug("Starting deploy...")
- self.start_deploy()
-
- self.wait_operational(timeout)
-
- def all_tasks_finished_ok(obj):
- ok = True
- for task in obj.get_tasks_status():
- if task['status'] == 'error':
- raise Exception('Task execution error')
- elif task['status'] != 'ready':
- ok = False
- return ok
-
- wto = with_timeout(timeout, "wait deployment finished")
- wto(all_tasks_finished_ok)(self)
-
- def set_networks(self, net_descriptions):
- """Update cluster networking parameters"""
- configuration = self.get_networks()
- current_networks = configuration['networks']
- parameters = configuration['networking_parameters']
-
- if net_descriptions.get('networks'):
- net_mapping = net_descriptions['networks']
-
- for net in current_networks:
- net_desc = net_mapping.get(net['name'])
- if net_desc:
- net.update(net_desc)
-
- if net_descriptions.get('networking_parameters'):
- parameters.update(net_descriptions['networking_parameters'])
-
- self.configure_networks(**configuration)
-
def reflect_cluster(conn, cluster_id):
"""Returns cluster object by id"""
@@ -465,80 +345,3 @@
raise ValueError("Cluster {0} not found".format(name))
-
-sections = {
- 'sahara': 'additional_components',
- 'murano': 'additional_components',
- 'ceilometer': 'additional_components',
- 'volumes_ceph': 'storage',
- 'images_ceph': 'storage',
- 'ephemeral_ceph': 'storage',
- 'objects_ceph': 'storage',
- 'osd_pool_size': 'storage',
- 'volumes_lvm': 'storage',
- 'volumes_vmdk': 'storage',
- 'tenant': 'access',
- 'password': 'access',
- 'user': 'access',
- 'vc_password': 'vcenter',
- 'cluster': 'vcenter',
- 'host_ip': 'vcenter',
- 'vc_user': 'vcenter',
- 'use_vcenter': 'vcenter',
-}
-
-
-def create_empty_cluster(conn, cluster_desc, debug_mode=False):
- """Create new cluster with configuration provided"""
-
- data = {}
- data['nodes'] = []
- data['tasks'] = []
- data['name'] = cluster_desc['name']
- data['release'] = cluster_desc['release']
- data['mode'] = cluster_desc.get('deployment_mode')
- data['net_provider'] = cluster_desc.get('net_provider')
-
- params = conn.post(path='/api/clusters', params=data)
- cluster = Cluster(conn, **params)
-
- attributes = cluster.get_attributes()
-
- ed_attrs = attributes['editable']
-
- ed_attrs['common']['libvirt_type']['value'] = \
- cluster_desc.get('libvirt_type', 'kvm')
-
- if 'nodes' in cluster_desc:
- use_ceph = cluster_desc['nodes'].get('ceph_osd', None) is not None
- else:
- use_ceph = False
-
- if 'storage_type' in cluster_desc:
- st = cluster_desc['storage_type']
- if st == 'ceph':
- use_ceph = True
- else:
- use_ceph = False
-
- if use_ceph:
- opts = ['ephemeral_ceph', 'images_ceph', 'images_vcenter']
- opts += ['iser', 'objects_ceph', 'volumes_ceph']
- opts += ['volumes_lvm', 'volumes_vmdk']
-
- for name in opts:
- val = ed_attrs['storage'][name]
- if val['type'] == 'checkbox':
- is_ceph = ('images_ceph' == name)
- is_ceph = is_ceph or ('volumes_ceph' == name)
-
- if is_ceph:
- val['value'] = True
- else:
- val['value'] = False
- # else:
- # raise NotImplementedError("Non-ceph storages are not implemented")
-
- cluster.set_attributes(attributes)
-
- return cluster
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 73226e3..214883d 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,15 +1,17 @@
import re
+from typing import Dict, Any, Iterable
import xml.etree.ElementTree as ET
-from wally import ssh_utils, utils
+from . import utils
+from .inode import INode
-def get_data(rr, data):
+def get_data(rr: str, data: str) -> str:
match_res = re.search("(?ims)" + rr, data)
return match_res.group(0)
-class HWInfo(object):
+class HWInfo:
def __init__(self):
self.hostname = None
self.cores = []
@@ -30,11 +32,11 @@
self.storage_controllers = []
- def get_HDD_count(self):
+ def get_hdd_count(self) -> Iterable[int]:
# SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
return []
- def get_summary(self):
+ def get_summary(self) -> Dict[str, Any]:
cores = sum(count for _, count in self.cores)
disks = sum(size for _, size in self.disks_info.values())
@@ -52,15 +54,15 @@
ram=utils.b2ssize(summ['ram']),
disk=utils.b2ssize(summ['storage'])))
res.append(str(self.sys_name))
- if self.mb is not None:
+ if self.mb:
res.append("Motherboard: " + self.mb)
- if self.ram_size == 0:
+ if not self.ram_size:
res.append("RAM: Failed to get RAM size")
else:
res.append("RAM " + utils.b2ssize(self.ram_size) + "B")
- if self.cores == []:
+ if not self.cores:
res.append("CPU cores: Failed to get CPU info")
else:
res.append("CPU cores:")
@@ -70,12 +72,12 @@
else:
res.append(" " + name)
- if self.storage_controllers != []:
+ if self.storage_controllers:
res.append("Disk controllers:")
for descr in self.storage_controllers:
res.append(" " + descr)
- if self.disks_info != {}:
+ if self.disks_info:
res.append("Storage devices:")
for dev, (model, size) in sorted(self.disks_info.items()):
ssize = utils.b2ssize(size) + "B"
@@ -83,14 +85,14 @@
else:
res.append("Storage devices's: Failed to get info")
- if self.disks_raw_info != {}:
+ if self.disks_raw_info:
res.append("Disks devices:")
for dev, descr in sorted(self.disks_raw_info.items()):
res.append(" {0} {1}".format(dev, descr))
else:
res.append("Disks devices's: Failed to get info")
- if self.net_info != {}:
+ if self.net_info:
res.append("Net adapters:")
for name, (speed, dtype, _) in self.net_info.items():
res.append(" {0} {2} duplex={1}".format(name, dtype, speed))
@@ -100,7 +102,7 @@
return str(self.hostname) + ":\n" + "\n".join(" " + i for i in res)
-class SWInfo(object):
+class SWInfo:
def __init__(self):
self.partitions = None
self.kernel_version = None
@@ -112,48 +114,29 @@
self.ceph_version = None
-def get_sw_info(conn):
+def get_sw_info(node: INode) -> SWInfo:
res = SWInfo()
+
res.OS_version = utils.get_os()
-
- with conn.open_sftp() as sftp:
- def get(fname):
- try:
- return ssh_utils.read_from_remote(sftp, fname)
- except:
- return None
-
- res.kernel_version = get('/proc/version')
- res.partitions = get('/etc/mtab')
-
- def rr(cmd):
- try:
- return ssh_utils.run_over_ssh(conn, cmd, nolog=True)
- except:
- return None
-
- res.libvirt_version = rr("virsh -v")
- res.qemu_version = rr("qemu-system-x86_64 --version")
- res.ceph_version = rr("ceph --version")
+ res.kernel_version = node.get_file_content('/proc/version')
+ res.partitions = node.get_file_content('/etc/mtab')
+ res.libvirt_version = node.run("virsh -v", nolog=True)
+ res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True)
+ res.ceph_version = node.run("ceph --version", nolog=True)
return res
-def get_network_info():
- pass
-
-
-def get_hw_info(conn):
+def get_hw_info(node: INode) -> HWInfo:
res = HWInfo()
- lshw_out = ssh_utils.run_over_ssh(conn, 'sudo lshw -xml 2>/dev/null',
- nolog=True)
+ lshw_out = node.run('sudo lshw -xml 2>/dev/null', nolog=True)
res.raw = lshw_out
lshw_et = ET.fromstring(lshw_out)
try:
res.hostname = lshw_et.find("node").attrib['id']
- except:
+ except Exception:
pass
try:
@@ -161,7 +144,7 @@
lshw_et.find("node/product").text)
res.sys_name = res.sys_name.replace("(To be filled by O.E.M.)", "")
res.sys_name = res.sys_name.replace("(To be Filled by O.E.M.)", "")
- except:
+ except Exception:
pass
core = lshw_et.find("node/node[@id='core']")
@@ -171,7 +154,7 @@
try:
res.mb = " ".join(core.find(node).text
for node in ['vendor', 'product', 'version'])
- except:
+ except Exception:
pass
for cpu in core.findall("node[@class='processor']"):
@@ -183,7 +166,7 @@
else:
threads = int(threads_node.attrib['value'])
res.cores.append((model, threads))
- except:
+ except Exception:
pass
res.ram_size = 0
@@ -201,7 +184,7 @@
else:
assert mem_sz.attrib['units'] == 'bytes'
res.ram_size += int(mem_sz.text)
- except:
+ except Exception:
pass
for net in core.findall(".//node[@class='network']"):
@@ -223,7 +206,7 @@
dup = dup_node.attrib['value']
res.net_info[name] = (speed, dup, [])
- except:
+ except Exception:
pass
for controller in core.findall(".//node[@class='storage']"):
@@ -240,7 +223,7 @@
res.storage_controllers.append(
"{0} {1} {2}".format(description,
vendor, product))
- except:
+ except Exception:
pass
for disk in core.findall(".//node[@class='disk']"):
@@ -268,98 +251,7 @@
businfo = disk.find('businfo').text
res.disks_raw_info[businfo] = full_descr
- except:
+ except Exception:
pass
return res
-
-# import traceback
-# print ET.tostring(disk)
-# traceback.print_exc()
-
-# print get_hw_info(None)
-
-# def get_hw_info(conn):
-# res = HWInfo(None)
-# remote_run = functools.partial(ssh_utils.run_over_ssh, conn, nolog=True)
-
-# # some data
-# with conn.open_sftp() as sftp:
-# proc_data = ssh_utils.read_from_remote(sftp, '/proc/cpuinfo')
-# mem_data = ssh_utils.read_from_remote(sftp, '/proc/meminfo')
-
-# # cpu info
-# curr_core = {}
-# for line in proc_data.split("\n"):
-# if line.strip() == "":
-# if curr_core != {}:
-# res.cores.append(curr_core)
-# curr_core = {}
-# else:
-# param, val = line.split(":", 1)
-# curr_core[param.strip()] = val.strip()
-
-# if curr_core != {}:
-# res.cores.append(curr_core)
-
-# # RAM info
-# for line in mem_data.split("\n"):
-# if line.startswith("MemTotal"):
-# res.ram_size = int(line.split(":", 1)[1].split()[0]) * 1024
-# break
-
-# # HDD info
-# for dev in remote_run('ls /dev').split():
-# if dev[-1].isdigit():
-# continue
-
-# if dev.startswith('sd') or dev.startswith('hd'):
-# model = None
-# size = None
-
-# for line in remote_run('sudo hdparm -I /dev/' + dev).split("\n"):
-# if "Model Number:" in line:
-# model = line.split(':', 1)[1]
-# elif "device size with M = 1024*1024" in line:
-# size = int(line.split(':', 1)[1].split()[0])
-# size *= 1024 ** 2
-
-# res.disks_info[dev] = (model, size)
-
-# # Network info
-# separator = '*-network'
-# net_info = remote_run('sudo lshw -class network')
-# for net_dev_info in net_info.split(separator):
-# if net_dev_info.strip().startswith("DISABLED"):
-# continue
-
-# if ":" not in net_dev_info:
-# continue
-
-# dev_params = {}
-# for line in net_dev_info.split("\n"):
-# line = line.strip()
-# if ':' in line:
-# key, data = line.split(":", 1)
-# dev_params[key.strip()] = data.strip()
-
-# if 'configuration' not in dev_params:
-# print "!!!!!", net_dev_info
-# continue
-
-# conf = dev_params['configuration']
-# if 'link=yes' in conf:
-# if 'speed=' in conf:
-# speed = conf.split('speed=', 1)[1]
-# speed = speed.strip().split()[0]
-# else:
-# speed = None
-
-# if "duplex=" in conf:
-# dtype = conf.split("duplex=", 1)[1]
-# dtype = dtype.strip().split()[0]
-# else:
-# dtype = None
-
-# res.net_info[dev_params['logical name']] = (speed, dtype)
-# return res
diff --git a/wally/inode.py b/wally/inode.py
new file mode 100644
index 0000000..225a807
--- /dev/null
+++ b/wally/inode.py
@@ -0,0 +1,106 @@
+import abc
+from typing import Set, Dict, Tuple, Any, Optional
+
+from .ssh_utils import parse_ssh_uri
+
+
+class FuelNodeInfo:
+ """FUEL master node additional info"""
+ def __init__(self,
+ version: str,
+ fuel_ext_iface: str,
+ openrc: Dict[str, str]):
+
+ self.version = version
+ self.fuel_ext_iface = fuel_ext_iface
+ self.openrc = openrc
+
+
+class NodeInfo:
+ """Node information object"""
+ def __init__(self,
+ ssh_conn_url: str,
+ roles: Set[str],
+ bind_ip: str=None,
+ ssh_key: str=None):
+ self.ssh_conn_url = ssh_conn_url
+ self.roles = roles
+
+ if bind_ip is None:
+ bind_ip = parse_ssh_uri(self.ssh_conn_url).host
+
+ self.bind_ip = bind_ip
+ self.ssh_key = ssh_key
+
+
+class INode(metaclass=abc.ABCMeta):
+ """Node object"""
+
+ def __init__(self, node_info: NodeInfo):
+ self.rpc = None
+ self.node_info = node_info
+ self.hwinfo = None
+ self.roles = []
+
+ @abc.abstractmethod
+ def __str__(self):
+ pass
+
+ def __repr__(self):
+ return str(self)
+
+ @abc.abstractmethod
+ def is_connected(self) -> bool:
+ pass
+
+ @abc.abstractmethod
+ def connect_ssh(self, timeout: int=None) -> None:
+ pass
+
+ @abc.abstractmethod
+ def connect_rpc(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def prepare_rpc(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def get_ip(self) -> str:
+ pass
+
+ @abc.abstractmethod
+ def get_user(self) -> str:
+ pass
+
+ @abc.abstractmethod
+ def run(self, cmd: str, stdin_data: str=None, timeout: int=60, nolog: bool=False) -> str:
+ pass
+
+ @abc.abstractmethod
+ def discover_hardware_info(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def copy_file(self, local_path: str, remote_path: Optional[str]=None) -> str:
+ pass
+
+ @abc.abstractmethod
+ def get_file_content(self, path: str) -> bytes:
+ pass
+
+ @abc.abstractmethod
+    def put_to_file(self, path: str, content: bytes) -> None:
+ pass
+
+ @abc.abstractmethod
+ def forward_port(self, ip: str, remote_port: int, local_port: int=None) -> int:
+ pass
+
+ @abc.abstractmethod
+ def get_interface(self, ip: str) -> str:
+ pass
+
+ @abc.abstractmethod
+    def stat_file(self, path: str) -> Any:
+ pass
diff --git a/wally/keystone.py b/wally/keystone.py
index 24d322c..77c1d5b 100644
--- a/wally/keystone.py
+++ b/wally/keystone.py
@@ -1,19 +1,20 @@
import json
-import urllib2
+import urllib.request
from functools import partial
+from typing import Dict, Any
from keystoneclient import exceptions
from keystoneclient.v2_0 import Client as keystoneclient
-class Urllib2HTTP(object):
+class Urllib2HTTP:
"""
class for making HTTP requests
"""
allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
- def __init__(self, root_url, headers=None, echo=False):
+    def __init__(self, root_url: str, headers: Dict[str, str]=None, echo: bool=False):
"""
"""
if root_url.endswith('/'):
@@ -24,7 +25,7 @@
self.headers = headers if headers is not None else {}
self.echo = echo
- def do(self, method, path, params=None):
+ def do(self, method: str, path: str, params: Dict[str, str]=None) -> Any:
if path.startswith('/'):
url = self.root_url + path
else:
@@ -36,14 +37,14 @@
else:
data_json = json.dumps(params)
- request = urllib2.Request(url,
- data=data_json,
- headers=self.headers)
+ request = urllib.request.Request(url,
+ data=data_json,
+ headers=self.headers)
if data_json is not None:
request.add_header('Content-Type', 'application/json')
request.get_method = lambda: method.upper()
- response = urllib2.urlopen(request)
+ response = urllib.request.urlopen(request)
if response.code < 200 or response.code > 209:
raise IndexError(url)
@@ -62,15 +63,15 @@
class KeystoneAuth(Urllib2HTTP):
- def __init__(self, root_url, creds, headers=None, echo=False,
- admin_node_ip=None):
+ def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None, echo: bool=False,
+ admin_node_ip: str=None):
super(KeystoneAuth, self).__init__(root_url, headers, echo)
self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
self.keystone = keystoneclient(
auth_url=self.keystone_url, **creds)
self.refresh_token()
- def refresh_token(self):
+ def refresh_token(self) -> None:
"""Get new token from keystone and update headers"""
try:
self.keystone.authenticate()
@@ -78,11 +79,11 @@
except exceptions.AuthorizationFailure:
raise
- def do(self, method, path, params=None):
+ def do(self, method: str, path: str, params: Dict[str, str]=None) -> Any:
"""Do request. If gets 401 refresh token"""
try:
return super(KeystoneAuth, self).do(method, path, params)
- except urllib2.HTTPError as e:
+ except urllib.request.HTTPError as e:
if e.code == 401:
self.refresh_token()
return super(KeystoneAuth, self).do(method, path, params)
diff --git a/wally/logger.py b/wally/logger.py
new file mode 100644
index 0000000..43eff6b
--- /dev/null
+++ b/wally/logger.py
@@ -0,0 +1,82 @@
+import logging
+
+
+def color_me(color):
+ RESET_SEQ = "\033[0m"
+ COLOR_SEQ = "\033[1;%dm"
+
+ color_seq = COLOR_SEQ % (30 + color)
+
+ def closure(msg):
+ return color_seq + msg + RESET_SEQ
+ return closure
+
+
+class ColoredFormatter(logging.Formatter):
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+ colors = {
+ 'WARNING': color_me(YELLOW),
+ 'DEBUG': color_me(BLUE),
+ 'CRITICAL': color_me(YELLOW),
+ 'ERROR': color_me(RED)
+ }
+
+ def __init__(self, msg, use_color=True, datefmt=None):
+ logging.Formatter.__init__(self, msg, datefmt=datefmt)
+ self.use_color = use_color
+
+ def format(self, record):
+ orig = record.__dict__
+ record.__dict__ = record.__dict__.copy()
+ levelname = record.levelname
+
+ prn_name = levelname + ' ' * (8 - len(levelname))
+ if levelname in self.colors:
+ record.levelname = self.colors[levelname](prn_name)
+ else:
+ record.levelname = prn_name
+
+ # super doesn't work here in 2.6 O_o
+ res = logging.Formatter.format(self, record)
+
+ # res = super(ColoredFormatter, self).format(record)
+
+ # restore record, as it will be used by other formatters
+ record.__dict__ = orig
+ return res
+
+
+def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+ logger = logging.getLogger('wally')
+ logger.setLevel(logging.DEBUG)
+ sh = logging.StreamHandler()
+ sh.setLevel(def_level)
+
+ log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
+ colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
+
+ sh.setFormatter(colored_formatter)
+ logger.addHandler(sh)
+
+ logger_api = logging.getLogger("wally.fuel_api")
+
+ if log_fname is not None:
+ fh = logging.FileHandler(log_fname)
+ log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
+ formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
+ fh.setFormatter(formatter)
+ fh.setLevel(logging.DEBUG)
+ logger.addHandler(fh)
+ logger_api.addHandler(fh)
+ else:
+ fh = None
+
+ logger_api.addHandler(sh)
+ logger_api.setLevel(logging.WARNING)
+
+ logger = logging.getLogger('paramiko')
+ logger.setLevel(logging.WARNING)
+ # logger.addHandler(sh)
+ if fh is not None:
+ logger.addHandler(fh)
diff --git a/wally/main.py b/wally/main.py
index da144dd..9358b53 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,7 +5,6 @@
import logging
import argparse
import functools
-import contextlib
from yaml import load as _yaml_load
@@ -24,34 +23,17 @@
faulthandler = None
-from wally.timeseries import SensorDatastore
-from wally import utils, run_test, pretty_yaml
-from wally.config import (load_config, setup_loggers,
- get_test_files, save_run_params, load_run_params)
+from .timeseries import SensorDatastore
+from . import utils, run_test, pretty_yaml
+from .config import (load_config,
+ get_test_files, save_run_params, load_run_params)
+from .logger import setup_loggers
+from .stage import log_stage
logger = logging.getLogger("wally")
-class Context(object):
- def __init__(self):
- self.build_meta = {}
- self.nodes = []
- self.clear_calls_stack = []
- self.openstack_nodes_ids = []
- self.sensors_mon_q = None
- self.hw_info = []
- self.fuel_openstack_creds = None
-
-
-def get_stage_name(func):
- nm = get_func_name(func)
- if nm.endswith("stage"):
- return nm
- else:
- return nm + " stage"
-
-
def get_test_names(raw_res):
res = []
for tp, data in raw_res:
@@ -106,31 +88,6 @@
print(tab.draw())
-def get_func_name(obj):
- if hasattr(obj, '__name__'):
- return obj.__name__
- if hasattr(obj, 'func_name'):
- return obj.func_name
- return obj.func.func_name
-
-
-@contextlib.contextmanager
-def log_stage(func):
- msg_templ = "Exception during {0}: {1!s}"
- msg_templ_no_exc = "During {0}"
-
- logger.info("Start " + get_stage_name(func))
-
- try:
- yield
- except utils.StopTestError as exc:
- logger.error(msg_templ.format(
- get_func_name(func), exc))
- except Exception:
- logger.exception(msg_templ_no_exc.format(
- get_func_name(func)))
-
-
def make_storage_dir_struct(cfg):
utils.mkdirs_if_unxists(cfg.results_dir)
utils.mkdirs_if_unxists(cfg.sensor_storage)
@@ -175,7 +132,7 @@
test_parser.add_argument('--build-type', type=str, default="GA")
test_parser.add_argument('-n', '--no-tests', action='store_true',
help="Don't run tests", default=False)
- test_parser.add_argument('--load_report', action='store_true')
+ test_parser.add_argument('--load_report', action='store_true')
test_parser.add_argument("-k", '--keep-vm', action='store_true',
help="Don't remove test vm's", default=False)
test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
diff --git a/wally/meta_info.py b/wally/meta_info.py
index 12cb061..19fdad9 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -1,8 +1,9 @@
-from urlparse import urlparse
-from keystone import KeystoneAuth
+from typing import Any, Dict
+from urllib.parse import urlparse
+from .keystone import KeystoneAuth
-def total_lab_info(data):
+def total_lab_info(data: Dict[str, Any]) -> Dict[str, Any]:
lab_data = {}
lab_data['nodes_count'] = len(data['nodes'])
lab_data['total_memory'] = 0
@@ -24,7 +25,7 @@
return lab_data
-def collect_lab_data(url, cred):
+def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Any]:
u = urlparse(url)
keystone = KeystoneAuth(root_url=url, creds=cred, admin_node_ip=u.hostname)
lab_info = keystone.do(method='get', path="/api/nodes")
diff --git a/wally/node.py b/wally/node.py
new file mode 100644
index 0000000..b146876
--- /dev/null
+++ b/wally/node.py
@@ -0,0 +1,108 @@
+import re
+import getpass
+from typing import Tuple
+from .inode import INode, NodeInfo
+
+from .ssh_utils import parse_ssh_uri, run_over_ssh, connect
+
+
+class Node(INode):
+ """Node object"""
+
+ def __init__(self, node_info: NodeInfo):
+ self.info = node_info
+ self.roles = node_info.roles
+ self.bind_ip = node_info.bind_ip
+
+        self.ssh_conn_url = node_info.ssh_conn_url
+        assert self.ssh_conn_url.startswith("ssh://")
+
+ self.ssh_conn = None
+ self.rpc_conn_url = None
+ self.rpc_conn = None
+ self.os_vm_id = None
+ self.hw_info = None
+
+ if self.ssh_conn_url is not None:
+ self.ssh_cred = parse_ssh_uri(self.ssh_conn_url)
+ self.node_id = "{0.host}:{0.port}".format(self.ssh_cred)
+ else:
+ self.ssh_cred = None
+ self.node_id = None
+
+ def __str__(self):
+ template = "<Node: url={conn_url!r} roles={roles}" + \
+ " connected={is_connected}>"
+ return template.format(conn_url=self.ssh_conn_url,
+ roles=", ".join(self.roles),
+ is_connected=self.ssh_conn is not None)
+
+ def __repr__(self):
+ return str(self)
+
+ def connect_ssh(self) -> None:
+ self.ssh_conn = connect(self.ssh_conn_url)
+
+ def connect_rpc(self) -> None:
+ raise NotImplementedError()
+
+ def prepare_rpc(self) -> None:
+ raise NotImplementedError()
+
+ def get_ip(self) -> str:
+ """get node connection ip address"""
+
+ if self.ssh_conn_url == 'local':
+ return '127.0.0.1'
+ return self.ssh_cred.host
+
+ def get_user(self) -> str:
+ """"get ssh connection username"""
+ if self.ssh_conn_url == 'local':
+ return getpass.getuser()
+ return self.ssh_cred.user
+
+ def run(self, cmd: str, stdin_data: str=None, timeout: int=60, nolog: bool=False) -> Tuple[int, str]:
+ """Run command on node. Will use rpc connection, if available"""
+
+ if self.rpc_conn is None:
+ return run_over_ssh(self.ssh_conn, cmd,
+ stdin_data=stdin_data, timeout=timeout,
+ nolog=nolog, node=self)
+ assert not stdin_data
+ proc_id = self.rpc_conn.cli.spawn(cmd)
+ exit_code = None
+ output = ""
+
+ while exit_code is None:
+ exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
+ output += stdout_data + stderr_data
+
+ return exit_code, output
+
+ def discover_hardware_info(self) -> None:
+ raise NotImplementedError()
+
+ def get_file_content(self, path: str) -> str:
+ raise NotImplementedError()
+
+ def forward_port(self, ip: str, remote_port: int, local_port: int = None) -> int:
+ raise NotImplementedError()
+
+ def get_interface(self, ip: str) -> str:
+ """Get node external interface for given IP"""
+ data = self.run("ip a", nolog=True)
+ curr_iface = None
+
+ for line in data.split("\n"):
+ match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
+ if match1 is not None:
+ curr_iface = match1.group('name')
+
+ match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
+ if match2 is not None:
+ if match2.group('ip') == ip:
+ assert curr_iface is not None
+ return curr_iface
+
+ raise KeyError("Can't found interface for ip {0}".format(ip))
diff --git a/wally/node_rpc.py b/wally/node_rpc.py
new file mode 100644
index 0000000..9a458ae
--- /dev/null
+++ b/wally/node_rpc.py
@@ -0,0 +1,428 @@
+import re
+import json
+import time
+import errno
+import socket
+import shutil
+import logging
+import os.path
+import getpass
+import StringIO
+import threading
+import subprocess
+
+import paramiko
+
+from agent import connect
+from .ssh_utils import Local, ssh_connect, ssh_copy_file
+
+
+logger = logging.getLogger("wally")
+
+
+def setup_node(conn, agent_path, ip):
+ agent_fname, log_fname = run_over_ssh(conn, "mktemp;echo;mktemp").strip().split()
+ with conn.open_sftp() as sftp:
+ ssh_copy_file(sftp, agent_path, agent_fname)
+
+ cmd = "python {} server -d --listen-addr={}:0 --stdout-file={}"
+ jdata = run_over_ssh(conn, cmd.format(agent_fname, ip, log_fname)).strip()
+ run_over_ssh(conn, "rm {}".format(agent_fname))
+ sett = json.loads(jdata)
+ return connect(sett['addr'])
+
+
+def exists(rpc, path):
+ """os.path.exists for paramiko's SCP object"""
+ return rpc.exists(path)
+
+
+def save_to_remote(sftp, path, content):
+ with sftp.open(path, "wb") as fd:
+ fd.write(content)
+
+
+def read_from_remote(sftp, path):
+ with sftp.open(path, "rb") as fd:
+ return fd.read()
+
+
+def normalize_dirpath(dirpath):
+ while dirpath.endswith("/"):
+ dirpath = dirpath[:-1]
+ return dirpath
+
+
+ALL_RWX_MODE = ((1 << 9) - 1)
+
+
+def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
+ remotepath = normalize_dirpath(remotepath)
+ if intermediate:
+ try:
+ sftp.mkdir(remotepath, mode=mode)
+ except (IOError, OSError):
+ upper_dir = remotepath.rsplit("/", 1)[0]
+
+ if upper_dir == '' or upper_dir == '/':
+ raise
+
+ ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
+ return sftp.mkdir(remotepath, mode=mode)
+ else:
+ sftp.mkdir(remotepath, mode=mode)
+
+
+def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
+ sftp.put(localfile, remfile)
+ if preserve_perm:
+ sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
+
+
+def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
+ "upload local directory to remote recursively"
+
+ # hack for localhost connection
+ if hasattr(sftp, "copytree"):
+ sftp.copytree(localpath, remotepath)
+ return
+
+ assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
+
+ # normalize
+ localpath = normalize_dirpath(localpath)
+ remotepath = normalize_dirpath(remotepath)
+
+ try:
+ sftp.chdir(remotepath)
+ localsuffix = localpath.rsplit("/", 1)[1]
+ remotesuffix = remotepath.rsplit("/", 1)[1]
+ if localsuffix != remotesuffix:
+ remotepath = os.path.join(remotepath, localsuffix)
+ except IOError:
+ pass
+
+ for root, dirs, fls in os.walk(localpath):
+ prefix = os.path.commonprefix([localpath, root])
+ suffix = root.split(prefix, 1)[1]
+ if suffix.startswith("/"):
+ suffix = suffix[1:]
+
+ remroot = os.path.join(remotepath, suffix)
+
+ try:
+ sftp.chdir(remroot)
+ except IOError:
+ if preserve_perm:
+ mode = os.stat(root).st_mode & ALL_RWX_MODE
+ else:
+ mode = ALL_RWX_MODE
+ ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
+ sftp.chdir(remroot)
+
+ for f in fls:
+ remfile = os.path.join(remroot, f)
+ localfile = os.path.join(root, f)
+ ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def delete_file(conn, path):
+ sftp = conn.open_sftp()
+ sftp.remove(path)
+ sftp.close()
+
+
+def copy_paths(conn, paths):
+ sftp = conn.open_sftp()
+ try:
+ for src, dst in paths.items():
+ try:
+ if os.path.isfile(src):
+ ssh_copy_file(sftp, src, dst)
+ elif os.path.isdir(src):
+ put_dir_recursively(sftp, src, dst)
+ else:
+ templ = "Can't copy {0!r} - " + \
+                            "it is neither a file nor a directory"
+ raise OSError(templ.format(src))
+ except Exception as exc:
+ tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+ raise OSError(tmpl.format(src, dst, exc))
+ finally:
+ sftp.close()
+
+
+class ConnCreds(object):
+ conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+ def __init__(self):
+ for name in self.conn_uri_attrs:
+ setattr(self, name, None)
+
+ def __str__(self):
+ return str(self.__dict__)
+
+
+uri_reg_exprs = []
+
+
+class URIsNamespace(object):
+ class ReParts(object):
+ user_rr = "[^:]*?"
+ host_rr = "[^:@]*?"
+ port_rr = "\\d+"
+ key_file_rr = "[^:@]*"
+ passwd_rr = ".*?"
+
+ re_dct = ReParts.__dict__
+
+ for attr_name, val in re_dct.items():
+ if attr_name.endswith('_rr'):
+ new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+ setattr(ReParts, attr_name, new_rr)
+
+ re_dct = ReParts.__dict__
+
+ templs = [
+ "^{host_rr}$",
+ "^{host_rr}:{port_rr}$",
+ "^{host_rr}::{key_file_rr}$",
+ "^{host_rr}:{port_rr}:{key_file_rr}$",
+ "^{user_rr}@{host_rr}$",
+ "^{user_rr}@{host_rr}:{port_rr}$",
+ "^{user_rr}@{host_rr}::{key_file_rr}$",
+ "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+ "^{user_rr}:{passwd_rr}@{host_rr}$",
+ "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
+ ]
+
+ for templ in templs:
+ uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+ # user:passwd@ip_host:port
+ # user:passwd@ip_host
+ # user@ip_host:port
+ # user@ip_host
+ # ip_host:port
+ # ip_host
+ # user@ip_host:port:path_to_key_file
+ # user@ip_host::path_to_key_file
+ # ip_host:port:path_to_key_file
+ # ip_host::path_to_key_file
+
+ if uri.startswith("ssh://"):
+ uri = uri[len("ssh://"):]
+
+ res = ConnCreds()
+ res.port = "22"
+ res.key_file = None
+ res.passwd = None
+ res.user = getpass.getuser()
+
+ for rr in uri_reg_exprs:
+ rrm = re.match(rr, uri)
+ if rrm is not None:
+ res.__dict__.update(rrm.groupdict())
+ return res
+
+ raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
+
+def reconnect(conn, uri, **params):
+ if uri == 'local':
+ return conn
+
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ return ssh_connect(creds, reuse_conn=conn, **params)
+
+
+def connect(uri, **params):
+ if uri == 'local':
+ res = Local()
+ else:
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ res = ssh_connect(creds, **params)
+ return res
+
+
+all_sessions_lock = threading.Lock()
+all_sessions = {}
+
+
+class BGSSHTask(object):
+ CHECK_RETRY = 5
+
+ def __init__(self, node, use_sudo):
+ self.node = node
+ self.pid = None
+ self.use_sudo = use_sudo
+
+ def start(self, orig_cmd, **params):
+ uniq_name = 'test'
+ cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
+ run_over_ssh(self.node.connection, cmd,
+ timeout=10, node=self.node.get_conn_id(),
+ **params)
+ processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
+
+ for iter in range(self.CHECK_RETRY):
+ for proc in processes.split("\n"):
+ if orig_cmd in proc and "SCREEN" not in proc:
+ self.pid = proc.split()[1]
+ break
+ if self.pid is not None:
+ break
+ time.sleep(1)
+
+ if self.pid is None:
+ self.pid = -1
+
+ def check_running(self):
+ assert self.pid is not None
+ if -1 == self.pid:
+ return False
+ try:
+ run_over_ssh(self.node.connection,
+ "ls /proc/{0}".format(self.pid),
+ timeout=10, nolog=True)
+ return True
+ except OSError:
+ return False
+
+ def kill(self, soft=True, use_sudo=True):
+ assert self.pid is not None
+ if self.pid == -1:
+ return True
+ try:
+ if soft:
+ cmd = "kill {0}"
+ else:
+ cmd = "kill -9 {0}"
+
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+
+ run_over_ssh(self.node.connection,
+ cmd.format(self.pid), nolog=True)
+ return True
+ except OSError:
+ return False
+
+ def wait(self, soft_timeout, timeout):
+ end_of_wait_time = timeout + time.time()
+ soft_end_of_wait_time = soft_timeout + time.time()
+
+ # time_till_check = random.randint(5, 10)
+ time_till_check = 2
+
+ # time_till_first_check = random.randint(2, 6)
+ time_till_first_check = 2
+ time.sleep(time_till_first_check)
+ if not self.check_running():
+ return True
+
+ while self.check_running() and time.time() < soft_end_of_wait_time:
+ # time.sleep(soft_end_of_wait_time - time.time())
+ time.sleep(time_till_check)
+
+ while end_of_wait_time > time.time():
+ time.sleep(time_till_check)
+ if not self.check_running():
+ break
+ else:
+ self.kill()
+ time.sleep(1)
+ if self.check_running():
+ self.kill(soft=False)
+ return False
+ return True
+
+
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
+ nolog=False, node=None):
+ "should be replaces by normal implementation, with select"
+
+ if isinstance(conn, Local):
+ if not nolog:
+ logger.debug("SSH:local Exec {0!r}".format(cmd))
+ proc = subprocess.Popen(cmd, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ stdoutdata, _ = proc.communicate(input=stdin_data)
+ if proc.returncode != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
+
+ return stdoutdata
+
+ transport = conn.get_transport()
+ session = transport.open_session()
+
+ if node is None:
+ node = ""
+
+ with all_sessions_lock:
+ all_sessions[id(session)] = session
+
+ try:
+ session.set_combine_stderr(True)
+
+ stime = time.time()
+
+ if not nolog:
+ logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
+
+ session.exec_command(cmd)
+
+ if stdin_data is not None:
+ session.sendall(stdin_data)
+
+ session.settimeout(1)
+ session.shutdown_write()
+ output = ""
+
+ while True:
+ try:
+ ndata = session.recv(1024)
+ output += ndata
+ if "" == ndata:
+ break
+ except socket.timeout:
+ pass
+
+ if time.time() - stime > timeout:
+ raise OSError(output + "\nExecution timeout")
+
+ code = session.recv_exit_status()
+ finally:
+ found = False
+ with all_sessions_lock:
+ if id(session) in all_sessions:
+ found = True
+ del all_sessions[id(session)]
+
+ if found:
+ session.close()
+
+ if code != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(node, cmd, code, output))
+
+ return output
+
+
+def close_all_sessions():
+ with all_sessions_lock:
+ for session in all_sessions.values():
+ try:
+ session.sendall('\x03')
+ session.close()
+ except:
+ pass
+ all_sessions.clear()
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index afb6375..4038ea8 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,13 +1,14 @@
__doc__ = "functions for make pretty yaml files"
__all__ = ['dumps']
+from typing import Any, Iterable
-def dumps_simple(val):
+
+def dumps_simple(val: Any) -> str:
bad_symbols = set(" \r\t\n,':{}[]><;")
- if isinstance(val, basestring):
- if isinstance(val, unicode):
- val = val.encode('utf8')
+    if isinstance(val, bytes):
+        val = val.decode('utf8')
try:
float(val)
@@ -27,16 +28,16 @@
return str(val)
-def is_simple(val):
- simple_type = isinstance(val, (str, unicode, int, long, bool, float))
+def is_simple(val: Any) -> bool:
+ simple_type = isinstance(val, (str, int, bool, float))
return simple_type or val is None
-def all_nums(vals):
- return all(isinstance(val, (int, float, long)) for val in vals)
+def all_nums(vals: Iterable[Any]) -> bool:
+ return all(isinstance(val, (int, float)) for val in vals)
-def dumpv(data, tab_sz=4, width=160, min_width=40):
+def dumpv(data: Any, tab_sz: int=4, width: int=160, min_width: int=40) -> str:
tab = ' ' * tab_sz
if width < min_width:
@@ -107,5 +108,5 @@
return res
-def dumps(data, tab_sz=4, width=120, min_width=40):
+def dumps(data: Any, tab_sz: int=4, width: int=120, min_width: int=40) -> str:
return "\n".join(dumpv(data, tab_sz, width, min_width))
diff --git a/wally/report.py b/wally/report.py
index f27491b..ed3c362 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -4,7 +4,8 @@
import logging
import itertools
import collections
-from cStringIO import StringIO
+from io import StringIO
+from typing import Dict
try:
import numpy
@@ -16,18 +17,18 @@
plt = None
import wally
-from wally.utils import ssize2b
-from wally.statistic import round_3_digit
-from wally.suits.io.fio_task_parser import (get_test_sync_mode,
- get_test_summary,
- parse_all_in_1,
- abbv_name_to_full)
+from .utils import ssize2b
+from .statistic import round_3_digit
+from .suits.io.fio_task_parser import (get_test_sync_mode,
+ get_test_summary,
+ parse_all_in_1,
+ abbv_name_to_full)
logger = logging.getLogger("wally.report")
-class DiskInfo(object):
+class DiskInfo:
def __init__(self):
self.direct_iops_r_max = 0
self.direct_iops_w_max = 0
@@ -46,7 +47,7 @@
class Attrmapper(object):
- def __init__(self, dct):
+ def __init__(self, dct: Dict):
self.__dct = dct
def __getattr__(self, name):
@@ -347,12 +348,6 @@
data['bw_write_max'][1],
data['bw_write_max'][2])
- # templ_name = 'report_ceph_1.html'
-
- # import pprint
- # pprint.pprint(data)
- # pprint.pprint(info.__dict__)
-
images.update(data)
templ = get_template(templ_name)
return templ.format(lab_info=lab_description,
@@ -576,7 +571,7 @@
# res.lat.average,
res.concurence))
- rws4k_iops_lat_th.sort(key=lambda (_1, _2, conc): conc)
+ rws4k_iops_lat_th.sort(key=lambda x: x[2])
latv = [lat for _, lat, _ in rws4k_iops_lat_th]
diff --git a/wally/run_test.py b/wally/run_test.py
index f99f445..4390f5e 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,7 +5,7 @@
import functools
import contextlib
import collections
-
+from typing import List, Dict, Optional, Iterable, Any, Generator, Mapping, Callable
from yaml import load as _yaml_load
try:
@@ -14,19 +14,21 @@
except ImportError:
yaml_load = _yaml_load
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
-from wally.hw_info import get_hw_info
-from wally.config import get_test_files
-from wally.discover import discover, Node
-from wally import pretty_yaml, utils, report, ssh_utils, start_vms
-from wally.sensors_utils import with_sensors_util, sensors_info_util
+from .config import Config
+from .config import get_test_files
+from .discover import discover, Node
+from .inode import INode
+from .test_run_class import TestRun
-from wally.suits.mysql import MysqlTest
-from wally.suits.itest import TestConfig
-from wally.suits.io.fio import IOPerfTest
-from wally.suits.postgres import PgBenchTest
-from wally.suits.omgbench import OmgTest
+from . import pretty_yaml, utils, report, ssh_utils, start_vms
+
+from .suits.mysql import MysqlTest
+from .suits.itest import TestConfig
+from .suits.io.fio import IOPerfTest
+from .suits.postgres import PgBenchTest
+from .suits.omgbench import OmgTest
TOOL_TYPE_MAPPER = {
@@ -40,9 +42,8 @@
logger = logging.getLogger("wally")
-def connect_all(nodes, spawned_node=False):
- """
- Connect to all nodes, log errors
+def connect_all(nodes: Iterable[INode], spawned_node: Optional[bool]=False) -> None:
+ """Connect to all nodes, log errors
nodes:[Node] - list of nodes
spawned_node:bool - whenever nodes is newly spawned VM
"""
@@ -51,93 +52,87 @@
conn_timeout = 240 if spawned_node else 30
- def connect_ext(conn_url):
+ def connect_ext(node: INode) -> bool:
try:
- return ssh_utils.connect(conn_url, conn_timeout=conn_timeout)
+ node.connect_ssh(conn_timeout)
+ node.connect_rpc(conn_timeout)
+ return True
except Exception as exc:
- logger.error("During connect to {0}: {1!s}".format(conn_url, exc))
- return None
-
- urls = []
- ssh_pref = "ssh://"
-
- for node in nodes:
- if node.conn_url == 'local':
- urls.append(node.conn_url)
- elif node.conn_url.startswith(ssh_pref):
- urls.append(node.conn_url[len(ssh_pref):])
- else:
- msg = "Unknown url type {0}".format(node.conn_url)
- logger.error(msg)
- raise utils.StopTestError(msg)
+ logger.error("During connect to {}: {!s}".format(node, exc))
+ return False
with ThreadPoolExecutor(32) as pool:
- for node, conn in zip(nodes, pool.map(connect_ext, urls)):
- node.connection = conn
+ list(pool.map(connect_ext, nodes))
failed_testnodes = []
failed_nodes = []
for node in nodes:
- if node.connection is None:
+ if not node.is_connected():
if 'testnode' in node.roles:
- failed_testnodes.append(node.get_conn_id())
+ failed_testnodes.append(node)
else:
- failed_nodes.append(node.get_conn_id())
+ failed_nodes.append(node)
- if failed_nodes != []:
- msg = "Node(s) {0} would be excluded - can't connect"
- logger.warning(msg.format(",".join(failed_nodes)))
+ if failed_nodes:
+ msg = "Node(s) {} would be excluded - can't connect"
+ logger.warning(msg.format(",".join(map(str, failed_nodes))))
- if failed_testnodes != []:
- msg = "Can't connect to testnode(s) " + ",".join(failed_testnodes)
+ if failed_testnodes:
+ msg = "Can't connect to testnode(s) " + \
+ ",".join(map(str, failed_testnodes))
logger.error(msg)
raise utils.StopTestError(msg)
- if len(failed_nodes) == 0:
+ if not failed_nodes:
logger.info("All nodes connected successfully")
-def collect_hw_info_stage(cfg, ctx):
- if os.path.exists(cfg['hwreport_fname']):
+def collect_hw_info_stage(cfg: Config, nodes: Iterable[INode]) -> None:
+ # TODO(koder): rewrite this function, to use other storage type
+ if os.path.exists(cfg.hwreport_fname):
msg = "{0} already exists. Skip hw info"
logger.info(msg.format(cfg['hwreport_fname']))
return
with ThreadPoolExecutor(32) as pool:
- connections = (node.connection for node in ctx.nodes)
- ctx.hw_info.extend(pool.map(get_hw_info, connections))
+        futures = [pool.submit(node.discover_hardware_info)
+                   for node in nodes]
+        wait(futures)
- with open(cfg['hwreport_fname'], 'w') as hwfd:
- for node, info in zip(ctx.nodes, ctx.hw_info):
+ with open(cfg.hwreport_fname, 'w') as hwfd:
+ for node in nodes:
hwfd.write("-" * 60 + "\n")
hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
- hwfd.write(str(info) + "\n")
+ hwfd.write(str(node.hwinfo) + "\n")
hwfd.write("-" * 60 + "\n\n")
- if info.hostname is not None:
+ if node.hwinfo.hostname is not None:
fname = os.path.join(
cfg.hwinfo_directory,
- info.hostname + "_lshw.xml")
+ node.hwinfo.hostname + "_lshw.xml")
with open(fname, "w") as fd:
- fd.write(info.raw)
- logger.info("Hardware report stored in " + cfg['hwreport_fname'])
- logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
+ fd.write(node.hwinfo.raw)
+
+ logger.info("Hardware report stored in " + cfg.hwreport_fname)
+ logger.debug("Raw hardware info in " + cfg.hwinfo_directory + " folder")
@contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes):
+def suspend_vm_nodes_ctx(unused_nodes: Iterable[INode]) -> Generator[List[int], None, None]:
+
pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
if node.os_vm_id is not None]
+
non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
if 0 != non_pausable:
- logger.warning("Can't pause {0} nodes".format(
+ logger.warning("Can't pause {} nodes".format(
non_pausable))
if len(pausable_nodes_ids) != 0:
- logger.debug("Try to pause {0} unused nodes".format(
+ logger.debug("Try to pause {} unused nodes".format(
len(pausable_nodes_ids)))
start_vms.pause(pausable_nodes_ids)
@@ -145,20 +140,20 @@
yield pausable_nodes_ids
finally:
if len(pausable_nodes_ids) != 0:
- logger.debug("Unpausing {0} nodes".format(
+ logger.debug("Unpausing {} nodes".format(
len(pausable_nodes_ids)))
start_vms.unpause(pausable_nodes_ids)
-def generate_result_dir_name(results, name, params):
+def generate_result_dir_name(results: str, name: str, params: Dict[str, Any]) -> str:
# make a directory for results
all_tests_dirs = os.listdir(results)
if 'name' in params:
- dir_name = "{0}_{1}".format(name, params['name'])
+ dir_name = "{}_{}".format(name, params['name'])
else:
for idx in range(len(all_tests_dirs) + 1):
- dir_name = "{0}_{1}".format(name, idx)
+ dir_name = "{}_{}".format(name, idx)
if dir_name not in all_tests_dirs:
break
else:
@@ -167,7 +162,13 @@
return os.path.join(results, dir_name)
-def run_tests(cfg, test_block, nodes):
+@contextlib.contextmanager
+def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Generator[None, None, None]:
+    # TODO(koder): write this function
+    # a bare yield keeps contextlib.contextmanager working until the real
+    # implementation lands
+    yield
+
+
+def run_tests(cfg: Config, test_block, nodes: Iterable[INode]) -> None:
"""
Run test from test block
"""
@@ -183,11 +184,11 @@
# iterate over all node counts
limit = params.get('node_limit', len(test_nodes))
- if isinstance(limit, (int, long)):
+ if isinstance(limit, int):
vm_limits = [limit]
else:
list_or_tpl = isinstance(limit, (tuple, list))
- all_ints = list_or_tpl and all(isinstance(climit, (int, long))
+ all_ints = list_or_tpl and all(isinstance(climit, int)
for climit in limit)
if not all_ints:
msg = "'node_limit' parameter ion config should" + \
@@ -221,50 +222,39 @@
if node.os_vm_id is not None]
if len(resumable_nodes_ids) != 0:
- logger.debug("Check and unpause {0} nodes".format(
+ logger.debug("Check and unpause {} nodes".format(
len(resumable_nodes_ids)))
start_vms.unpause(resumable_nodes_ids)
- sens_nodes = curr_test_nodes + not_test_nodes
- with sensors_info_util(cfg, sens_nodes) as sensor_data:
- test_cls = TOOL_TYPE_MAPPER[name]
+ test_cls = TOOL_TYPE_MAPPER[name]
- remote_dir = cfg.default_test_local_folder.format(name=name)
+ remote_dir = cfg.default_test_local_folder.format(name=name)
- test_cfg = TestConfig(test_cls.__name__,
- params=params,
- test_uuid=cfg.run_uuid,
- nodes=test_nodes,
- log_directory=results_path,
- remote_dir=remote_dir)
+ test_cfg = TestConfig(test_cls.__name__,
+ params=params,
+ test_uuid=cfg.run_uuid,
+ nodes=test_nodes,
+ log_directory=results_path,
+ remote_dir=remote_dir)
- t_start = time.time()
- res = test_cls(test_cfg).run()
- t_end = time.time()
-
- # save sensor data
- if sensor_data is not None:
- fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
- fpath = os.path.join(cfg.sensor_storage, fname)
-
- with open(fpath, "w") as fd:
- fd.write("\n\n".join(sensor_data))
+ t_start = time.time()
+ res = test_cls(test_cfg).run()
+ t_end = time.time()
results.append(res)
yield name, results
-def connect_stage(cfg, ctx):
+def connect_stage(cfg: Config, ctx: TestRun) -> None:
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
-def discover_stage(cfg, ctx):
- """
- discover clusters and nodes stage
- """
+def discover_stage(cfg: Config, ctx: TestRun) -> None:
+ """discover clusters and nodes stage"""
+
if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg.discover.strip().split(",")]
@@ -280,7 +270,8 @@
ctx.nodes.append(Node(url, roles.split(",")))
-def save_nodes_stage(cfg, ctx):
+def save_nodes_stage(cfg: Config, ctx: TestRun) -> None:
+ """Save nodes list to file"""
cluster = {}
for node in ctx.nodes:
roles = node.roles[:]
@@ -294,15 +285,15 @@
fd.write(pretty_yaml.dumps(cluster))
-def reuse_vms_stage(cfg, ctx):
+def reuse_vms_stage(cfg: Config, ctx: TestRun) -> None:
vms_patterns = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
private_key_path = get_vm_keypair(cfg)['keypair_file_private']
for creds in vms_patterns:
user_name, vm_name_pattern = creds.split("@", 1)
- msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+ msg = "Vm like {} lookup failed".format(vm_name_pattern)
- with utils.log_error(msg):
+ with utils.LogError(msg):
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
@@ -321,7 +312,7 @@
ctx.nodes.append(node)
-def get_OS_credentials(cfg, ctx):
+def get_OS_credentials(cfg: Config, ctx: TestRun) -> None:
creds = None
os_creds = None
force_insecure = False
@@ -371,7 +362,7 @@
return creds
-def get_vm_keypair(cfg):
+def get_vm_keypair(cfg: Config) -> Dict[str, str]:
res = {}
for field, ext in (('keypair_file_private', 'pem'),
('keypair_file_public', 'pub')):
@@ -388,7 +379,7 @@
@contextlib.contextmanager
-def create_vms_ctx(ctx, cfg, config, already_has_count=0):
+def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Generator[List[INode], None, None]:
if config['count'].startswith('='):
count = int(config['count'][1:])
if count <= already_has_count:
@@ -436,7 +427,7 @@
ctx.nodes = old_nodes
-def run_tests_stage(cfg, ctx):
+def run_tests_stage(cfg: Config, ctx: TestRun) -> None:
ctx.results = collections.defaultdict(lambda: [])
for group in cfg.get('tests', []):
@@ -469,7 +460,7 @@
if cfg.get('sensors') is None:
sensor_ctx = utils.empty_ctx()
else:
- sensor_ctx = with_sensors_util(cfg.get('sensors'), ctx.nodes)
+ sensor_ctx = sensor_monitoring(cfg.get('sensors'), ctx.nodes)
with vm_ctx as new_nodes:
if len(new_nodes) != 0:
@@ -482,7 +473,7 @@
ctx.results[tp].extend(res)
-def shut_down_vms_stage(cfg, ctx):
+def shut_down_vms_stage(cfg: Config, ctx: TestRun) -> None:
vm_ids_fname = cfg.vm_ids_fname
if ctx.openstack_nodes_ids is None:
nodes_ids = open(vm_ids_fname).read().split()
@@ -498,17 +489,17 @@
os.remove(vm_ids_fname)
-def store_nodes_in_log(cfg, nodes_ids):
+def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]) -> None:
with open(cfg.vm_ids_fname, 'w') as fd:
fd.write("\n".join(nodes_ids))
-def clear_enviroment(cfg, ctx):
+def clear_enviroment(cfg: Config, ctx: TestRun) -> None:
if os.path.exists(cfg.vm_ids_fname):
shut_down_vms_stage(cfg, ctx)
-def disconnect_stage(cfg, ctx):
+def disconnect_stage(cfg: Config, ctx: TestRun) -> None:
ssh_utils.close_all_sessions()
for node in ctx.nodes:
@@ -516,7 +507,7 @@
node.connection.close()
-def store_raw_results_stage(cfg, ctx):
+def store_raw_results_stage(cfg: Config, ctx: TestRun) -> None:
if os.path.exists(cfg.raw_results):
cont = yaml_load(open(cfg.raw_results).read())
else:
@@ -529,7 +520,7 @@
fd.write(raw_data)
-def console_report_stage(cfg, ctx):
+def console_report_stage(cfg: Config, ctx: TestRun) -> None:
first_report = True
text_rep_fname = cfg.text_report_file
with open(text_rep_fname, "w") as fd:
@@ -558,7 +549,7 @@
print("\n" + rep + "\n")
-def test_load_report_stage(cfg, ctx):
+def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
load_rep_fname = cfg.load_report_file
found = False
for idx, (tp, data) in enumerate(ctx.results.items()):
@@ -572,7 +563,7 @@
report.make_load_report(idx, cfg['results'], load_rep_fname)
-def html_report_stage(cfg, ctx):
+def html_report_stage(cfg: Config, ctx: TestRun) -> None:
html_rep_fname = cfg.html_report_file
found = False
for tp, data in ctx.results.items():
@@ -589,10 +580,10 @@
lab_info=ctx.hw_info)
-def load_data_from_path(test_res_dir):
+def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
files = get_test_files(test_res_dir)
raw_res = yaml_load(open(files['raw_results']).read())
- res = collections.defaultdict(lambda: [])
+ res = collections.defaultdict(list)
for tp, test_lists in raw_res:
for tests in test_lists:
@@ -603,10 +594,10 @@
return res
-def load_data_from_path_stage(var_dir, _, ctx):
+def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
for tp, vals in load_data_from_path(var_dir).items():
ctx.results.setdefault(tp, []).extend(vals)
-def load_data_from(var_dir):
+def load_data_from(var_dir: str) -> Callable:
return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/sensors.py b/wally/sensors.py
new file mode 100644
index 0000000..7560a11
--- /dev/null
+++ b/wally/sensors.py
@@ -0,0 +1,401 @@
+import os
+from collections import namedtuple
+
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def provides(name: str):
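+    """Mark a function as the provider of the sensor class 'name'.
+
+    Unlike the decorator removed from wally/sensors/discover.py, this version
+    does not register the function in a global registry yet - it returns it
+    unchanged."""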
+ def closure(func):
+ return func
+ return closure
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+ dev_ok = True
+
+ if disallowed_prefixes is not None:
+ dev_ok = all(not name.startswith(prefix)
+ for prefix in disallowed_prefixes)
+
+ if dev_ok and allowed_prefixes is not None:
+ dev_ok = any(name.startswith(prefix)
+ for prefix in allowed_prefixes)
+
+ return dev_ok
+
+
+def get_pid_list(disallowed_prefixes, allowed_prefixes):
+ """Return pid list from list of pids and names"""
+ # exceptions
+ but = disallowed_prefixes if disallowed_prefixes is not None else []
+ if allowed_prefixes is None:
+        # if allowed_prefixes is not set, return all pids except the disallowed ones
+ result = [pid
+ for pid in os.listdir('/proc')
+ if pid.isdigit() and pid not in but]
+ else:
+ result = []
+ for pid in os.listdir('/proc'):
+ if pid.isdigit() and pid not in but:
+ name = get_pid_name(pid)
+ if pid in allowed_prefixes or \
+ any(name.startswith(val) for val in allowed_prefixes):
+                    # this pid is allowed, keep it
+ result.append(pid)
+ return result
+
+
+def get_pid_name(pid):
+ """Return name by pid"""
+ try:
+ with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
+ try:
+ cmd = pidfile.readline().split()[0]
+ return os.path.basename(cmd).rstrip('\x00')
+ except IndexError:
+ # no cmd returned
+ return "<NO NAME>"
+ except IOError:
+        # upstream expects some string even if we couldn't read /proc
+ return "no_such_process"
+
+
+def delta(func, only_upd=True):
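+    """Endless generator: repeatedly calls func(), which must yield
+    (dev_name, {stat_name: (value, is_accumulated)}) pairs, remembers the
+    previous values and yields per-device dicts with deltas for accumulated
+    stats and raw values for the rest; a (None, None) pair marks the end of
+    every polling round."""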
+ prev = {}
+ while True:
+ for dev_name, vals in func():
+ if dev_name not in prev:
+ prev[dev_name] = {}
+ for name, (val, _) in vals.items():
+ prev[dev_name][name] = val
+ else:
+ dev_prev = prev[dev_name]
+ res = {}
+ for stat_name, (val, accum_val) in vals.items():
+ if accum_val:
+ if stat_name in dev_prev:
+ delta = int(val) - int(dev_prev[stat_name])
+ if not only_upd or 0 != delta:
+ res[stat_name] = str(delta)
+ dev_prev[stat_name] = val
+ elif not only_upd or '0' != val:
+ res[stat_name] = val
+
+ if only_upd and len(res) == 0:
+ continue
+ yield dev_name, res
+ yield None, None
+
+
+# 1 - major number
+# 2 - minor number
+# 3 - device name
+# 4 - reads completed successfully
+# 5 - reads merged
+# 6 - sectors read
+# 7 - time spent reading (ms)
+# 8 - writes completed
+# 9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+ (3, 'reads_completed', True),
+ (5, 'sectors_read', True),
+ (6, 'rtime', True),
+ (7, 'writes_completed', True),
+ (9, 'sectors_written', True),
+ (10, 'wtime', True),
+ (11, 'io_queue', False),
+ (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+ results = {}
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+ if dev_name[-1].isdigit():
+ dev_ok = False
+
+ if dev_ok:
+ for pos, name, accum_val in io_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
+
+
+def get_latency(stat1, stat2):
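+    """Average request latency (ms per completed IO) between two io_stat()
+    snapshots: delta of (rtime + wtime) divided by the delta of completed
+    reads + writes, per disk."""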
+ disks = set(i.split('.', 1)[0] for i in stat1)
+ results = {}
+
+ for disk in disks:
+ rdc = disk + '.reads_completed'
+ wrc = disk + '.writes_completed'
+ rdt = disk + '.rtime'
+ wrt = disk + '.wtime'
+ lat = 0.0
+
+ io_ops1 = stat1[rdc].value + stat1[wrc].value
+ io_ops2 = stat2[rdc].value + stat2[wrc].value
+
+ diops = io_ops2 - io_ops1
+
+ if diops != 0:
+ io1 = stat1[rdt].value + stat1[wrt].value
+ io2 = stat2[rdt].value + stat2[wrt].value
+ lat = abs(float(io1 - io2)) / diops
+
+ results[disk + '.latence'] = SensorInfo(lat, False)
+
+ return results
+
+
+# /proc/net/dev: each interface line is "<iface>: <16 counters>", where
+# 0 - bytes received
+# 1 - packets received
+# 2-7 - receive errs, drop, fifo, frame, compressed, multicast
+# 8 - bytes transmitted
+# 9 - packets transmitted
+# 10-15 - transmit errs, drop, fifo, colls, carrier, compressed
+
+net_values_pos = [
+ (0, 'recv_bytes', True),
+ (1, 'recv_packets', True),
+ (8, 'send_bytes', True),
+ (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
+ results = {}
+
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ vals = stats.split()
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ if '.' in dev_name and dev_name.split('.')[-1].isdigit():
+ dev_ok = False
+
+ if dev_ok:
+ for pos, name, accum_val in net_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+ return results
+
+
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ pid_stat1 = pid_stat(pid)
+
+ sensor_name = "{0}.{1}".format(dev_name, pid)
+ results[sensor_name] = SensorInfo(pid_stat1, True)
+ except IOError:
+ # may be, proc has already terminated, skip it
+ continue
+ return results
+
+
+def pid_stat(pid):
+ """Return total cpu usage time from process"""
+ # read /proc/pid/stat
+ with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
+ proctimes = pidfile.readline().split()
+ # get utime from /proc/<pid>/stat, 14 item
+ utime = proctimes[13]
+ # get stime from proc/<pid>/stat, 15 item
+ stime = proctimes[14]
+ # count total process used time
+ return float(int(utime) + int(stime))
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author: P@draigBrady.com
+# Source: http://www.pixelbeat.org/scripts/ps_mem.py
+# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+ """Return memory data of pid in format (private, shared)"""
+
+ fname = '/proc/{0}/{1}'.format(pid, "smaps")
+ lines = open(fname).readlines()
+
+ shared = 0
+ private = 0
+ pss = 0
+
+    # add 0.5 KiB per entry as the average error due to truncation
+ pss_adjust = 0.5
+
+ for line in lines:
+ if line.startswith("Shared"):
+ shared += int(line.split()[1])
+
+ if line.startswith("Private"):
+ private += int(line.split()[1])
+
+ if line.startswith("Pss"):
+ pss += float(line.split()[1]) + pss_adjust
+
+ # Note Shared + Private = Rss above
+ # The Rss in smaps includes video card mem etc.
+
+ if pss != 0:
+ shared = int(pss - private)
+
+ return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ private, shared = get_mem_stats(pid)
+ total = private + shared
+ sys_total = get_ram_size()
+ usage = float(total) / float(sys_total)
+
+ sensor_name = "{0}({1})".format(dev_name, pid)
+
+ results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+ results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+ results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+ name = sensor_name + ".mem_usage_percent"
+ results[name] = SensorInfo(usage * 100, False)
+ except IOError:
+ # permission denied or proc die
+ continue
+ return results
+
+
+def get_ram_size():
+ """Return RAM size in Kb"""
+ with open("/proc/meminfo") as proc:
+ mem_total = proc.readline().split()
+    return int(mem_total[1])
+
+
+# 0 - cpu name
+# 1 - user: normal processes executing in user mode
+# 2 - nice: niced processes executing in user mode
+# 3 - system: processes executing in kernel mode
+# 4 - idle: twiddling thumbs
+# 5 - iowait: waiting for I/O to complete
+# 6 - irq: servicing interrupts
+# 7 - softirq: servicing softirqs
+
+# named cpu_values_pos (not io_values_pos) so the diskstats list used by
+# io_stat() above is not shadowed
+cpu_values_pos = [
+ (1, 'user_processes', True),
+ (2, 'nice_processes', True),
+ (3, 'system_processes', True),
+ (4, 'idle_time', True),
+]
+
+
+@provides("system-cpu")
+def syscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+
+ # calculate core count
+ core_count = 0
+
+ for line in open('/proc/stat'):
+ vals = line.split()
+ dev_name = vals[0]
+
+ if dev_name == 'cpu':
+            for pos, name, accum_val in cpu_values_pos:
+ sensor_name = "{0}.{1}".format(dev_name, name)
+ results[sensor_name] = SensorInfo(int(vals[pos]),
+ accum_val)
+ elif dev_name == 'procs_blocked':
+ val = int(vals[1])
+ results["cpu.procs_blocked"] = SensorInfo(val, False)
+ elif dev_name.startswith('cpu'):
+ core_count += 1
+
+ # procs in queue
+ TASKSPOS = 3
+ vals = open('/proc/loadavg').read().split()
+ ready_procs = vals[TASKSPOS].partition('/')[0]
+ # dec on current proc
+ procs_queue = (float(ready_procs) - 1) / core_count
+ results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
+
+ return results
+
+
+# fields reported by default, unless allowed_prefixes overrides them
+ram_fields = [
+ 'MemTotal',
+ 'MemFree',
+ 'Buffers',
+ 'Cached',
+ 'SwapCached',
+ 'Dirty',
+ 'Writeback',
+ 'SwapTotal',
+ 'SwapFree'
+]
+
+
+@provides("system-ram")
+def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ if allowed_prefixes is None:
+ allowed_prefixes = ram_fields
+ results = {}
+ for line in open('/proc/meminfo'):
+ vals = line.split()
+ dev_name = vals[0].rstrip(":")
+
+ dev_ok = is_dev_accepted(dev_name,
+ disallowed_prefixes,
+ allowed_prefixes)
+
+ title = "ram.{0}".format(dev_name)
+
+ if dev_ok:
+ results[title] = SensorInfo(int(vals[1]), False)
+
+ if 'ram.MemFree' in results and 'ram.MemTotal' in results:
+ used = results['ram.MemTotal'].value - results['ram.MemFree'].value
+ usage = float(used) / results['ram.MemTotal'].value
+ results["ram.usage_percent"] = SensorInfo(usage, False)
+ return results
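+
+
+# Minimal usage sketch (illustrative only - this module exposes no runner
+# yet, and 'time' would have to be imported by the caller):
+#
+#     s1 = io_stat()
+#     time.sleep(1)
+#     s2 = io_stat()
+#     print(get_latency(s1, s2))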
diff --git a/wally/sensors/__init__.py b/wally/sensors/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wally/sensors/__init__.py
+++ /dev/null
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
deleted file mode 100644
index 68b2e7d..0000000
--- a/wally/sensors/api.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import os
-import time
-import json
-import logging
-import contextlib
-
-from concurrent.futures import ThreadPoolExecutor
-
-from wally.ssh_utils import (copy_paths, run_over_ssh,
- save_to_remote, read_from_remote)
-
-
-logger = logging.getLogger("wally.sensors")
-
-
-class SensorConfig(object):
- def __init__(self, conn, url, sensors, source_id,
- monitor_url=None):
- self.conn = conn
- self.url = url
- self.sensors = sensors
- self.source_id = source_id
- self.monitor_url = monitor_url
-
-
-@contextlib.contextmanager
-def with_sensors(sensor_configs, remote_path):
- paths = {os.path.dirname(__file__):
- os.path.join(remote_path, "sensors")}
- config_remote_path = os.path.join(remote_path, "conf.json")
-
- def deploy_sensors(node_sensor_config):
- # check that path already exists
- copy_paths(node_sensor_config.conn, paths)
- with node_sensor_config.conn.open_sftp() as sftp:
- sensors_config = node_sensor_config.sensors.copy()
- sensors_config['source_id'] = node_sensor_config.source_id
- save_to_remote(sftp, config_remote_path,
- json.dumps(sensors_config))
-
- def remove_sensors(node_sensor_config):
- run_over_ssh(node_sensor_config.conn,
- "rm -rf {0}".format(remote_path),
- node=node_sensor_config.url, timeout=10)
-
- logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
- with ThreadPoolExecutor(max_workers=32) as executor:
- list(executor.map(deploy_sensors, sensor_configs))
- try:
- yield
- finally:
- list(executor.map(remove_sensors, sensor_configs))
-
-
-@contextlib.contextmanager
-def sensors_info(sensor_configs, remote_path):
- config_remote_path = os.path.join(remote_path, "conf.json")
-
- def start_sensors(node_sensor_config):
- cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
- "sensors.main -d start -u {1} {2}"
-
- cmd = cmd_templ.format(remote_path,
- node_sensor_config.monitor_url,
- config_remote_path)
-
- run_over_ssh(node_sensor_config.conn, cmd,
- node=node_sensor_config.url)
-
- def stop_and_gather_data(node_sensor_config):
- cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
- cmd = cmd.format(remote_path)
- run_over_ssh(node_sensor_config.conn, cmd,
- node=node_sensor_config.url)
- # some magic
- time.sleep(1)
-
- assert node_sensor_config.monitor_url.startswith("csvfile://")
-
- res_path = node_sensor_config.monitor_url.split("//", 1)[1]
- with node_sensor_config.conn.open_sftp() as sftp:
- res = read_from_remote(sftp, res_path)
-
- return res
-
- results = []
-
- logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
- with ThreadPoolExecutor(max_workers=32) as executor:
- list(executor.map(start_sensors, sensor_configs))
- try:
- yield results
- finally:
- results.extend(executor.map(stop_and_gather_data, sensor_configs))
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
deleted file mode 100644
index 1b6993c..0000000
--- a/wally/sensors/cp_protocol.py
+++ /dev/null
@@ -1,227 +0,0 @@
-#!/usr/bin/env python
-""" Protocol class """
-
-import re
-import zlib
-import json
-import logging
-import binascii
-
-
-logger = logging.getLogger("wally.sensors")
-
-
-# protocol contains 2 type of packet:
-# 1 - header, which contains template schema of counters
-# 2 - body, which contains only values in order as in template
-# it uses msgpack (or provided packer) for optimization
-#
-# packet has format:
-# begin_data_prefixSIZE\n\nDATAend_data_postfix
-# packet part has format:
-# SIZE\n\rDATA
-#
-# DATA use archivation
-
-
-class PacketException(Exception):
- """ Exceptions from Packet"""
- pass
-
-
-class Packet(object):
- """ Class proceed packet by protocol"""
-
- prefix = "begin_data_prefix"
- postfix = "end_data_postfix"
- header_prefix = "template"
- # other fields
- # is_begin
- # is_end
- # crc
- # data
- # data_len
-
- def __init__(self, packer):
- # preinit
- self.is_begin = False
- self.is_end = False
- self.crc = None
- self.data = ""
- self.data_len = None
- self.srv_template = None
- self.clt_template = None
- self.tmpl_size = 0
- self.packer = packer
-
- def new_packet(self, part):
- """ New packet adding """
- # proceed packet
- try:
- # get size
- local_size_s, _, part = part.partition("\n\r")
- local_size = int(local_size_s)
-
- # find prefix
- begin = part.find(self.prefix)
- if begin != -1:
- # divide data if something before begin and prefix
- from_i = begin + len(self.prefix)
- part = part[from_i:]
- # reset flags
- self.is_begin = True
- self.is_end = False
- self.data = ""
- # get size
- data_len_s, _, part = part.partition("\n\r")
- self.data_len = int(data_len_s)
- # get crc
- crc_s, _, part = part.partition("\n\r")
- self.crc = int(crc_s)
-
- # bad size?
- if local_size != self.data_len:
- raise PacketException("Part size error")
-
- # find postfix
- end = part.find(self.postfix)
- if end != -1:
- # divide postfix
- part = part[:end]
- self.is_end = True
-
- self.data += part
- # check if it is end
- if self.is_end:
- self.data = zlib.decompress(self.data)
- if self.data_len != len(self.data):
- raise PacketException("Total size error")
- if binascii.crc32(self.data) != self.crc:
- raise PacketException("CRC error")
-
- # check, if it is template
- if self.data.startswith(self.header_prefix):
- self.srv_template = self.data
- # template is for internal use
- return None
-
- # decode values list
- vals = self.packer.unpack(self.data)
- dump = self.srv_template % tuple(vals)
- return dump
- else:
- return None
-
- except PacketException as e:
- # if something wrong - skip packet
- logger.warning("Packet skipped: %s", e)
- self.is_begin = False
- self.is_end = False
- return None
-
- except TypeError:
- # if something wrong - skip packet
- logger.warning("Packet skipped: doesn't match schema")
- self.is_begin = False
- self.is_end = False
- return None
-
- except:
- # if something at all wrong - skip packet
- logger.warning("Packet skipped: something is wrong")
- self.is_begin = False
- self.is_end = False
- return None
-
- @staticmethod
- def create_packet(data, part_size):
- """ Create packet divided by parts with part_size from data
- No compression here """
- # prepare data
- data_len = "%i\n\r" % len(data)
- header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
- compact_data = zlib.compress(data)
- packet = "%s%s%s" % (header, compact_data, Packet.postfix)
-
- partheader_len = len(data_len)
-
- beg = 0
- end = part_size - partheader_len
-
- result = []
- while beg < len(packet):
- block = packet[beg:beg+end]
- result.append(data_len + block)
- beg += end
-
- return result
-
- def create_packet_v2(self, data, part_size):
- """ Create packet divided by parts with part_size from data
- Compressed """
- result = []
- # create and add to result template header
- if self.srv_template is None:
- perf_string = json.dumps(data)
- self.create_answer_template(perf_string)
- template = self.header_prefix + self.srv_template
- header = Packet.create_packet(template, part_size)
- result.extend(header)
-
- vals = self.get_matching_value_list(data)
- body = self.packer.pack(vals)
- parts = Packet.create_packet(body, part_size)
- result.extend(parts)
- return result
-
- def get_matching_value_list(self, data):
- """ Get values in order server expect"""
- vals = range(0, self.tmpl_size)
-
- try:
- for node, groups in self.clt_template.items():
- for group, counters in groups.items():
- for counter, index in counters.items():
- if not isinstance(index, dict):
- vals[index] = data[node][group][counter]
- else:
- for k, i in index.items():
- vals[i] = data[node][group][counter][k]
-
- return vals
-
- except (IndexError, KeyError):
- logger = logging.getLogger(__name__)
- logger.error("Data don't match last schema")
- raise PacketException("Data don't match last schema")
-
- def create_answer_template(self, perf_string):
- """ Create template for server to insert counter values
- Return tuple of server and clien templates + number of replaces"""
- replacer = re.compile(": [0-9]+\.?[0-9]*")
- # replace all values by %s
- finditer = replacer.finditer(perf_string)
- # server not need know positions
- self.srv_template = ""
- # client need positions
- clt_template = ""
- beg = 0
- k = 0
- # this could be done better?
- for match in finditer:
- # define input place in server template
- self.srv_template += perf_string[beg:match.start()]
- self.srv_template += ": %s"
- # define match number in client template
- clt_template += perf_string[beg:match.start()]
- clt_template += ": %i" % k
-
- beg = match.end()
- k += 1
-
- # add tail
- self.srv_template += perf_string[beg:]
- clt_template += perf_string[beg:]
-
- self.tmpl_size = k
- self.clt_template = json.loads(clt_template)
diff --git a/wally/sensors/cp_transport.py b/wally/sensors/cp_transport.py
deleted file mode 100644
index 2e00e80..0000000
--- a/wally/sensors/cp_transport.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#!/usr/bin/env python
-""" UDP sender class """
-
-import socket
-import urlparse
-
-from cp_protocol import Packet
-
-try:
- from disk_perf_test_tool.logger import define_logger
- logger = define_logger(__name__)
-except ImportError:
- class Logger(object):
- def debug(self, *dt):
- pass
- logger = Logger()
-
-
-class SenderException(Exception):
- """ Exceptions in Sender class """
- pass
-
-
-class Timeout(Exception):
- """ Exceptions in Sender class """
- pass
-
-
-class Sender(object):
- """ UDP sender class """
-
- def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
- """ Create connection object from input udp string or params"""
-
- # test input
- if url is None and port is None:
- raise SenderException("Bad initialization")
- if url is not None:
- data = urlparse.urlparse(url)
- # check schema
- if data.scheme != "udp":
- mes = "Bad protocol type: %s instead of UDP" % data.scheme
- logger.error(mes)
- raise SenderException("Bad protocol type")
- # try to get port
- try:
- int_port = int(data.port)
- except ValueError:
- logger.error("Bad UDP port")
- raise SenderException("Bad UDP port")
- # save paths
- self.sendto = (data.hostname, int_port)
- self.bindto = (data.hostname, int_port)
- # try to get size
- try:
- self.size = int(data.path.strip("/"))
- except ValueError:
- logger.error("Bad packet part size")
- raise SenderException("Bad packet part size")
- else:
- # url is None - use size and port
- self.sendto = (host, port)
- self.bindto = ("0.0.0.0", port)
- self.size = size
-
- self.packer = packer
-
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.binded = False
- self.all_data = {}
- self.send_packer = None
-
- def bind(self):
- """ Prepare for listening """
- self.sock.bind(self.bindto)
- self.sock.settimeout(0.5)
- self.binded = True
-
- def send(self, data):
- """ Send data to udp socket"""
- if self.sock.sendto(data, self.sendto) != len(data):
- mes = "Cannot send data to %s:%s" % self.sendto
- logger.error(mes)
- raise SenderException("Cannot send data")
-
- def send_by_protocol(self, data):
- """ Send data by Packet protocol
- data = dict"""
- if self.send_packer is None:
- self.send_packer = Packet(self.packer())
- parts = self.send_packer.create_packet_v2(data, self.size)
- for part in parts:
- self.send(part)
-
- def recv(self):
- """ Receive data from udp socket"""
- # check for binding
- if not self.binded:
- self.bind()
- # try to recv
- try:
- data, (remote_ip, _) = self.sock.recvfrom(self.size)
- return data, remote_ip
- except socket.timeout:
- raise Timeout()
-
- def recv_by_protocol(self):
- """ Receive data from udp socket by Packet protocol"""
- data, remote_ip = self.recv()
-
- if remote_ip not in self.all_data:
- self.all_data[remote_ip] = Packet(self.packer())
-
- return self.all_data[remote_ip].new_packet(data)
-
- def recv_with_answer(self, stop_event=None):
- """ Receive data from udp socket and send 'ok' back
- Command port = local port + 1
- Answer port = local port
- Waiting for command is blocking """
- # create command socket
- command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- command_port = self.bindto[1]+1
- command_sock.bind(("0.0.0.0", command_port))
- command_sock.settimeout(1)
- # try to recv
- while True:
- try:
- data, (remote_ip, _) = command_sock.recvfrom(self.size)
- self.send("ok")
- return data, remote_ip
- except socket.timeout:
- if stop_event is not None and stop_event.is_set():
- # return None if we are interrupted
- return None
-
- def verified_send(self, send_host, message, max_repeat=20):
- """ Send and verify it by answer not more then max_repeat
- Send port = local port + 1
- Answer port = local port
- Return True if send is verified """
- # create send socket
- send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- send_port = self.sendto[1]+1
- for repeat in range(0, max_repeat):
- send_sock.sendto(message, (send_host, send_port))
- try:
- data, remote_ip = self.recv()
- if remote_ip == send_host and data == "ok":
- return True
- else:
- logger.warning("No answer from %s, try %i",
- send_host, repeat)
- except Timeout:
- logger.warning("No answer from %s, try %i", send_host, repeat)
-
- return False
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
deleted file mode 100644
index bc4ab81..0000000
--- a/wally/sensors/daemonize.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# #!/usr/bin/python
-
-import os
-import pwd
-import grp
-import sys
-import fcntl
-import signal
-import atexit
-import logging
-import resource
-
-
-class Daemonize(object):
- """ Daemonize object
- Object constructor expects three arguments:
- - app: contains the application name which will be sent to syslog.
- - pid: path to the pidfile.
- - action: your custom function which will be executed after daemonization.
- - keep_fds: optional list of fds which should not be closed.
- - auto_close_fds: optional parameter to not close opened fds.
- - privileged_action: action that will be executed before
- drop privileges if user or
- group parameter is provided.
- If you want to transfer anything from privileged
- action to action, such as opened privileged file
- descriptor, you should return it from
- privileged_action function and catch it inside action
- function.
- - user: drop privileges to this user if provided.
- - group: drop privileges to this group if provided.
- - verbose: send debug messages to logger if provided.
- - logger: use this logger object instead of creating new one, if provided.
- """
- def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
- privileged_action=None, user=None, group=None, verbose=False,
- logger=None):
- self.app = app
- self.pid = pid
- self.action = action
- self.keep_fds = keep_fds or []
- self.privileged_action = privileged_action or (lambda: ())
- self.user = user
- self.group = group
- self.logger = logger
- self.verbose = verbose
- self.auto_close_fds = auto_close_fds
-
- def sigterm(self, signum, frame):
- """ sigterm method
- These actions will be done after SIGTERM.
- """
- self.logger.warn("Caught signal %s. Stopping daemon." % signum)
- os.remove(self.pid)
- sys.exit(0)
-
- def exit(self):
- """ exit method
- Cleanup pid file at exit.
- """
- self.logger.warn("Stopping daemon.")
- os.remove(self.pid)
- sys.exit(0)
-
- def start(self):
- """ start method
- Main daemonization process.
- """
- # If pidfile already exists, we should read pid from there;
- # to overwrite it, if locking
- # will fail, because locking attempt somehow purges the file contents.
- if os.path.isfile(self.pid):
- with open(self.pid, "r") as old_pidfile:
- old_pid = old_pidfile.read()
- # Create a lockfile so that only one instance of this daemon is
- # running at any time.
- try:
- lockfile = open(self.pid, "w")
- except IOError:
- print("Unable to create the pidfile.")
- sys.exit(1)
- try:
- # Try to get an exclusive lock on the file. This will fail if
- # another process has the file
- # locked.
- fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
- except IOError:
- print("Unable to lock on the pidfile.")
- # We need to overwrite the pidfile if we got here.
- with open(self.pid, "w") as pidfile:
- pidfile.write(old_pid)
- sys.exit(1)
-
- # Fork, creating a new process for the child.
- process_id = os.fork()
- if process_id < 0:
- # Fork error. Exit badly.
- sys.exit(1)
- elif process_id != 0:
- # This is the parent process. Exit.
- sys.exit(0)
- # This is the child process. Continue.
-
- # Stop listening for signals that the parent process receives.
- # This is done by getting a new process id.
- # setpgrp() is an alternative to setsid().
- # setsid puts the process in a new parent group and detaches
- # its controlling terminal.
- process_id = os.setsid()
- if process_id == -1:
- # Uh oh, there was a problem.
- sys.exit(1)
-
- # Add lockfile to self.keep_fds.
- self.keep_fds.append(lockfile.fileno())
-
- # Close all file descriptors, except the ones mentioned in
- # self.keep_fds.
- devnull = "/dev/null"
- if hasattr(os, "devnull"):
- # Python has set os.devnull on this system, use it instead as it
- # might be different
- # than /dev/null.
- devnull = os.devnull
-
- if self.auto_close_fds:
- for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
- if fd not in self.keep_fds:
- try:
- os.close(fd)
- except OSError:
- pass
-
- devnull_fd = os.open(devnull, os.O_RDWR)
- os.dup2(devnull_fd, 0)
- os.dup2(devnull_fd, 1)
- os.dup2(devnull_fd, 2)
-
- if self.logger is None:
- # Initialize logging.
- self.logger = logging.getLogger(self.app)
- self.logger.setLevel(logging.DEBUG)
- # Display log messages only on defined handlers.
- self.logger.propagate = False
-
- # Initialize syslog.
- # It will correctly work on OS X, Linux and FreeBSD.
- if sys.platform == "darwin":
- syslog_address = "/var/run/syslog"
- else:
- syslog_address = "/dev/log"
-
- # We will continue with syslog initialization only if
- # actually have such capabilities
- # on the machine we are running this.
- if os.path.isfile(syslog_address):
- syslog = logging.handlers.SysLogHandler(syslog_address)
- if self.verbose:
- syslog.setLevel(logging.DEBUG)
- else:
- syslog.setLevel(logging.INFO)
- # Try to mimic to normal syslog messages.
- format_t = "%(asctime)s %(name)s: %(message)s"
- formatter = logging.Formatter(format_t,
- "%b %e %H:%M:%S")
- syslog.setFormatter(formatter)
-
- self.logger.addHandler(syslog)
-
- # Set umask to default to safe file permissions when running
- # as a root daemon. 027 is an
- # octal number which we are typing as 0o27 for Python3 compatibility.
- os.umask(0o27)
-
- # Change to a known directory. If this isn't done, starting a daemon
- # in a subdirectory that
- # needs to be deleted results in "directory busy" errors.
- os.chdir("/")
-
- # Execute privileged action
- privileged_action_result = self.privileged_action()
- if not privileged_action_result:
- privileged_action_result = []
-
- # Change gid
- if self.group:
- try:
- gid = grp.getgrnam(self.group).gr_gid
- except KeyError:
- self.logger.error("Group {0} not found".format(self.group))
- sys.exit(1)
- try:
- os.setgid(gid)
- except OSError:
- self.logger.error("Unable to change gid.")
- sys.exit(1)
-
- # Change uid
- if self.user:
- try:
- uid = pwd.getpwnam(self.user).pw_uid
- except KeyError:
- self.logger.error("User {0} not found.".format(self.user))
- sys.exit(1)
- try:
- os.setuid(uid)
- except OSError:
- self.logger.error("Unable to change uid.")
- sys.exit(1)
-
- try:
- lockfile.write("%s" % (os.getpid()))
- lockfile.flush()
- except IOError:
- self.logger.error("Unable to write pid to the pidfile.")
- print("Unable to write pid to the pidfile.")
- sys.exit(1)
-
- # Set custom action on SIGTERM.
- signal.signal(signal.SIGTERM, self.sigterm)
- atexit.register(self.exit)
-
- self.logger.warn("Starting daemon.")
-
- self.action(*privileged_action_result)
diff --git a/wally/sensors/discover.py b/wally/sensors/discover.py
deleted file mode 100644
index f227043..0000000
--- a/wally/sensors/discover.py
+++ /dev/null
@@ -1,9 +0,0 @@
-all_sensors = {}
-
-
-def provides(sensor_class_name):
- def closure(func):
- assert sensor_class_name not in all_sensors
- all_sensors[sensor_class_name] = func
- return func
- return closure
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
deleted file mode 100644
index 20eedc5..0000000
--- a/wally/sensors/main.py
+++ /dev/null
@@ -1,154 +0,0 @@
-import os
-import sys
-import time
-import json
-import glob
-import signal
-import os.path
-import argparse
-
-from .sensors.utils import SensorInfo
-from .daemonize import Daemonize
-from .discover import all_sensors
-from .protocol import create_protocol
-
-
-# load all sensors
-from . import sensors
-sensors_dir = os.path.dirname(sensors.__file__)
-for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
- mod_name = os.path.basename(fname[:-3])
- __import__("sensors.sensors." + mod_name)
-
-
-def get_values(required_sensors):
- result = {}
- for sensor_name, params in required_sensors:
- if sensor_name in all_sensors:
- result.update(all_sensors[sensor_name](**params))
- else:
- msg = "Sensor {0!r} isn't available".format(sensor_name)
- raise ValueError(msg)
- return result
-
-
-def parse_args(args):
- parser = argparse.ArgumentParser()
- parser.add_argument('-d', '--daemon',
- choices=('start', 'stop', 'status',
- 'start_monitoring', 'stop_monitoring',
- 'dump_ram_data'),
- default=None)
-
- parser.add_argument('-u', '--url', default='stdout://')
- parser.add_argument('-t', '--timeout', type=float, default=1)
- parser.add_argument('-l', '--list-sensors', action='store_true')
- parser.add_argument('sensors_config', type=argparse.FileType('r'),
- default=None, nargs='?')
- return parser.parse_args(args[1:])
-
-
-def daemon_main(required_sensors, opts):
- try:
- source_id = str(required_sensors.pop('source_id'))
- except KeyError:
- source_id = None
-
- sender = create_protocol(opts.url)
- prev = {}
- next_data_record_time = time.time()
-
- first_round = True
- while True:
- real_time = int(time.time())
-
- if real_time < int(next_data_record_time):
- if int(next_data_record_time) - real_time > 2:
- print "Error: sleep too small portion!!"
- report_time = int(next_data_record_time)
- elif real_time > int(next_data_record_time):
- if real_time - int(next_data_record_time) > 2:
- report_time = real_time
- else:
- report_time = int(next_data_record_time)
- else:
- report_time = real_time
-
- data = get_values(required_sensors.items())
- curr = {'time': SensorInfo(report_time, True)}
- for name, val in data.items():
- if val.is_accumulated:
- if name in prev:
- curr[name] = SensorInfo(val.value - prev[name], True)
- prev[name] = val.value
- else:
- curr[name] = SensorInfo(val.value, False)
-
- if source_id is not None:
- curr['source_id'] = source_id
-
- # on first round not all fields was ready
- # this leads to setting wrong headers list
- if not first_round:
- sender.send(curr)
- else:
- first_round = False
-
- next_data_record_time = report_time + opts.timeout + 0.5
- time.sleep(next_data_record_time - time.time())
-
-
-def pid_running(pid):
- return os.path.exists("/proc/" + str(pid))
-
-
-def main(argv):
- opts = parse_args(argv)
-
- if opts.list_sensors:
- print "\n".join(sorted(all_sensors))
- return 0
-
- if opts.daemon is not None:
- pid_file = "/tmp/sensors.pid"
- if opts.daemon == 'start':
- required_sensors = json.loads(opts.sensors_config.read())
-
- def root_func():
- daemon_main(required_sensors, opts)
-
- daemon = Daemonize(app="perfcollect_app",
- pid=pid_file,
- action=root_func)
- daemon.start()
- elif opts.daemon == 'stop':
- if os.path.isfile(pid_file):
- pid = int(open(pid_file).read())
- if pid_running(pid):
- os.kill(pid, signal.SIGTERM)
-
- time.sleep(0.5)
-
- if pid_running(pid):
- os.kill(pid, signal.SIGKILL)
-
- time.sleep(0.5)
-
- if os.path.isfile(pid_file):
- os.unlink(pid_file)
- elif opts.daemon == 'status':
- if os.path.isfile(pid_file):
- pid = int(open(pid_file).read())
- if pid_running(pid):
- print "running"
- return
- print "stopped"
- else:
- raise ValueError("Unknown daemon operation {}".format(opts.daemon))
- else:
- required_sensors = json.loads(opts.sensors_config.read())
- daemon_main(required_sensors, opts)
- return 0
-
-if __name__ == "__main__":
- exit(main(sys.argv))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
deleted file mode 100644
index 7c8aa0e..0000000
--- a/wally/sensors/protocol.py
+++ /dev/null
@@ -1,312 +0,0 @@
-import sys
-import csv
-import time
-import struct
-import socket
-import select
-import cPickle as pickle
-from urlparse import urlparse
-
-
-class Timeout(Exception):
- pass
-
-
-class CantUnpack(Exception):
- pass
-
-
-# ------------------------------------- Serializers --------------------------
-
-
-class ISensortResultsSerializer(object):
- def pack(self, data):
- pass
-
- def unpack(self, data):
- pass
-
-
-class StructSerializerSend(ISensortResultsSerializer):
- initial_times = 5
- resend_timeout = 60
- HEADERS = 'h'
- DATA = 'd'
- END_OF_HEADERS = '\x00'
- END_OF_SOURCE_ID = '\x00'
- HEADERS_SEPARATOR = '\n'
-
- def __init__(self):
- self.field_order = None
- self.headers_send_cycles_left = self.initial_times
- self.pack_fmt = None
- self.next_header_send_time = None
-
- def pack(self, data):
- data = data.copy()
-
- source_id = data.pop("source_id")
- vals = [int(data.pop("time").value)]
-
- if self.field_order is None:
- self.field_order = sorted(data.keys())
- self.pack_fmt = "!I" + "I" * len(self.field_order)
-
- need_resend = False
- if self.next_header_send_time is not None:
- if time.time() > self.next_header_send_time:
- need_resend = True
-
- if self.headers_send_cycles_left > 0 or need_resend:
- forder = self.HEADERS_SEPARATOR.join(self.field_order)
- flen = struct.pack("!H", len(self.field_order))
-
- result = (self.HEADERS + source_id +
- self.END_OF_SOURCE_ID +
- socket.gethostname() +
- self.END_OF_SOURCE_ID +
- flen + forder + self.END_OF_HEADERS)
-
- if self.headers_send_cycles_left > 0:
- self.headers_send_cycles_left -= 1
-
- self.next_header_send_time = time.time() + self.resend_timeout
- else:
- result = ""
-
- for name in self.field_order:
- vals.append(int(data[name].value))
-
- packed_data = self.DATA + source_id
- packed_data += self.END_OF_SOURCE_ID
- packed_data += struct.pack(self.pack_fmt, *vals)
-
- return result + packed_data
-
-
-class StructSerializerRecv(ISensortResultsSerializer):
- def __init__(self):
- self.fields = {}
- self.formats = {}
- self.hostnames = {}
-
- def unpack(self, data):
- code = data[0]
-
- if code == StructSerializerSend.HEADERS:
- source_id, hostname, packed_data = data[1:].split(
- StructSerializerSend.END_OF_SOURCE_ID, 2)
- # fields order provided
- flen_sz = struct.calcsize("!H")
- flen = struct.unpack("!H", packed_data[:flen_sz])[0]
-
- headers_data, rest = packed_data[flen_sz:].split(
- StructSerializerSend.END_OF_HEADERS, 1)
-
- forder = headers_data.split(
- StructSerializerSend.HEADERS_SEPARATOR)
-
- assert len(forder) == flen, \
- "Wrong len {0} != {1}".format(len(forder), flen)
-
- if 'source_id' in self.fields:
- assert self.fields[source_id] == ['time'] + forder,\
- "New field order"
- else:
- self.fields[source_id] = ['time'] + forder
- self.formats[source_id] = "!I" + "I" * flen
- self.hostnames[source_id] = hostname
-
- if len(rest) != 0:
- return self.unpack(rest)
- return None
- else:
- source_id, packed_data = data[1:].split(
- StructSerializerSend.END_OF_SOURCE_ID, 1)
- assert code == StructSerializerSend.DATA,\
- "Unknown code {0!r}".format(code)
-
- try:
- fields = self.fields[source_id]
- except KeyError:
- raise CantUnpack("No fields order provided"
- " for {0} yet".format(source_id))
- s_format = self.formats[source_id]
-
- exp_size = struct.calcsize(s_format)
- assert len(packed_data) == exp_size, \
- "Wrong data len {0} != {1}".format(len(packed_data), exp_size)
-
- vals = struct.unpack(s_format, packed_data)
- res = dict(zip(fields, vals))
- res['source_id'] = source_id
- res['hostname'] = self.hostnames[source_id]
- return res
-
-
-class PickleSerializer(ISensortResultsSerializer):
- def pack(self, data):
- ndata = {}
- for key, val in data.items():
- if isinstance(val, basestring):
- ndata[key] = val
- else:
- ndata[key] = val.value
- return pickle.dumps(ndata)
-
- def unpack(self, data):
- return pickle.loads(data)
-
-# ------------------------------------- Transports ---------------------------
-
-
-class ITransport(object):
- def __init__(self, receiver):
- pass
-
- def send(self, data):
- pass
-
- def recv(self, timeout=None):
- pass
-
-
-class StdoutTransport(ITransport):
- MIN_COL_WIDTH = 10
-
- def __init__(self, receiver, delta=True):
- if receiver:
- cname = self.__class__.__name__
- raise ValueError("{0} don't allows receiving".format(cname))
-
- self.headers = None
- self.line_format = ""
- self.prev = {}
- self.delta = delta
- self.fd = sys.stdout
-
- def send(self, data):
- if self.headers is None:
- self.headers = sorted(data)
- self.headers.remove('source_id')
-
- for pos, header in enumerate(self.headers):
- self.line_format += "{%s:>%s}" % (pos,
- max(len(header) + 1,
- self.MIN_COL_WIDTH))
-
- print self.line_format.format(*self.headers)
-
- if self.delta:
-
- vals = [data[header].value - self.prev.get(header, 0)
- for header in self.headers]
-
- self.prev.update(dict((header, data[header].value)
- for header in self.headers))
- else:
- vals = [data[header].value for header in self.headers]
-
- self.fd.write(self.line_format.format(*vals) + "\n")
-
- def recv(self, timeout=None):
- cname = self.__class__.__name__
- raise ValueError("{0} don't allows receiving".format(cname))
-
-
-class FileTransport(StdoutTransport):
- def __init__(self, receiver, fname, delta=True):
- StdoutTransport.__init__(self, receiver, delta)
- self.fd = open(fname, "w")
-
-
-class CSVFileTransport(ITransport):
- required_keys = set(['time', 'source_id'])
-
- def __init__(self, receiver, fname):
- ITransport.__init__(self, receiver)
- self.fd = open(fname, "w")
- self.csv_fd = csv.writer(self.fd)
- self.field_list = []
- self.csv_fd.writerow(['NEW_DATA'])
-
- def send(self, data):
- if self.field_list == []:
- keys = set(data)
- assert self.required_keys.issubset(keys)
- keys -= self.required_keys
- self.field_list = sorted(keys)
- self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
- self.field_list)
- self.field_list = ['time'] + self.field_list
-
- self.csv_fd.writerow([data[sens].value for sens in self.field_list])
-
-
-class RAMTransport(ITransport):
- def __init__(self, next_tr):
- self.next = next_tr
- self.data = []
-
- def send(self, data):
- self.data.append(data)
-
- def flush(self):
- for data in self.data:
- self.next.send(data)
- self.data = []
-
-
-class UDPTransport(ITransport):
- def __init__(self, receiver, ip, port, packer_cls):
- self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- if receiver:
- self.port.bind((ip, port))
- self.packer_cls = packer_cls
- self.packers = {}
- else:
- self.packer = packer_cls()
- self.dst = (ip, port)
-
- def send(self, data):
- raw_data = self.packer.pack(data)
- self.port.sendto(raw_data, self.dst)
-
- def recv(self, timeout=None):
- r, _, _ = select.select([self.port], [], [], timeout)
- if len(r) != 0:
- raw_data, addr = self.port.recvfrom(10000)
- packer = self.packers.setdefault(addr, self.packer_cls())
- return addr, packer.unpack(raw_data)
- else:
- raise Timeout()
-
-
-# -------------------------- Factory function --------------------------------
-
-
-def create_protocol(uri, receiver=False):
- if uri == 'stdout':
- return StdoutTransport(receiver)
-
- parsed_uri = urlparse(uri)
- if parsed_uri.scheme == 'udp':
- ip, port = parsed_uri.netloc.split(":")
-
- if receiver:
- packer_cls = StructSerializerRecv
- else:
- packer_cls = StructSerializerSend
-
- return UDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=packer_cls)
- elif parsed_uri.scheme == 'file':
- return FileTransport(receiver, parsed_uri.path)
- elif parsed_uri.scheme == 'csvfile':
- return CSVFileTransport(receiver, parsed_uri.path)
- elif parsed_uri.scheme == 'ram':
- intenal_recv = CSVFileTransport(receiver, parsed_uri.path)
- return RAMTransport(intenal_recv)
- else:
- templ = "Can't instantiate transport from {0!r}"
- raise ValueError(templ.format(uri))
diff --git a/wally/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wally/sensors/sensors/__init__.py
+++ /dev/null
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
deleted file mode 100644
index e4049a9..0000000
--- a/wally/sensors/sensors/io_sensors.py
+++ /dev/null
@@ -1,74 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
-
-# 1 - major number
-# 2 - minor mumber
-# 3 - device name
-# 4 - reads completed successfully
-# 5 - reads merged
-# 6 - sectors read
-# 7 - time spent reading (ms)
-# 8 - writes completed
-# 9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
-
-io_values_pos = [
- (3, 'reads_completed', True),
- (5, 'sectors_read', True),
- (6, 'rtime', True),
- (7, 'writes_completed', True),
- (9, 'sectors_written', True),
- (10, 'wtime', True),
- (11, 'io_queue', False),
- (13, 'io_time', True)
-]
-
-
-@provides("block-io")
-def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
- results = {}
- for line in open('/proc/diskstats'):
- vals = line.split()
- dev_name = vals[2]
-
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
- if dev_name[-1].isdigit():
- dev_ok = False
-
- if dev_ok:
- for pos, name, accum_val in io_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
- return results
-
-
-def get_latency(stat1, stat2):
- disks = set(i.split('.', 1)[0] for i in stat1)
- results = {}
-
- for disk in disks:
- rdc = disk + '.reads_completed'
- wrc = disk + '.writes_completed'
- rdt = disk + '.rtime'
- wrt = disk + '.wtime'
- lat = 0.0
-
- io_ops1 = stat1[rdc].value + stat1[wrc].value
- io_ops2 = stat2[rdc].value + stat2[wrc].value
-
- diops = io_ops2 - io_ops1
-
- if diops != 0:
- io1 = stat1[rdt].value + stat1[wrt].value
- io2 = stat2[rdt].value + stat2[wrt].value
- lat = abs(float(io1 - io2)) / diops
-
- results[disk + '.latence'] = SensorInfo(lat, False)
-
- return results
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
deleted file mode 100644
index 2723499..0000000
--- a/wally/sensors/sensors/net_sensors.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
-
-# 1 - major number
-# 2 - minor mumber
-# 3 - device name
-# 4 - reads completed successfully
-# 5 - reads merged
-# 6 - sectors read
-# 7 - time spent reading (ms)
-# 8 - writes completed
-# 9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
-
-net_values_pos = [
- (0, 'recv_bytes', True),
- (1, 'recv_packets', True),
- (8, 'send_bytes', True),
- (9, 'send_packets', True),
-]
-
-
-@provides("net-io")
-def net_stat(disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
- results = {}
-
- for line in open('/proc/net/dev').readlines()[2:]:
- dev_name, stats = line.split(":", 1)
- dev_name = dev_name.strip()
- vals = stats.split()
-
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
-
- if '.' in dev_name and dev_name.split('.')[-1].isdigit():
- dev_ok = False
-
- if dev_ok:
- for pos, name, accum_val in net_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
- return results
diff --git a/wally/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
deleted file mode 100644
index ccbcefc..0000000
--- a/wally/sensors/sensors/pscpu_sensors.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import os
-
-from ..discover import provides
-from .utils import SensorInfo, get_pid_name, get_pid_list
-
-
-@provides("perprocess-cpu")
-def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-
- for pid in pid_list:
- try:
- dev_name = get_pid_name(pid)
-
- pid_stat1 = pid_stat(pid)
-
- sensor_name = "{0}.{1}".format(dev_name, pid)
- results[sensor_name] = SensorInfo(pid_stat1, True)
- except IOError:
- # may be, proc has already terminated, skip it
- continue
- return results
-
-
-def pid_stat(pid):
- """Return total cpu usage time from process"""
- # read /proc/pid/stat
- with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
- proctimes = pidfile.readline().split()
- # get utime from /proc/<pid>/stat, 14 item
- utime = proctimes[13]
- # get stime from proc/<pid>/stat, 15 item
- stime = proctimes[14]
- # count total process used time
- proctotal = int(utime) + int(stime)
- return float(proctotal)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
deleted file mode 100644
index 34f729a..0000000
--- a/wally/sensors/sensors/psram_sensors.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, get_pid_name, get_pid_list
-
-
-# Based on ps_mem.py:
-# Licence: LGPLv2
-# Author: P@draigBrady.com
-# Source: http://www.pixelbeat.org/scripts/ps_mem.py
-# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
-
-
-# Note shared is always a subset of rss (trs is not always)
-def get_mem_stats(pid):
- """Return memory data of pid in format (private, shared)"""
-
- fname = '/proc/{0}/{1}'.format(pid, "smaps")
- lines = open(fname).readlines()
-
- shared = 0
- private = 0
- pss = 0
-
- # add 0.5KiB as this avg error due to trunctation
- pss_adjust = 0.5
-
- for line in lines:
- if line.startswith("Shared"):
- shared += int(line.split()[1])
-
- if line.startswith("Private"):
- private += int(line.split()[1])
-
- if line.startswith("Pss"):
- pss += float(line.split()[1]) + pss_adjust
-
- # Note Shared + Private = Rss above
- # The Rss in smaps includes video card mem etc.
-
- if pss != 0:
- shared = int(pss - private)
-
- return (private, shared)
-
-
-@provides("perprocess-ram")
-def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
- for pid in pid_list:
- try:
- dev_name = get_pid_name(pid)
-
- private, shared = get_mem_stats(pid)
- total = private + shared
- sys_total = get_ram_size()
- usage = float(total) / float(sys_total)
-
- sensor_name = "{0}({1})".format(dev_name, pid)
-
- results[sensor_name + ".private_mem"] = SensorInfo(private, False)
- results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
- results[sensor_name + ".used_mem"] = SensorInfo(total, False)
- name = sensor_name + ".mem_usage_percent"
- results[name] = SensorInfo(usage * 100, False)
- except IOError:
- # permission denied or proc die
- continue
- return results
-
-
-def get_ram_size():
- """Return RAM size in Kb"""
- with open("/proc/meminfo") as proc:
- mem_total = proc.readline().split()
- return mem_total[1]
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
deleted file mode 100644
index e2fab8c..0000000
--- a/wally/sensors/sensors/syscpu_sensors.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from .utils import SensorInfo
-from ..discover import provides
-
-# 0 - cpu name
-# 1 - user: normal processes executing in user mode
-# 2 - nice: niced processes executing in user mode
-# 3 - system: processes executing in kernel mode
-# 4 - idle: twiddling thumbs
-# 5 - iowait: waiting for I/O to complete
-# 6 - irq: servicing interrupts
-# 7 - softirq: servicing softirqs
-
-io_values_pos = [
- (1, 'user_processes', True),
- (2, 'nice_processes', True),
- (3, 'system_processes', True),
- (4, 'idle_time', True),
-]
-
-
-@provides("system-cpu")
-def syscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
-
- # calculate core count
- core_count = 0
-
- for line in open('/proc/stat'):
- vals = line.split()
- dev_name = vals[0]
-
- if dev_name == 'cpu':
- for pos, name, accum_val in io_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]),
- accum_val)
- elif dev_name == 'procs_blocked':
- val = int(vals[1])
- results["cpu.procs_blocked"] = SensorInfo(val, False)
- elif dev_name.startswith('cpu'):
- core_count += 1
-
- # procs in queue
- TASKSPOS = 3
- vals = open('/proc/loadavg').read().split()
- ready_procs = vals[TASKSPOS].partition('/')[0]
- # dec on current proc
- procs_queue = (float(ready_procs) - 1) / core_count
- results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
-
- return results
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
deleted file mode 100644
index 9de5d82..0000000
--- a/wally/sensors/sensors/sysram_sensors.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
-
-
-# return this values or setted in allowed
-ram_fields = [
- 'MemTotal',
- 'MemFree',
- 'Buffers',
- 'Cached',
- 'SwapCached',
- 'Dirty',
- 'Writeback',
- 'SwapTotal',
- 'SwapFree'
-]
-
-
-@provides("system-ram")
-def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
- if allowed_prefixes is None:
- allowed_prefixes = ram_fields
- results = {}
- for line in open('/proc/meminfo'):
- vals = line.split()
- dev_name = vals[0].rstrip(":")
-
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
-
- title = "ram.{0}".format(dev_name)
-
- if dev_ok:
- results[title] = SensorInfo(int(vals[1]), False)
-
- if 'ram.MemFree' in results and 'ram.MemTotal' in results:
- used = results['ram.MemTotal'].value - results['ram.MemFree'].value
- usage = float(used) / results['ram.MemTotal'].value
- results["ram.usage_percent"] = SensorInfo(usage, False)
- return results
diff --git a/wally/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
deleted file mode 100644
index cff3201..0000000
--- a/wally/sensors/sensors/utils.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import os
-
-from collections import namedtuple
-
-SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
-
-
-def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
- dev_ok = True
-
- if disallowed_prefixes is not None:
- dev_ok = all(not name.startswith(prefix)
- for prefix in disallowed_prefixes)
-
- if dev_ok and allowed_prefixes is not None:
- dev_ok = any(name.startswith(prefix)
- for prefix in allowed_prefixes)
-
- return dev_ok
-
-
-def get_pid_list(disallowed_prefixes, allowed_prefixes):
- """Return pid list from list of pids and names"""
- # exceptions
- but = disallowed_prefixes if disallowed_prefixes is not None else []
- if allowed_prefixes is None:
- # if nothing setted - all ps will be returned except setted
- result = [pid
- for pid in os.listdir('/proc')
- if pid.isdigit() and pid not in but]
- else:
- result = []
- for pid in os.listdir('/proc'):
- if pid.isdigit() and pid not in but:
- name = get_pid_name(pid)
- if pid in allowed_prefixes or \
- any(name.startswith(val) for val in allowed_prefixes):
- print name
- # this is allowed pid?
- result.append(pid)
- return result
-
-
-def get_pid_name(pid):
- """Return name by pid"""
- try:
- with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
- try:
- cmd = pidfile.readline().split()[0]
- return os.path.basename(cmd).rstrip('\x00')
- except IndexError:
- # no cmd returned
- return "<NO NAME>"
- except IOError:
- # upstream wait any string, no matter if we couldn't read proc
- return "no_such_process"
-
-
-def delta(func, only_upd=True):
- prev = {}
- while True:
- for dev_name, vals in func():
- if dev_name not in prev:
- prev[dev_name] = {}
- for name, (val, _) in vals.items():
- prev[dev_name][name] = val
- else:
- dev_prev = prev[dev_name]
- res = {}
- for stat_name, (val, accum_val) in vals.items():
- if accum_val:
- if stat_name in dev_prev:
- delta = int(val) - int(dev_prev[stat_name])
- if not only_upd or 0 != delta:
- res[stat_name] = str(delta)
- dev_prev[stat_name] = val
- elif not only_upd or '0' != val:
- res[stat_name] = val
-
- if only_upd and len(res) == 0:
- continue
- yield dev_name, res
- yield None, None
diff --git a/wally/sensors/webui/sensors.html b/wally/sensors/webui/sensors.html
deleted file mode 100644
index 77e3cf6..0000000
--- a/wally/sensors/webui/sensors.html
+++ /dev/null
@@ -1,57 +0,0 @@
-<!DOCTYPE html>
-<meta charset="utf-8">
-<style>
- @import url(http://fonts.googleapis.com/css?family=Yanone+Kaffeesatz:400,700);
- @import url(http://square.github.io/cubism/style.css);
-</style>
-<script src="http://d3js.org/d3.v3.min.js" charset="utf-8"></script>
-<script src="http://square.github.io/cubism/cubism.v1.js"></script>
-<div id="body"> <div id="graph" /></div>
-
-<script>
-
-// create context and horizon
-var context = cubism.context().serverDelay(3 * 1000).step(1000).size(1000)
-var horizon = context.horizon().extent([0, 100])
-
-// define metric accessor
-function wally_source(name) {
- function selector(start, stop, step, callback){
- function on_result(data) {
- callback(null, data);
- };
-
- url = "/sensors?start=" + start + "&stop=" + stop + "&step=" + step + "&name=" + name;
- d3.json(url, on_result);
- }
-
- return context.metric(selector, name);
-}
-
-// draw graph
-var metrics = ["testnodes:io_q", "testnodes:cpu"];
-horizon.metric(wally_source);
-
-d3.select("#graph").selectAll(".horizon")
- .data(metrics)
- .enter()
- .append("div")
- .attr("class", "horizon")
- .call(horizon.height(120));
-
-// set rule
-d3.select("#body").append("div")
- .attr("class", "rule")
- .call(context.rule());
-
-// set focus
-context.on("focus", function(i) {
- d3.selectAll(".value")
- .style( "right", i == null ? null : context.size() - i + "px");
-});
-
-// set axis
-var axis = context.axis()
-d3.select("#graph").append("div").attr("class", "axis").append("g").call(axis);
-
-</script>
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
deleted file mode 100644
index 600f3bb..0000000
--- a/wally/sensors_utils.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import os.path
-import logging
-import contextlib
-
-from concurrent.futures import ThreadPoolExecutor
-
-from wally import ssh_utils
-from wally.sensors.api import (with_sensors, sensors_info, SensorConfig)
-
-
-logger = logging.getLogger("wally.sensors")
-
-
-def get_sensors_config_for_nodes(cfg, nodes, remote_path):
- monitored_nodes = []
- sensors_configs = []
- source2roles_map = {}
-
- receiver_url = "csvfile://" + os.path.join(remote_path, "results.csv")
-
- for role, sensors_str in cfg["roles_mapping"].items():
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
-
- for node in nodes:
- if role in node.roles:
- source2roles_map[node.get_conn_id()] = node.roles
- monitored_nodes.append(node)
- sens_cfg = SensorConfig(node.connection,
- node.get_conn_id(),
- collect_cfg,
- source_id=node.get_conn_id(),
- monitor_url=receiver_url)
- sensors_configs.append(sens_cfg)
-
- return monitored_nodes, sensors_configs, source2roles_map
-
-
-PID_FILE = "/tmp/sensors.pid"
-
-
-def clear_old_sensors(sensors_configs):
- def stop_sensors(sens_cfg):
- with sens_cfg.conn.open_sftp() as sftp:
- try:
- pid = ssh_utils.read_from_remote(sftp, PID_FILE)
- pid = int(pid.strip())
- ssh_utils.run_over_ssh(sens_cfg.conn,
- "kill -9 " + str(pid))
- sftp.remove(PID_FILE)
- except:
- pass
-
- with ThreadPoolExecutor(32) as pool:
- list(pool.map(stop_sensors, sensors_configs))
-
-
-@contextlib.contextmanager
-def with_sensors_util(sensors_cfg, nodes):
- srp = sensors_cfg['sensors_remote_path']
- monitored_nodes, sensors_configs, source2roles_map = \
- get_sensors_config_for_nodes(sensors_cfg, nodes, srp)
-
- with with_sensors(sensors_configs, srp):
- yield source2roles_map
-
-
-@contextlib.contextmanager
-def sensors_info_util(cfg, nodes):
- if cfg.get('sensors', None) is None:
- yield
- return
-
- _, sensors_configs, _ = \
- get_sensors_config_for_nodes(cfg.sensors, nodes,
- cfg.sensors_remote_path)
-
- clear_old_sensors(sensors_configs)
- ctx = sensors_info(sensors_configs, cfg.sensors_remote_path)
- try:
- res = ctx.__enter__()
- yield res
- except:
- ctx.__exit__(None, None, None)
- raise
- finally:
- try:
- ctx.__exit__(None, None, None)
- except:
- logger.exception("During stop/collect sensors")
- del res[:]
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index e566cbe..ada4af6 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -78,8 +78,7 @@
def exists(sftp, path):
- """os.path.exists for paramiko's SCP object
- """
+ "os.path.exists for paramiko's SCP object"
try:
sftp.stat(path)
return True
@@ -329,6 +328,7 @@
def parse_ssh_uri(uri):
+    # each of the forms below may optionally be prefixed with ssh://
# user:passwd@ip_host:port
# user:passwd@ip_host
# user@ip_host:port
diff --git a/wally/stage.py b/wally/stage.py
new file mode 100644
index 0000000..5f47a1b
--- /dev/null
+++ b/wally/stage.py
@@ -0,0 +1,36 @@
+import logging
+import contextlib
+
+
+from .utils import StopTestError
+
+logger = logging.getLogger("wally")
+
+
+class TestStage:
+ name = ""
+
+ def __init__(self, testrun, config):
+ self.testrun = testrun
+ self.config = config
+
+ def __enter__(self):
+ return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        return False
+
+
+@contextlib.contextmanager
+def log_stage(stage):
+ msg_templ = "Exception during {0}: {1!s}"
+ msg_templ_no_exc = "During {0}"
+
+ logger.info("Start " + stage.name)
+
+ try:
+ yield
+ except StopTestError as exc:
+ logger.error(msg_templ.format(stage.name, exc))
+ except Exception:
+ logger.exception(msg_templ_no_exc.format(stage.name))
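
A minimal usage sketch for the new stage helpers above; the ConnectStage subclass and the
testrun/config arguments are hypothetical placeholders, not part of this patch:

    # illustrative sketch only: ConnectStage and its arguments are made up
    class ConnectStage(TestStage):
        name = "connect"

        def __enter__(self):
            # a real stage would open node connections here
            return self

    stage = ConnectStage(testrun=None, config={})
    with log_stage(stage):      # logs "Start connect" and any StopTestError/exception
        with stage:
            pass                # stage body would run here
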
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 0c4ccc7..759d63b 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -5,9 +5,8 @@
import urllib
import os.path
import logging
-import warnings
-import subprocess
import collections
+from typing import Dict, Any, Iterable, Tuple
from concurrent.futures import ThreadPoolExecutor
@@ -41,7 +40,7 @@
CINDER_CONNECTION = None
-def is_connected():
+def is_connected() -> bool:
return NOVA_CONNECTION is not None
@@ -50,7 +49,7 @@
"tenant", "auth_url", "insecure"])
-def ostack_get_creds():
+def ostack_get_creds() -> OSCreds:
if STORED_OPENSTACK_CREDS is None:
return OSCreds(os.environ.get('OS_USERNAME'),
os.environ.get('OS_PASSWORD'),
@@ -61,7 +60,7 @@
return STORED_OPENSTACK_CREDS
-def nova_connect(os_creds=None):
+def nova_connect(os_creds: OSCreds=None) -> n_client:
global NOVA_CONNECTION
global STORED_OPENSTACK_CREDS
@@ -80,7 +79,7 @@
return NOVA_CONNECTION
-def cinder_connect(os_creds=None):
+def cinder_connect(os_creds: OSCreds=None) -> c_client:
global CINDER_CONNECTION
global STORED_OPENSTACK_CREDS
@@ -97,68 +96,7 @@
return CINDER_CONNECTION
-def prepare_os_subpr(nova, params, os_creds, max_vm_per_compute=8):
- if os_creds is None:
- os_creds = ostack_get_creds()
-
- serv_groups = " ".join(map(params['aa_group_name'].format,
- range(max_vm_per_compute)))
-
- image_name = params['image']['name']
- env = os.environ.copy()
-
- env.update(dict(
- OS_USERNAME=os_creds.name,
- OS_PASSWORD=os_creds.passwd,
- OS_TENANT_NAME=os_creds.tenant,
- OS_AUTH_URL=os_creds.auth_url,
- OS_INSECURE="1" if os_creds.insecure else "0",
-
- FLAVOR_NAME=params['flavor']['name'],
- FLAVOR_RAM=str(params['flavor']['ram_size']),
- FLAVOR_HDD=str(params['flavor']['hdd_size']),
- FLAVOR_CPU_COUNT=str(params['flavor']['cpu_count']),
-
- SERV_GROUPS=serv_groups,
-
- SECGROUP=params['security_group'],
-
- IMAGE_NAME=image_name,
- IMAGE_URL=params['image']['url'],
- ))
-
- spath = os.path.dirname(os.path.dirname(wally.__file__))
- spath = os.path.join(spath, 'scripts/prepare.sh')
-
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- fname = os.tempnam()
-
- cmd = "bash {spath} >{fname} 2>&1".format(spath=spath, fname=fname)
- try:
- subprocess.check_call(cmd, shell=True, env=env)
- except:
- logger.error("Prepare failed. Logs in " + fname)
- with open(fname) as fd:
- logger.error("Message:\n " + fd.read().replace("\n", "\n "))
- raise
- os.unlink(fname)
-
- while True:
- status = nova.images.find(name=image_name).status
- if status == 'ACTIVE':
- break
- msg = "Image {0} is still in {1} state. Waiting 10 more seconds"
- logger.info(msg.format(image_name, status))
- time.sleep(10)
-
- create_keypair(nova,
- params['keypair_name'],
- params['keypair_file_public'],
- params['keypair_file_private'])
-
-
-def find_vms(nova, name_prefix):
+def find_vms(nova: n_client, name_prefix: str) -> Iterable[Tuple[str, int]]:
for srv in nova.servers.list():
if srv.name.startswith(name_prefix):
for ips in srv.addresses.values():
@@ -168,7 +106,7 @@
break
-def pause(ids):
+def pause(ids: Iterable[int]) -> None:
def pause_vm(conn, vm_id):
vm = conn.servers.get(vm_id)
if vm.status == 'ACTIVE':
@@ -182,7 +120,7 @@
future.result()
-def unpause(ids, max_resume_time=10):
+def unpause(ids: Iterable[int], max_resume_time=10) -> None:
def unpause(conn, vm_id):
vm = conn.servers.get(vm_id)
if vm.status == 'PAUSED':
@@ -204,7 +142,7 @@
future.result()
-def prepare_os(nova, params, os_creds, max_vm_per_compute=8):
+def prepare_os(nova: n_client, params: Dict[str, Any], os_creds: OSCreds) -> None:
"""prepare openstack for futher usage
Creates server groups, security rules, keypair, flavor
@@ -257,7 +195,7 @@
create_flavor(nova, **params['flavor'])
-def create_keypair(nova, name, pub_key_path, priv_key_path):
+def create_keypair(nova: n_client, name: str, pub_key_path: str, priv_key_path: str):
"""create and upload keypair into nova, if doesn't exists yet
Create and upload keypair into nova, if keypair with given bane
@@ -308,7 +246,7 @@
" or remove key from openstack")
-def get_or_create_aa_group(nova, name):
+def get_or_create_aa_group(nova: n_client, name: str) -> int:
"""create anti-affinity server group, if doesn't exists yet
parameters:
@@ -326,7 +264,7 @@
return group.id
-def allow_ssh(nova, group_name):
+def allow_ssh(nova: n_client, group_name: str) -> int:
"""create sequrity group for ping and ssh
parameters:
@@ -355,7 +293,7 @@
return secgroup.id
-def create_image(nova, os_creds, name, url):
+def create_image(nova: n_client, os_creds: OSCreds, name: str, url: str):
"""upload image into glance from given URL, if given image doesn't exisis yet
parameters:
@@ -394,7 +332,7 @@
os.unlink(tempnam)
-def create_flavor(nova, name, ram_size, hdd_size, cpu_count):
+def create_flavor(nova: n_client, name: str, ram_size: int, hdd_size: int, cpu_count: int):
"""create flavor, if doesn't exisis yet
parameters:
@@ -415,7 +353,7 @@
nova.flavors.create(name, cpu_count, ram_size, hdd_size)
-def create_volume(size, name):
+def create_volume(size: int, name: str):
cinder = cinder_connect()
vol = cinder.volumes.create(size=size, display_name=name)
err_count = 0
@@ -436,7 +374,7 @@
return vol
-def wait_for_server_active(nova, server, timeout=300):
+def wait_for_server_active(nova: n_client, server, timeout: int=300) -> None:
"""waiting till server became active
parameters:
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..8c8644b
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,24 @@
+buffered=0
+group_reporting=1
+iodepth=1
+unified_rw_reporting=1
+
+norandommap=1
+
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
+filename={FILENAME}
+
+size={TEST_FILE_SIZE}
+
+write_lat_log=fio_log
+write_iops_log=fio_log
+write_bw_log=fio_log
+log_avg_msec=500
+
+
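
The {FILENAME} and {TEST_FILE_SIZE} placeholders in defaults.cfg are expected to be
substituted at compile time via fio_cfg_compile() from fio_task_parser.py (changed further
down in this patch). A hedged sketch, assuming the parser resolves the "include defaults.cfg"
directive; the device path and size below are example values only:

    # illustrative sketch: compiling a job file that includes defaults.cfg
    from wally.suits.io.fio_task_parser import fio_cfg_compile

    job_cfg = open("wally/suits/io/rrd.cfg").read()
    test_params = {"FILENAME": "/dev/sdb", "TEST_FILE_SIZE": "10G"}
    sections = list(fio_cfg_compile(job_cfg, "rrd.cfg", test_params))
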
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b77461..50bb1fd 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -3,51 +3,45 @@
import json
import stat
import random
-import shutil
+import hashlib
import os.path
import logging
import datetime
import functools
-import subprocess
import collections
+from typing import Dict, List, Callable, Any, Tuple, Optional
import yaml
-import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor, wait
import wally
-from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property, average
-from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
+
+from ...pretty_yaml import dumps
+from ...statistic import round_3_digit, data_property, average
+from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
+from ...inode import INode
+
+from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
from .fio_task_parser import (execution_time, fio_cfg_compile,
get_test_summary, get_test_summary_tuple,
get_test_sync_mode, FioJobSection)
+from .rpc_plugin import parse_fio_result
-from ..itest import (TimeSeriesValue, PerfTest, TestResults,
- run_on_node, TestConfig, MeasurementMatrix)
logger = logging.getLogger("wally")
-# Results folder structure
-# results/
-# {loadtype}_{num}/
-# config.yaml
-# ......
-
-
-class NoData(object):
+class NoData:
pass
-def cached_prop(func):
+def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
@property
@functools.wraps(func)
- def closure(self):
+ def closure(self) -> Any:
val = getattr(self, "_" + func.__name__)
if val is NoData:
val = func(self)
@@ -56,7 +50,7 @@
return closure
-def load_fio_log_file(fname):
+def load_fio_log_file(fname: str) -> TimeSeriesValue:
with open(fname) as fd:
it = [ln.split(',')[:2] for ln in fd]
@@ -72,7 +66,7 @@
WRITE_IOPS_DISCSTAT_POS = 7
-def load_sys_log_file(ftype, fname):
+def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
assert ftype == 'iops'
pval = None
with open(fname) as fd:
@@ -89,7 +83,7 @@
return TimeSeriesValue(vals)
-def load_test_results(folder, run_num):
+def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
res = {}
params = None
@@ -97,7 +91,7 @@
params = yaml.load(open(fn).read())
conn_ids_set = set()
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+ rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
for fname in os.listdir(folder):
rm = re.match(rr, fname)
if rm is None:
@@ -115,7 +109,7 @@
conn_ids_set.add(conn_id)
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+ rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
for fname in os.listdir(folder):
rm = re.match(rr, fname)
if rm is None:
@@ -159,8 +153,8 @@
return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-class Attrmapper(object):
- def __init__(self, dct):
+class Attrmapper:
+ def __init__(self, dct: Dict[str, Any]):
self.__dct = dct
def __getattr__(self, name):
@@ -170,8 +164,8 @@
raise AttributeError(name)
-class DiskPerfInfo(object):
- def __init__(self, name, summary, params, testnodes_count):
+class DiskPerfInfo:
+ def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
self.name = name
self.bw = None
self.iops = None
@@ -193,7 +187,7 @@
self.concurence = self.params['vals'].get('numjobs', 1)
-def get_lat_perc_50_95(lat_mks):
+def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
curr_perc = 0
perc_50 = None
perc_95 = None
@@ -224,8 +218,8 @@
return perc_50 / 1000., perc_95 / 1000.
-class IOTestResults(object):
- def __init__(self, suite_name, fio_results, log_directory):
+class IOTestResults:
+ def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
self.suite_name = suite_name
self.fio_results = fio_results
self.log_directory = log_directory
@@ -236,7 +230,7 @@
def __len__(self):
return len(self.fio_results)
- def get_yamable(self):
+ def get_yamable(self) -> Dict[str, List[str]]:
items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
return {self.suite_name: [self.log_directory] + items}
@@ -424,6 +418,10 @@
soft_runcycle = 5 * 60
retry_time = 30
+ zero_md5_hash = hashlib.md5()
+ zero_md5_hash.update(b"\x00" * 1024)
+ zero_md5 = zero_md5_hash.hexdigest()
+
def __init__(self, config):
PerfTest.__init__(self, config)
@@ -464,7 +462,7 @@
self.fio_configs = None
@classmethod
- def load(cls, suite_name, folder):
+ def load(cls, suite_name: str, folder: str) -> IOTestResults:
res = []
for fname in os.listdir(folder):
if re.match("\d+_params.yaml$", fname):
@@ -478,11 +476,9 @@
pass
# size is megabytes
- def check_prefill_required(self, rossh, fname, size, num_blocks=16):
+ def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: Optional[int]=16) -> bool:
try:
- with rossh.connection.open_sftp() as sftp:
- fstats = sftp.stat(fname)
-
+ fstats = node.stat_file(fname)
if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
return True
except EnvironmentError:
@@ -498,14 +494,13 @@
if self.use_sudo:
cmd = "sudo " + cmd
- zero_md5 = '0f343b0931126a20f133d67c2b018a3b'
bsize = size * (1024 ** 2)
offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
offsets.append(bsize - 1024)
offsets.append(0)
for offset in offsets:
- data = rossh(cmd.format(fname, offset), nolog=True)
+ data = node.run(cmd.format(fname, offset), nolog=True)
md = ""
for line in data.split("\n"):
@@ -517,12 +512,12 @@
logger.error("File data check is failed - " + data)
return True
- if zero_md5 == md:
+ if self.zero_md5 == md:
return True
return False
- def prefill_test_files(self, rossh, files, force=False):
+    def prefill_test_files(self, node: INode, files: Dict[str, int], force: bool=False) -> None:
if self.use_system_fio:
cmd_templ = "fio "
else:
@@ -542,7 +537,7 @@
ddtime = 0
for fname, curr_sz in files.items():
if not force:
- if not self.check_prefill_required(rossh, fname, curr_sz):
+ if not self.check_prefill_required(node, fname, curr_sz):
logger.debug("prefill is skipped")
continue
@@ -551,7 +546,7 @@
ssize += curr_sz
stime = time.time()
- rossh(cmd, timeout=curr_sz)
+ node.run(cmd, timeout=curr_sz)
ddtime += time.time() - stime
if ddtime > 1.0:
@@ -559,10 +554,10 @@
mess = "Initiall fio fill bw is {0} MiBps for this vm"
logger.info(mess.format(fill_bw))
- def install_utils(self, node, rossh, max_retry=3, timeout=5):
+ def install_utils(self, node: INode) -> None:
need_install = []
packs = [('screen', 'screen')]
- os_info = get_os(rossh)
+ os_info = get_os(node)
if self.use_system_fio:
packs.append(('fio', 'fio'))
@@ -575,7 +570,7 @@
continue
try:
- rossh('which ' + bin_name, nolog=True)
+ node.run('which ' + bin_name, nolog=True)
except OSError:
need_install.append(package)
@@ -585,14 +580,10 @@
else:
cmd = "sudo apt-get -y install " + " ".join(need_install)
- for _ in range(max_retry):
- try:
- rossh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
+ try:
+ node.run(cmd)
+ except OSError as err:
+ raise OSError("Can't install - {}".format(" ".join(need_install))) from err
if not self.use_system_fio:
fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
@@ -602,19 +593,16 @@
fio_path = os.path.join(fio_dir, fname)
if not os.path.exists(fio_path):
- raise RuntimeError("No prebuild fio available for {0}".format(os_info))
+ raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
bz_dest = self.join_remote('fio.bz2')
- with node.connection.open_sftp() as sftp:
- sftp.put(fio_path, bz_dest)
+ node.copy_file(fio_path, bz_dest)
+ node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
+ node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
- rossh("bzip2 --decompress " + bz_dest, nolog=True)
- rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
-
- def pre_run(self):
+ def pre_run(self) -> None:
if 'FILESIZE' not in self.config_params:
- # need to detect file size
- pass
+ raise NotImplementedError("File size detection is not implemented")
self.fio_configs = fio_cfg_compile(self.raw_cfg,
self.config_fname,
@@ -641,36 +629,28 @@
force=self.force_prefill)
list(pool.map(fc, self.config.nodes))
- def pre_run_th(self, node, files, force):
+    def pre_run_th(self, node: INode, files: Dict[str, int], force_prefill: bool=False) -> None:
try:
- # fill files with pseudo-random data
- rossh = run_on_node(node)
- rossh.connection = node.connection
+ cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+ cmd += " ; sudo chown {0} {1}".format(node.get_user(),
+ self.config.remote_dir)
+ node.run(cmd, nolog=True)
- try:
- cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(node.get_user(),
- self.config.remote_dir)
- rossh(cmd, nolog=True)
+ assert self.config.remote_dir != "" and self.config.remote_dir != "/"
+ node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
- assert self.config.remote_dir != "" and self.config.remote_dir != "/"
- rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)
+ except Exception as exc:
+ msg = "Failed to create folder {} on remote {}."
+ msg = msg.format(self.config.remote_dir, node, exc)
+ logger.exception(msg)
+ raise StopTestError(msg) from exc
- except Exception as exc:
- msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
- logger.exception(msg)
- raise StopTestError(msg, exc)
+ self.install_utils(node)
+        self.prefill_test_files(node, files, force_prefill)
- self.install_utils(node, rossh)
- self.prefill_test_files(rossh, files, force)
- except:
- logger.exception("XXXX")
- raise
-
- def show_test_execution_time(self):
+ def show_expected_execution_time(self) -> None:
if len(self.fio_configs) > 1:
# +10% - is a rough estimation for additional operations
# like sftp, etc
@@ -682,16 +662,17 @@
logger.info(msg.format(exec_time_s,
end_dt.strftime("%H:%M:%S")))
- def run(self):
+ def run(self) -> IOTestResults:
logger.debug("Run preparation")
self.pre_run()
- self.show_test_execution_time()
+ self.show_expected_execution_time()
+ num_nodes = len(self.config.nodes)
tname = os.path.basename(self.config_fname)
if tname.endswith('.cfg'):
tname = tname[:-4]
- barrier = Barrier(len(self.config.nodes))
+ barrier = Barrier(num_nodes)
results = []
# set of Operation_Mode_BlockSize str's
@@ -699,17 +680,14 @@
# they already too slow with previous thread count
lat_bw_limit_reached = set()
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ with ThreadPoolExecutor(num_nodes) as pool:
for pos, fio_cfg in enumerate(self.fio_configs):
- test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+ test_descr = get_test_summary(fio_cfg.vals, noqd=True)
if test_descr in lat_bw_limit_reached:
continue
- else:
- logger.info("Will run {0} test".format(fio_cfg.name))
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
+ logger.info("Will run {} test".format(fio_cfg.name))
+ templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
exec_time = execution_time(fio_cfg)
exec_time_str = sec_to_str(exec_time)
timeout = int(exec_time + max(300, exec_time))
@@ -722,43 +700,37 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
- func = functools.partial(self.do_run,
- barrier=barrier,
- fio_cfg=fio_cfg,
- pos=pos)
+ run_test_func = functools.partial(self.do_run,
+ barrier=barrier,
+ fio_cfg=fio_cfg,
+ pos=pos)
max_retr = 3
for idx in range(max_retr):
+ if 0 != idx:
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
+
try:
- intervals = list(pool.map(func, self.config.nodes))
+ intervals = list(pool.map(run_test_func, self.config.nodes))
if None not in intervals:
break
except (EnvironmentError, SSHException) as exc:
+ if max_retr - 1 == idx:
+ raise StopTestError("Fio failed") from exc
logger.exception("During fio run")
- if idx == max_retr - 1:
- raise StopTestError("Fio failed", exc)
- logger.info("Reconnectiong, sleeping %ss and retrying", self.retry_time)
-
- wait([pool.submit(node.connection.close)
- for node in self.config.nodes])
-
- time.sleep(self.retry_time)
-
- wait([pool.submit(reconnect, node.connection, node.conn_url)
- for node in self.config.nodes])
-
- fname = "{0}_task.fio".format(pos)
+ fname = "{}_task.fio".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(str(fio_cfg))
- params = {'vm_count': len(self.config.nodes)}
+ params = {'vm_count': num_nodes}
params['name'] = fio_cfg.name
params['vals'] = dict(fio_cfg.vals.items())
params['intervals'] = intervals
params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
- fname = "{0}_params.yaml".format(pos)
+ fname = "{}_params.yaml".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(dumps(params))
@@ -770,7 +742,7 @@
# conver us to ms
if self.max_latency < lat_50:
- logger.info(("Will skip all subsequent tests of {0} " +
+ logger.info(("Will skip all subsequent tests of {} " +
"due to lat/bw limits").format(fio_cfg.name))
lat_bw_limit_reached.add(test_descr)
@@ -782,49 +754,7 @@
return IOTestResults(self.config.params['cfg'],
results, self.config.log_directory)
- def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
- if self.use_sudo:
- sudo = "sudo "
- else:
- sudo = ""
-
- bash_file = """
-#!/bin/bash
-
-function get_dev() {{
- if [ -b "$1" ] ; then
- echo $1
- else
- echo $(df "$1" | tail -1 | awk '{{print $1}}')
- fi
-}}
-
-function log_io_activiti(){{
- local dest="$1"
- local dev=$(get_dev "$2")
- local sleep_time="$3"
- dev=$(basename "$dev")
-
- echo $dev
-
- for (( ; ; )) ; do
- grep -E "\\b$dev\\b" /proc/diskstats >> "$dest"
- sleep $sleep_time
- done
-}}
-
-sync
-cd {exec_folder}
-
-log_io_activiti {io_log_file} {test_file} 1 &
-local pid="$!"
-
-{fio_path}fio --output-format=json --output={out_file} --alloc-size=262144 {job_file} >{err_out_file} 2>&1
-echo $? >{res_code_file}
-kill -9 $pid
-
-"""
-
+ def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool=False):
exec_folder = self.config.remote_dir
if self.use_system_fio:
@@ -835,157 +765,20 @@
else:
fio_path = exec_folder
- bash_file = bash_file.format(out_file=self.results_file,
- job_file=self.task_file,
- err_out_file=self.err_out_file,
- res_code_file=self.exit_code_file,
- exec_folder=exec_folder,
- fio_path=fio_path,
- test_file=self.config_params['FILENAME'],
- io_log_file=self.io_log_file).strip()
-
- with node.connection.open_sftp() as sftp:
- save_to_remote(sftp, self.task_file, str(fio_cfg))
- save_to_remote(sftp, self.sh_file, bash_file)
-
exec_time = execution_time(fio_cfg)
-
- timeout = int(exec_time + max(300, exec_time))
- soft_tout = exec_time
-
- begin = time.time()
-
- fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
-
barrier.wait()
-
- task = BGSSHTask(node, self.use_sudo)
- task.start(sudo + "bash " + self.sh_file)
-
- while True:
- try:
- task.wait(soft_tout, timeout)
- break
- except paramiko.SSHException:
- pass
-
- try:
- node.connection.close()
- except:
- pass
-
- reconnect(node.connection, node.conn_url)
-
- end = time.time()
- rossh = run_on_node(node)
- fnames_after = rossh("ls -1 " + exec_folder, nolog=True)
-
- conn_id = node.get_conn_id().replace(":", "_")
- if not nolog:
- logger.debug("Test on node {0} is finished".format(conn_id))
-
- log_files_pref = []
- if 'write_lat_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_lat_log']
- log_files_pref.append(fname + '_clat')
- log_files_pref.append(fname + '_lat')
- log_files_pref.append(fname + '_slat')
-
- if 'write_iops_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_iops_log']
- log_files_pref.append(fname + '_iops')
-
- if 'write_bw_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_bw_log']
- log_files_pref.append(fname + '_bw')
-
- files = collections.defaultdict(lambda: [])
- all_files = [os.path.basename(self.results_file)]
- new_files = set(fnames_after.split()) - set(fnames_before.split())
-
- for fname in new_files:
- if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
- name, _ = os.path.splitext(fname)
- if fname.count('.') == 1:
- tp = name.split("_")[-1]
- cnt = 0
- else:
- tp_cnt = name.split("_")[-1]
- tp, cnt = tp_cnt.split('.')
- files[tp].append((int(cnt), fname))
- all_files.append(fname)
- elif fname == os.path.basename(self.io_log_file):
- files['iops'].append(('sys', fname))
- all_files.append(fname)
-
- arch_name = self.join_remote('wally_result.tar.gz')
- tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
-
- if os.path.exists(tmp_dir):
- shutil.rmtree(tmp_dir)
-
- os.mkdir(tmp_dir)
- loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
- file_full_names = " ".join(all_files)
-
- try:
- os.unlink(loc_arch_name)
- except:
- pass
-
- with node.connection.open_sftp() as sftp:
- try:
- exit_code = read_from_remote(sftp, self.exit_code_file)
- except IOError:
- logger.error("No exit code file found on %s. Looks like process failed to start",
- conn_id)
- return None
-
- err_out = read_from_remote(sftp, self.err_out_file)
- exit_code = exit_code.strip()
-
- if exit_code != '0':
- msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
- logger.critical(msg.strip())
- raise StopTestError("fio failed")
-
- rossh("rm -f {0}".format(arch_name), nolog=True)
- pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
- rossh(pack_files_cmd, nolog=True)
- sftp.get(arch_name, loc_arch_name)
-
- unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
- subprocess.check_call(unpack_files_cmd, shell=True)
- os.unlink(loc_arch_name)
-
- for ftype, fls in files.items():
- for idx, fname in fls:
- cname = os.path.join(tmp_dir, fname)
- loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
- loc_path = os.path.join(self.config.log_directory, loc_fname)
- os.rename(cname, loc_path)
-
- cname = os.path.join(tmp_dir,
- os.path.basename(self.results_file))
- loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
- loc_path = os.path.join(self.config.log_directory, loc_fname)
- os.rename(cname, loc_path)
- os.rmdir(tmp_dir)
-
- remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
- arch_name,
- file_full_names)
- rossh(remove_remote_res_files_cmd, nolog=True)
- return begin, end
+ run_data = node.rpc.fio.run_fio(self.use_sudo,
+ fio_path,
+ exec_folder,
+ str(fio_cfg),
+ exec_time + max(300, exec_time))
+ return parse_fio_result(run_data)
@classmethod
- def prepare_data(cls, results):
- """
- create a table with io performance report
- for console
- """
+ def prepare_data(cls, results) -> List[Dict[str, Any]]:
+ """create a table with io performance report for console"""
- def key_func(data):
+        def key_func(data) -> Tuple[str, str, str, str, int]:
tpl = data.summary_tpl()
return (data.name,
tpl.oper,
@@ -1067,11 +860,8 @@
fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
@classmethod
- def format_for_console(cls, results):
- """
- create a table with io performance report
- for console
- """
+ def format_for_console(cls, results) -> str:
+ """create a table with io performance report for console"""
tab = texttable.Texttable(max_width=120)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
@@ -1090,11 +880,8 @@
return tab.draw()
@classmethod
- def format_diff_for_console(cls, list_of_results):
- """
- create a table with io performance report
- for console
- """
+ def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
+ """create a table with io performance report for console"""
tab = texttable.Texttable(max_width=200)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 0f788ed..233f6e2 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,13 +1,17 @@
+#!/usr/bin/env python3
+
+import re
import os
import sys
import copy
import os.path
import argparse
import itertools
+from typing import Optional, Generator, Iterator, Union, Dict, Iterable, Any, List, Tuple, TypeVar, Callable
from collections import OrderedDict, namedtuple
-from wally.utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b
SECTION = 0
@@ -20,20 +24,20 @@
'tp', 'name', 'val'))
-class FioJobSection(object):
- def __init__(self, name):
+class FioJobSection:
+ def __init__(self, name: str):
self.name = name
self.vals = OrderedDict()
- def copy(self):
+ def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
- def required_vars(self):
+    def required_vars(self) -> Iterator[Tuple[str, Var]]:
for name, val in self.vals.items():
if isinstance(val, Var):
yield name, val
- def is_free(self):
+ def is_free(self) -> bool:
return len(list(self.required_vars())) == 0
def __str__(self):
@@ -51,7 +55,7 @@
class ParseError(ValueError):
- def __init__(self, msg, fname, lineno, line_cont=""):
+    def __init__(self, msg: str, fname: str, lineno: int, line_cont: Optional[str] = ""):
ValueError.__init__(self, msg)
self.file_name = fname
self.lineno = lineno
@@ -65,21 +69,11 @@
super(ParseError, self).__str__())
-def is_name(name):
- if len(name) == 0:
- return False
-
- if name[0] != '_' and not name[0].isalpha():
- return False
-
- for ch in name[1:]:
- if name[0] != '_' and not name[0].isalnum():
- return False
-
- return True
+def is_name(name: str) -> bool:
+    return re.fullmatch(r"[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
-def parse_value(val):
+def parse_value(val: str) -> Union[int, str, Dict, Var]:
try:
return int(val)
except ValueError:
@@ -103,7 +97,7 @@
return val
-def fio_config_lexer(fio_cfg, fname):
+def fio_config_lexer(fio_cfg: str, fname: str) -> Iterator[CfgLine]:
for lineno, oline in enumerate(fio_cfg.split("\n")):
try:
line = oline.strip()
@@ -136,7 +130,7 @@
raise ParseError(str(exc), fname, lineno, oline)
-def fio_config_parse(lexer_iter):
+def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
in_globals = False
curr_section = None
glob_vals = OrderedDict()
@@ -210,19 +204,7 @@
yield curr_section
-def process_repeats(sec):
- sec = sec.copy()
- count = sec.vals.pop('NUM_ROUNDS', 1)
- assert isinstance(count, (int, long))
-
- for _ in range(count):
- yield sec.copy()
-
- if 'ramp_time' in sec.vals:
- sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
-
-def process_cycles(sec):
+def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
cycles = OrderedDict()
for name, val in sec.vals.items():
@@ -232,8 +214,8 @@
if len(cycles) == 0:
yield sec
else:
- # thread should changes faster
- numjobs = cycles.pop('numjobs', None)
+        # qd should change faster
+ numjobs = cycles.pop('qd', None)
items = cycles.items()
if len(items) > 0:
@@ -246,7 +228,7 @@
if numjobs is not None:
vals.append(numjobs)
- keys.append('numjobs')
+ keys.append('qd')
for combination in itertools.product(*vals):
new_sec = sec.copy()
@@ -254,7 +236,11 @@
yield new_sec
-def apply_params(sec, params):
+FIO_PARAM_VAL = Union[str, Var]
+FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+
+
+def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
processed_vals = OrderedDict()
processed_vals.update(params)
for name, val in sec.vals.items():
@@ -273,10 +259,7 @@
return sec
-MAGIC_OFFSET = 0.1885
-
-
-def abbv_name_to_full(name):
+def abbv_name_to_full(name: str) -> str:
assert len(name) == 3
smode = {
@@ -291,12 +274,11 @@
off_mode[name[0]] + " " + oper[name[1]]
-def finall_process(sec, counter=[0]):
- sec = sec.copy()
+MAGIC_OFFSET = 0.1885
- if sec.vals.get('numjobs', '1') != 1:
- msg = "Group reporting should be set if numjobs != 1"
- assert 'group_reporting' in sec.vals, msg
+
+def finall_process(sec: FioJobSection, counter: Optional[List[int]] = [0]) -> FioJobSection:
+ sec = sec.copy()
sec.vals['unified_rw_reporting'] = '1'
@@ -328,7 +310,7 @@
return sec
-def get_test_sync_mode(sec):
+def get_test_sync_mode(sec: FioJobSection) -> str:
if isinstance(sec, dict):
vals = sec
else:
@@ -347,10 +329,10 @@
return 'a'
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "th_count", "vm_count"))
+TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-def get_test_summary_tuple(sec, vm_count=None):
+def get_test_summary_tuple(sec: FioJobSection, vm_count=None) -> TestSumm:
if isinstance(sec, dict):
vals = sec
else:
@@ -365,48 +347,51 @@
"readwrite": "sm"}[vals["rw"]]
sync_mode = get_test_sync_mode(sec)
- th_count = vals.get('numjobs')
-
- if th_count is None:
- th_count = vals.get('concurence', 1)
return TestSumm(rw,
sync_mode,
vals['blocksize'],
- th_count,
+ vals['iodepth'],
vm_count)
-def get_test_summary(sec, vm_count=None):
+def get_test_summary(sec: FioJobSection, vm_count: int=None, noqd: Optional[bool]=False) -> str:
tpl = get_test_summary_tuple(sec, vm_count)
- res = "{0.oper}{0.mode}{0.bsize}th{0.th_count}".format(tpl)
+
+ res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
+ if not noqd:
+ res += "qd{}".format(tpl.qd)
if tpl.vm_count is not None:
- res += "vm" + str(tpl.vm_count)
+ res += "vm{}".format(tpl.vm_count)
return res
-def execution_time(sec):
+def execution_time(sec: FioJobSection) -> int:
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source, fname=None):
+def parse_all_in_1(source: str, fname: str=None) -> Iterator[FioJobSection]:
return fio_config_parse(fio_config_lexer(source, fname))
-def flatmap(func, inp_iter):
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+            inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
for val in inp_iter:
for res in func(val):
yield res
-def fio_cfg_compile(source, fname, test_params):
+def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
- it = flatmap(process_repeats, it)
- return itertools.imap(finall_process, it)
+ return map(finall_process, it)
def parse_args(argv):
@@ -438,12 +423,12 @@
sec_it = fio_cfg_compile(job_cfg, argv_obj.jobfile, params)
if argv_obj.action == 'estimate':
- print sec_to_str(sum(map(execution_time, sec_it)))
+ print(sec_to_str(sum(map(execution_time, sec_it))))
elif argv_obj.action == 'num_tests':
- print sum(map(len, map(list, sec_it)))
+ print(sum(map(len, map(list, sec_it))))
elif argv_obj.action == 'compile':
splitter = "\n#" + "-" * 70 + "\n\n"
- print splitter.join(map(str, sec_it))
+ print(splitter.join(map(str, sec_it)))
return 0
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
new file mode 100644
index 0000000..ca3f0f3
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.py
@@ -0,0 +1,15 @@
+def rpc_run_fio(cfg):
+ fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
+ "--output={out_file} --alloc-size=262144 {job_file}"
+
+ # fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
+ #
+ # timeout = int(exec_time + max(300, exec_time))
+ # soft_end_time = time.time() + exec_time
+ # logger.error("Fio timeouted on node {}. Killing it".format(node))
+ # end = time.time()
+ # fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
+ #
+
+def parse_fio_result(data):
+ pass
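
Note: rpc_run_fio is intentionally left as a stub here (the old SSH runner is being replaced by the RPC plugin). A minimal local sketch of how the command template could be rendered and executed, assuming cfg is a dict with exec_folder, fio_path, out_file and job_file keys and using subprocess purely for illustration:

    import json
    import subprocess

    def run_fio_locally(cfg: dict) -> dict:
        # render the same template rpc_run_fio defines above
        cmd = ("cd {exec_folder}; {fio_path}fio --output-format=json "
               "--output={out_file} --alloc-size=262144 {job_file}").format(**cfg)
        subprocess.check_call(cmd, shell=True)     # run the fio job
        with open(cfg["out_file"]) as fd:          # fio wrote its JSON report here
            return json.load(fd)
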
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index f42dff6..86de738 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,6 +1,6 @@
[global]
include defaults.cfg
-QD={% 1, 5 %}
+NUMJOBS=8
ramp_time=5
runtime=5
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0529c1f..00492c9 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,18 +3,21 @@
import logging
import os.path
import functools
+from typing import Dict, Any, List, Tuple
from concurrent.futures import ThreadPoolExecutor
-from wally.utils import Barrier, StopTestError
-from wally.statistic import data_property
-from wally.ssh_utils import run_over_ssh, copy_paths
+from ..utils import Barrier, StopTestError
+from ..statistic import data_property
+from ..ssh_utils import copy_paths
+from ..inode import INode
+
logger = logging.getLogger("wally")
-class TestConfig(object):
+class TestConfig:
"""
this class describe test input configuration
@@ -25,8 +28,13 @@
nodes:[Node] - node to run tests on
remote_dir:str - directory on nodes to be used for local files
"""
- def __init__(self, test_type, params, test_uuid, nodes,
- log_directory, remote_dir):
+ def __init__(self,
+ test_type: str,
+ params: Dict[str, Any],
+ test_uuid: str,
+ nodes: List[INode],
+ log_directory: str,
+ remote_dir: str):
self.test_type = test_type
self.params = params
self.test_uuid = test_uuid
@@ -35,7 +43,7 @@
self.remote_dir = remote_dir
-class TestResults(object):
+class TestResults:
"""
this class describe test results
@@ -45,7 +53,11 @@
raw_result:Any - opaque object to store raw results
run_interval:(float, float) - test tun time, used for sensors
"""
- def __init__(self, config, results, raw_result, run_interval):
+ def __init__(self,
+ config: TestConfig,
+ results: Dict[str, Any],
+ raw_result: Any,
+ run_interval: Tuple[float, float]):
self.config = config
self.params = config.params
self.results = results
@@ -76,22 +88,22 @@
pass
-class MeasurementMatrix(object):
- """
- data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
- """
- def __init__(self, data, connections_ids):
- self.data = data
- self.connections_ids = connections_ids
-
- def per_vm(self):
- return self.data
-
- def per_th(self):
- return sum(self.data, [])
+# class MeasurementMatrix:
+# """
+# data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+# """
+# def __init__(self, data, connections_ids):
+# self.data = data
+# self.connections_ids = connections_ids
+#
+# def per_vm(self):
+# return self.data
+#
+# def per_th(self):
+# return sum(self.data, [])
-class MeasurementResults(object):
+class MeasurementResults:
def stat(self):
return data_property(self.data)
@@ -112,7 +124,7 @@
data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
odata: original values
"""
- def __init__(self, data):
+ def __init__(self, data: List[Tuple[float, float, float]]):
assert len(data) > 0
self.odata = data[:]
self.data = []
@@ -123,13 +135,13 @@
cstart = nstart
@property
- def values(self):
+ def values(self) -> List[float]:
return [val[2] for val in self.data]
- def average_interval(self):
+ def average_interval(self) -> float:
return float(sum([val[1] for val in self.data])) / len(self.data)
- def skip(self, seconds):
+    def skip(self, seconds: float) -> 'TimeSeriesValue':
nres = []
for start, ln, val in self.data:
nstart = start + ln - seconds
@@ -137,7 +149,7 @@
nres.append([nstart, val])
return self.__class__(nres)
- def derived(self, tdelta):
+    def derived(self, tdelta: float) -> 'TimeSeriesValue':
end = self.data[-1][0] + self.data[-1][1]
tdelta = float(tdelta)
@@ -166,7 +178,7 @@
return self.__class__(res)
-class PerfTest(object):
+class PerfTest:
"""
Very base class for tests
config:TestConfig - test configuration
@@ -176,41 +188,32 @@
self.config = config
self.stop_requested = False
- def request_stop(self):
+ def request_stop(self) -> None:
self.stop_requested = True
- def join_remote(self, path):
+ def join_remote(self, path: str) -> str:
return os.path.join(self.config.remote_dir, path)
@classmethod
@abc.abstractmethod
- def load(cls, path):
+ def load(cls, path: str):
pass
@abc.abstractmethod
- def run(self):
+ def run(self) -> List[TestResults]:
pass
@abc.abstractmethod
- def format_for_console(cls, data):
+ def format_for_console(cls, data: Any) -> str:
pass
-def run_on_node(node):
- def closure(*args, **kwargs):
- return run_over_ssh(node.connection,
- *args,
- node=node.get_conn_id(),
- **kwargs)
- return closure
-
-
class ThreadedTest(PerfTest):
"""
Base class for tests, which spawn separated thread for each node
"""
- def run(self):
+ def run(self) -> List[TestResults]:
barrier = Barrier(len(self.config.nodes))
th_test_func = functools.partial(self.th_test_func, barrier)
@@ -218,44 +221,27 @@
return list(pool.map(th_test_func, self.config.nodes))
@abc.abstractmethod
- def do_test(self, node):
+ def do_test(self, node: INode) -> TestResults:
pass
- def th_test_func(self, barrier, node):
- logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
- node.conn_url))
-
- logger.debug("Run preparation for {0}".format(node.get_conn_id()))
+ def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
+ test_name = self.__class__.__name__
+ logger.debug("Starting {} test on {}".format(test_name , node))
+ logger.debug("Run test preparation on {}".format(node))
self.pre_run(node)
+
+        # wait till all threads are ready
barrier.wait()
+
+ logger.debug("Run test on {}".format(node))
try:
- logger.debug("Run test for {0}".format(node.get_conn_id()))
return self.do_test(node)
- except StopTestError as exc:
- pass
except Exception as exc:
- msg = "In test {0} for node {1}".format(self, node.get_conn_id())
+ msg = "In test {} for {}".format(test_name, node)
logger.exception(msg)
- exc = StopTestError(msg, exc)
+ raise StopTestError(msg) from exc
- try:
- self.cleanup()
- except StopTestError as exc1:
- if exc is None:
- exc = exc1
- except Exception as exc1:
- if exc is None:
- msg = "Duringf cleanup - in test {0} for node {1}".format(self, node)
- logger.exception(msg)
- exc = StopTestError(msg, exc)
-
- if exc is not None:
- raise exc
-
- def pre_run(self, node):
- pass
-
- def cleanup(self, node):
+ def pre_run(self, node: INode) -> None:
pass
@@ -269,25 +255,22 @@
self.prerun_tout = self.config.params.get('prerun_tout', 3600)
self.run_tout = self.config.params.get('run_tout', 3600)
- def get_remote_for_script(self, script):
- return os.path.join(self.remote_dir,
- os.path.basename(script))
+ def get_remote_for_script(self, script: str) -> str:
+ return os.path.join(self.remote_dir, os.path.basename(script))
- def pre_run(self, node):
+ def pre_run(self, node: INode) -> None:
copy_paths(node.connection,
- {
- self.run_script: self.get_remote_for_script(self.run_script),
- self.prerun_script: self.get_remote_for_script(self.prerun_script),
- })
+ {self.run_script: self.get_remote_for_script(self.run_script),
+ self.prerun_script: self.get_remote_for_script(self.prerun_script)})
cmd = self.get_remote_for_script(self.prerun_script)
cmd += ' ' + self.config.params.get('prerun_opts', '')
- run_on_node(node)(cmd, timeout=self.prerun_tout)
+ node.run(cmd, timeout=self.prerun_tout)
- def do_test(self, node):
+ def do_test(self, node: INode) -> TestResults:
cmd = self.get_remote_for_script(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')
t1 = time.time()
- res = run_on_node(node)(cmd, timeout=self.run_tout)
+ res = node.run(cmd, timeout=self.run_tout)
t2 = time.time()
return TestResults(self.config, None, res, (t1, t2))
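
Note: for reference, a minimal hypothetical ThreadedTest subclass showing how the refactored base class is meant to be extended — do_test runs a command through node.run() and wraps the output in TestResults (the test itself is not part of this commit):

    import time

    class EchoTest(ThreadedTest):
        @classmethod
        def load(cls, path: str):
            pass                    # nothing to load for this toy test

        def format_for_console(cls, data):
            return str(data)

        def do_test(self, node: INode) -> TestResults:
            t1 = time.time()
            out = node.run("echo ok", timeout=10)   # any shell command works here
            t2 = time.time()
            return TestResults(self.config, {"echo": out}, out, (t1, t2))
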
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
new file mode 100644
index 0000000..9ce3370
--- /dev/null
+++ b/wally/test_run_class.py
@@ -0,0 +1,20 @@
+class TestRun:
+ """Test run information"""
+ def __init__(self):
+ # NodesInfo list
+ self.nodes_info = []
+
+ # Nodes list
+ self.nodes = []
+
+ self.build_meta = {}
+ self.clear_calls_stack = []
+
+ # created openstack nodes
+ self.openstack_nodes_ids = []
+ self.sensors_mon_q = None
+
+ # openstack credentials
+ self.fuel_openstack_creds = None
+
+
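
Note: TestRun is a plain shared-context object; a short sketch of how stages are expected to use it (field usage assumed from the rest of the codebase, names below are hypothetical):

    ctx = TestRun()
    ctx.nodes_info.extend(discovered_nodes_info)      # filled by the discovery stage
    ctx.clear_calls_stack.append(remove_temp_images)  # cleanup callable, invoked at shutdown
    ctx.build_meta["build_id"] = "local"
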
diff --git a/wally/utils.py b/wally/utils.py
index 3fba2b0..32c9056 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,13 +1,18 @@
import re
import os
+import io
import sys
import socket
import logging
+import ipaddress
import threading
import contextlib
import subprocess
import collections
+from .inode import INode
+from typing import Any, Tuple, Union, List, Generator, Dict, Callable, Iterable, Optional, IO
+
try:
import psutil
except ImportError:
@@ -17,48 +22,27 @@
logger = logging.getLogger("wally")
-def is_ip(data):
- if data.count('.') != 3:
- return False
-
+def is_ip(data: str) -> bool:
try:
- for part in map(int, data.split('.')):
- if part > 255 or part < 0:
- raise ValueError()
+ ipaddress.ip_address(data)
+ return True
except ValueError:
return False
- return True
class StopTestError(RuntimeError):
- def __init__(self, reason, orig_exc=None):
- RuntimeError.__init__(self, reason)
- self.orig_exc = orig_exc
+ pass
-@contextlib.contextmanager
-def log_block(message, exc_logger=None):
- logger.debug("Starts : " + message)
- with log_error(message, exc_logger):
- yield
- # try:
- # yield
- # except Exception as exc:
- # if isinstance(exc, types) and not isinstance(exc, StopIteration):
- # templ = "Error during {0} stage: {1!s}"
- # logger.debug(templ.format(action, exc))
- # raise
-
-
-class log_error(object):
- def __init__(self, message, exc_logger=None):
+class LogError:
+ def __init__(self, message: str, exc_logger=None):
self.message = message
self.exc_logger = exc_logger
def __enter__(self):
return self
- def __exit__(self, tp, value, traceback):
+ def __exit__(self, tp: type, value: Exception, traceback: Any):
if value is None or isinstance(value, StopTestError):
return
@@ -71,13 +55,18 @@
raise StopTestError(self.message, value)
-def check_input_param(is_ok, message):
+def log_block(message: str, exc_logger=None) -> LogError:
+ logger.debug("Starts : " + message)
+ return LogError(message, exc_logger)
+
+
+def check_input_param(is_ok: bool, message: str) -> None:
if not is_ok:
logger.error(message)
raise StopTestError(message)
-def parse_creds(creds):
+def parse_creds(creds: str) -> Tuple[str, str, str]:
# parse user:passwd@host
user, passwd_host = creds.split(":", 1)
@@ -93,14 +82,14 @@
pass
-class Barrier(object):
- def __init__(self, count):
+class Barrier:
+ def __init__(self, count: int):
self.count = count
self.curr_count = 0
self.cond = threading.Condition()
self.exited = False
- def wait(self, timeout=None):
+    def wait(self, timeout: Optional[int] = None) -> bool:
with self.cond:
if self.exited:
raise TaksFinished()
@@ -114,7 +103,7 @@
self.cond.wait(timeout=timeout)
return False
- def exit(self):
+ def exit(self) -> None:
with self.cond:
self.exited = True
@@ -122,9 +111,9 @@
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
-def ssize2b(ssize):
+def ssize2b(ssize: Union[str, int]) -> int:
try:
- if isinstance(ssize, (int, long)):
+ if isinstance(ssize, int):
return ssize
ssize = ssize.lower()
@@ -132,7 +121,7 @@
return int(ssize[:-1]) * SMAP[ssize[-1]]
return int(ssize)
except (ValueError, TypeError, AttributeError):
- raise ValueError("Unknow size format {0!r}".format(ssize))
+ raise ValueError("Unknow size format {!r}".format(ssize))
RSMAP = [('K', 1024),
@@ -141,18 +130,18 @@
('T', 1024 ** 4)]
-def b2ssize(size):
+def b2ssize(size: int) -> str:
if size < 1024:
return str(size)
for name, scale in RSMAP:
if size < 1024 * scale:
if size % scale == 0:
- return "{0} {1}i".format(size // scale, name)
+ return "{} {}i".format(size // scale, name)
else:
- return "{0:.1f} {1}i".format(float(size) / scale, name)
+ return "{:.1f} {}i".format(float(size) / scale, name)
- return "{0}{1}i".format(size // scale, name)
+ return "{}{}i".format(size // scale, name)
RSMAP_10 = [('k', 1000),
@@ -161,22 +150,22 @@
('t', 1000 ** 4)]
-def b2ssize_10(size):
+def b2ssize_10(size: int) -> str:
if size < 1000:
return str(size)
for name, scale in RSMAP_10:
if size < 1000 * scale:
if size % scale == 0:
- return "{0} {1}".format(size // scale, name)
+ return "{} {}".format(size // scale, name)
else:
- return "{0:.1f} {1}".format(float(size) / scale, name)
+ return "{:.1f} {}".format(float(size) / scale, name)
- return "{0}{1}".format(size // scale, name)
+ return "{}{}".format(size // scale, name)
-def run_locally(cmd, input_data="", timeout=20):
- shell = isinstance(cmd, basestring)
+def run_locally(cmd: Union[str, List[str]], input_data: str = "", timeout: int = 20) -> str:
+ shell = isinstance(cmd, str)
proc = subprocess.Popen(cmd,
shell=shell,
stdin=subprocess.PIPE,
@@ -184,7 +173,7 @@
stderr=subprocess.PIPE)
res = []
- def thread_func():
+ def thread_func() -> None:
rr = proc.communicate(input_data)
res.extend(rr)
@@ -214,7 +203,7 @@
return out
-def get_ip_for_target(target_ip):
+def get_ip_for_target(target_ip: str) -> str:
if not is_ip(target_ip):
target_ip = socket.gethostbyname(target_ip)
@@ -245,7 +234,7 @@
raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname):
+def open_for_append_or_create(fname: str) -> IO[str]:
if not os.path.exists(fname):
return open(fname, "w")
@@ -254,20 +243,17 @@
return fd
-def sec_to_str(seconds):
+def sec_to_str(seconds: int) -> str:
h = seconds // 3600
m = (seconds % 3600) // 60
s = seconds % 60
- return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+ return "{}:{:02d}:{:02d}".format(h, m, s)
-def yamable(data):
+def yamable(data: Any) -> Any:
if isinstance(data, (tuple, list)):
return map(yamable, data)
- if isinstance(data, unicode):
- return str(data)
-
if isinstance(data, dict):
res = {}
for k, v in data.items():
@@ -280,16 +266,16 @@
CLEANING = []
-def clean_resource(func, *args, **kwargs):
+def clean_resource(func: Callable[..., Any], *args, **kwargs) -> None:
CLEANING.append((func, args, kwargs))
-def iter_clean_func():
+def iter_clean_func() -> Generator[Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]], None, None]:
while CLEANING != []:
yield CLEANING.pop()
-def flatten(data):
+def flatten(data: Iterable[Any]) -> List[Any]:
res = []
for i in data:
if isinstance(i, (list, tuple, set)):
@@ -299,17 +285,17 @@
return res
-def get_creds_openrc(path):
+def get_creds_openrc(path: str) -> Tuple[str, str, str, str, str]:
fc = open(path).read()
echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
msg = "Failed to get creads from openrc file"
- with log_error(msg):
+ with LogError(msg):
data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo)
msg = "Failed to get creads from openrc file: " + data
- with log_error(msg):
+ with LogError(msg):
data = data.strip()
insecure_str, user, tenant, passwd_auth_url = data.split(':', 3)
insecure = (insecure_str in ('1', 'True', 'true'))
@@ -323,20 +309,20 @@
os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
-def get_os(run_func):
- arch = run_func("arch", nolog=True).strip()
+def get_os(node: INode) -> os_release:
+ arch = node.run("arch", nolog=True).strip()
try:
- run_func("ls -l /etc/redhat-release", nolog=True)
+ node.run("ls -l /etc/redhat-release", nolog=True)
return os_release('redhat', None, arch)
except:
pass
try:
- run_func("ls -l /etc/debian_version", nolog=True)
+ node.run("ls -l /etc/debian_version", nolog=True)
release = None
- for line in run_func("lsb_release -a", nolog=True).split("\n"):
+ for line in node.run("lsb_release -a", nolog=True).split("\n"):
if ':' not in line:
continue
opt, val = line.split(":", 1)
@@ -352,16 +338,16 @@
@contextlib.contextmanager
-def empty_ctx(val=None):
+def empty_ctx(val: Any = None) -> Generator[Any, None, None]:
yield val
-def mkdirs_if_unxists(path):
+def mkdirs_if_unxists(path: str) -> None:
if not os.path.exists(path):
os.makedirs(path)
-def log_nodes_statistic(nodes):
+def log_nodes_statistic(nodes: Iterable[INode]) -> None:
logger.info("Found {0} nodes total".format(len(nodes)))
per_role = collections.defaultdict(lambda: 0)
for node in nodes:
@@ -372,7 +358,7 @@
logger.debug("Found {0} nodes with role {1}".format(count, role))
-def which(program):
+def which(program: str) -> Optional[str]:
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)