move even more code to cephlib
diff --git a/wally/ceph.py b/wally/ceph.py
index 89324c5..5c2a4f0 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,21 +1,17 @@
""" Collect data about ceph nodes"""
-import os
import logging
from typing import Dict, cast, List, Set
-
-from .node_interfaces import NodeInfo, IRPCNode
-from .ssh_utils import ConnCreds
-from .common_types import IP
-from .stage import Stage, StepOrder
-from .test_run_class import TestRun
-from .ssh_utils import parse_ssh_uri
-from .node import connect, setup_rpc
-from .utils import StopTestError, to_ip
-
-
from cephlib import discover
from cephlib.discover import OSDInfo
+from cephlib.common import to_ip
+from cephlib.node import NodeInfo, IRPCNode
+from cephlib.ssh import ConnCreds, IP, parse_ssh_uri
+from cephlib.node_impl import connect, setup_rpc
+
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .utils import StopTestError
logger = logging.getLogger("wally")
@@ -57,6 +53,7 @@
ceph = ctx.config.ceph
root_node_uri = cast(str, ceph.root_node)
cluster = ceph.get("cluster", "ceph")
+ ip_remap = ctx.config.ceph.get('ip_remap', {})
conf = ceph.get("conf")
key = ceph.get("key")
@@ -82,17 +79,6 @@
ceph_params = {"cluster": cluster, "conf": conf, "key": key}
- ip_remap = {
- '10.8.0.4': '172.16.164.71',
- '10.8.0.3': '172.16.164.72',
- '10.8.0.2': '172.16.164.73',
- '10.8.0.5': '172.16.164.74',
- '10.8.0.6': '172.16.164.75',
- '10.8.0.7': '172.16.164.76',
- '10.8.0.8': '172.16.164.77',
- '10.8.0.9': '172.16.164.78',
- }
-
with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
log_level=ctx.config.rpc_log_level) as node:
@@ -101,7 +87,7 @@
try:
ips = set()
for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
- ip = ip_remap[ip]
+ ip = ip_remap.get(ip, ip)
ips.add(ip)
# creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
creds = ConnCreds(to_ip(cast(str, ip)), user="root")
@@ -121,7 +107,7 @@
try:
counter = 0
for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
- ip = ip_remap[ip]
+ ip = ip_remap.get(ip, ip)
# creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-mon'})
@@ -136,7 +122,7 @@
logger.warning("MON discovery failed %s", exc)
-def raw_dev_name(path):
+def raw_dev_name(path: str) -> str:
if path.startswith("/dev/"):
path = path[5:]
while path and path[-1].isdigit():
@@ -152,8 +138,8 @@
for node in ctx.nodes:
if 'ceph_storage_devs' not in node.info.params:
if 'ceph-osd' in node.info.roles:
- jdevs = set()
- sdevs = set()
+ jdevs = set() # type: Set[str]
+ sdevs = set() # type: Set[str]
for osd_info in node.info.params['ceph-osds']:
for key, sset in [('journal', jdevs), ('storage', sdevs)]:
path = osd_info.get(key)
diff --git a/wally/common_types.py b/wally/common_types.py
deleted file mode 100644
index 4aedfaa..0000000
--- a/wally/common_types.py
+++ /dev/null
@@ -1,36 +0,0 @@
-from typing import Any, Dict, NamedTuple
-
-from cephlib.storage import IStorable
-
-
-IP = str
-IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
-
-
-class ConnCreds(IStorable):
- def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
- key_file: str = None, key: bytes = None) -> None:
- self.user = user
- self.passwd = passwd
- self.addr = IPAddr(host, int(port))
- self.key_file = key_file
- self.key = key
-
- def __str__(self) -> str:
- return "{}@{}:{}".format(self.user, self.addr.host, self.addr.port)
-
- def __repr__(self) -> str:
- return str(self)
-
- def raw(self) -> Dict[str, Any]:
- return {
- 'user': self.user,
- 'host': self.addr.host,
- 'port': self.addr.port,
- 'passwd': self.passwd,
- 'key_file': self.key_file
- }
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'ConnCreds':
- return cls(**data)
diff --git a/wally/console_report.py b/wally/console_report.py
index 1528089..6490329 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -7,7 +7,6 @@
from cephlib import texttable
from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
-from .result_classes import IResultStorage
from .stage import Stage, StepOrder
from .test_run_class import TestRun
from .suits.io.fio import FioTest
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 3e6bc3e..795c66b 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -6,7 +6,7 @@
from cephlib.numeric_types import DataSource, TimeSeries
from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
-from .result_classes import IResultStorage
+from .result_classes import IWallyStorage
from .suits.io.fio_hist import expected_lat_bins
@@ -33,12 +33,12 @@
AGG_TAG = 'ALL'
-def find_all_series(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
+def find_all_series(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
"Iterated over selected metric for all nodes for given Suite/job"
return (rstorage.get_ts(ds) for ds in rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric))
-def get_aggregated(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str,
+def get_aggregated(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str,
trange: Tuple[int, int]) -> TimeSeries:
"Sum selected metric for all nodes for given Suite/job"
@@ -60,7 +60,7 @@
raise ValueError(msg)
if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
- msg = "Sensor {}.{} on node %s has shape={}. Can only process sensors with shape=[X, {}].".format(
+ msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
logger.error(msg)
raise ValueError(msg)
diff --git a/wally/fuel.py b/wally/fuel.py
index 3a11403..668be21 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -3,12 +3,14 @@
from paramiko.ssh_exception import AuthenticationException
+from cephlib.common import parse_creds, to_ip
+from cephlib.ssh import ConnCreds
+from cephlib.node_impl import connect, setup_rpc
+
from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
-from .ssh_utils import ConnCreds
-from .utils import StopTestError, parse_creds, to_ip
+from .utils import StopTestError
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .node import connect, setup_rpc
from .config import ConfigBlock
from .openstack_api import OSCreds
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 8907b81..b1cfc0c 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -261,6 +261,7 @@
def __getattr__(self, name: str) -> List[Node]:
if name in self.allowed_roles:
return [node for node in self if name in node.roles]
+ return []
class Cluster(RestObj):
diff --git a/wally/hw_info.py b/wally/hw_info.py
deleted file mode 100644
index bb857f2..0000000
--- a/wally/hw_info.py
+++ /dev/null
@@ -1,275 +0,0 @@
-import re
-import logging
-from typing import Dict, Any
-import xml.etree.ElementTree as ET
-from typing import List, Tuple, cast, Optional
-
-from cephlib.istorage import Storable
-from cephlib.common import b2ssize
-
-from .node_utils import get_os
-from .node_interfaces import IRPCNode
-
-
-logger = logging.getLogger("wally")
-
-
-def get_data(rr: str, data: str) -> str:
- match_res = re.search("(?ims)" + rr, data)
- return match_res.group(0)
-
-
-class HWInfo(Storable):
- __ignore_fields__ = ['raw_xml']
-
- def __init__(self) -> None:
- self.hostname = None # type: str
- self.cores = [] # type: List[Tuple[str, int]]
-
- # /dev/... devices
- self.disks_info = {} # type: Dict[str, Tuple[str, int]]
-
- # real disks on raid controller
- self.disks_raw_info = {} # type: Dict[str, str]
-
- # name => (speed, is_full_diplex, ip_addresses)
- self.net_info = {} # type: Dict[str, Tuple[Optional[int], Optional[bool], List[str]]]
-
- self.ram_size = 0 # type: int
- self.sys_name = None # type: str
- self.mb = None # type: str
- self.raw_xml = None # type: Optional[str]
-
- self.storage_controllers = [] # type: List[str]
-
- def get_summary(self) -> Dict[str, int]:
- cores = sum(count for _, count in self.cores)
- disks = sum(size for _, size in self.disks_info.values())
-
- return {'cores': cores,
- 'ram': self.ram_size,
- 'storage': disks,
- 'disk_count': len(self.disks_info)}
-
- def __str__(self):
- res = []
-
- summ = self.get_summary()
- summary = "Simmary: {cores} cores, {ram}B RAM, {disk}B storage"
- res.append(summary.format(cores=summ['cores'],
- ram=b2ssize(summ['ram']),
- disk=b2ssize(summ['storage'])))
- res.append(str(self.sys_name))
- if self.mb:
- res.append("Motherboard: " + self.mb)
-
- if not self.ram_size:
- res.append("RAM: Failed to get RAM size")
- else:
- res.append("RAM " + b2ssize(self.ram_size) + "B")
-
- if not self.cores:
- res.append("CPU cores: Failed to get CPU info")
- else:
- res.append("CPU cores:")
- for name, count in self.cores:
- if count > 1:
- res.append(" {0} * {1}".format(count, name))
- else:
- res.append(" " + name)
-
- if self.storage_controllers:
- res.append("Disk controllers:")
- for descr in self.storage_controllers:
- res.append(" " + descr)
-
- if self.disks_info:
- res.append("Storage devices:")
- for dev, (model, size) in sorted(self.disks_info.items()):
- ssize = b2ssize(size) + "B"
- res.append(" {0} {1} {2}".format(dev, ssize, model))
- else:
- res.append("Storage devices's: Failed to get info")
-
- if self.disks_raw_info:
- res.append("Disks devices:")
- for dev, descr in sorted(self.disks_raw_info.items()):
- res.append(" {0} {1}".format(dev, descr))
- else:
- res.append("Disks devices's: Failed to get info")
-
- if self.net_info:
- res.append("Net adapters:")
- for name, (speed, dtype, _) in self.net_info.items():
- res.append(" {0} {2} duplex={1}".format(name, dtype, speed))
- else:
- res.append("Net adapters: Failed to get net info")
-
- return str(self.hostname) + ":\n" + "\n".join(" " + i for i in res)
-
-
-class SWInfo(Storable):
- def __init__(self) -> None:
- self.mtab = None # type: str
- self.kernel_version = None # type: str
- self.libvirt_version = None # type: Optional[str]
- self.qemu_version = None # type: Optional[str]
- self.os_version = None # type: Tuple[str, ...]
-
-
-def get_sw_info(node: IRPCNode) -> SWInfo:
- res = SWInfo()
-
- res.os_version = tuple(get_os(node))
- res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
- res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
-
- try:
- res.libvirt_version = node.run("virsh -v", nolog=True).strip()
- except OSError:
- res.libvirt_version = None
-
- # dpkg -l ??
-
- try:
- res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
- except OSError:
- res.qemu_version = None
-
- return res
-
-
-def get_hw_info(node: IRPCNode) -> Optional[HWInfo]:
- try:
- lshw_out = node.run('sudo lshw -xml 2>/dev/null')
- except Exception as exc:
- logger.warning("lshw failed on node %s: %s", node.node_id, exc)
- return None
-
- res = HWInfo()
- res.raw_xml = lshw_out
- lshw_et = ET.fromstring(lshw_out)
-
- try:
- res.hostname = cast(str, lshw_et.find("node").attrib['id'])
- except Exception:
- pass
-
- try:
-
- res.sys_name = cast(str, lshw_et.find("node/vendor").text) + " " + \
- cast(str, lshw_et.find("node/product").text)
- res.sys_name = res.sys_name.replace("(To be filled by O.E.M.)", "")
- res.sys_name = res.sys_name.replace("(To be Filled by O.E.M.)", "")
- except Exception:
- pass
-
- core = lshw_et.find("node/node[@id='core']")
- if core is None:
- return res
-
- try:
- res.mb = " ".join(cast(str, core.find(node).text)
- for node in ['vendor', 'product', 'version'])
- except Exception:
- pass
-
- for cpu in core.findall("node[@class='processor']"):
- try:
- model = cast(str, cpu.find('product').text)
- threads_node = cpu.find("configuration/setting[@id='threads']")
- if threads_node is None:
- threads = 1
- else:
- threads = int(threads_node.attrib['value'])
- res.cores.append((model, threads))
- except Exception:
- pass
-
- res.ram_size = 0
- for mem_node in core.findall(".//node[@class='memory']"):
- descr = mem_node.find('description')
- try:
- if descr is not None and descr.text == 'System Memory':
- mem_sz = mem_node.find('size')
- if mem_sz is None:
- for slot_node in mem_node.find("node[@class='memory']"):
- slot_sz = slot_node.find('size')
- if slot_sz is not None:
- assert slot_sz.attrib['units'] == 'bytes'
- res.ram_size += int(slot_sz.text)
- else:
- assert mem_sz.attrib['units'] == 'bytes'
- res.ram_size += int(mem_sz.text)
- except Exception:
- pass
-
- for net in core.findall(".//node[@class='network']"):
- try:
- link = net.find("configuration/setting[@id='link']")
- if link.attrib['value'] == 'yes':
- name = cast(str, net.find("logicalname").text)
- speed_node = net.find("configuration/setting[@id='speed']")
-
- if speed_node is None:
- speed = None
- else:
- speed = int(speed_node.attrib['value'])
-
- dup_node = net.find("configuration/setting[@id='duplex']")
- if dup_node is None:
- dup = None
- else:
- dup = cast(str, dup_node.attrib['value']).lower() == 'yes'
-
- ips = [] # type: List[str]
- res.net_info[name] = (speed, dup, ips)
- except Exception:
- pass
-
- for controller in core.findall(".//node[@class='storage']"):
- try:
- description = getattr(controller.find("description"), 'text', "")
- product = getattr(controller.find("product"), 'text', "")
- vendor = getattr(controller.find("vendor"), 'text', "")
- dev = getattr(controller.find("logicalname"), 'text', "")
- if dev != "":
- res.storage_controllers.append(
- "{0}: {1} {2} {3}".format(dev, description,
- vendor, product))
- else:
- res.storage_controllers.append(
- "{0} {1} {2}".format(description,
- vendor, product))
- except Exception:
- pass
-
- for disk in core.findall(".//node[@class='disk']"):
- try:
- lname_node = disk.find('logicalname')
- if lname_node is not None:
- dev = cast(str, lname_node.text).split('/')[-1]
-
- if dev == "" or dev[-1].isdigit():
- continue
-
- sz_node = disk.find('size')
- assert sz_node.attrib['units'] == 'bytes'
- sz = int(sz_node.text)
- res.disks_info[dev] = ('', sz)
- else:
- description = disk.find('description').text
- product = disk.find('product').text
- vendor = disk.find('vendor').text
- version = disk.find('version').text
- serial = disk.find('serial').text
-
- full_descr = "{0} {1} {2} {3} {4}".format(
- description, product, vendor, version, serial)
-
- businfo = cast(str, disk.find('businfo').text)
- res.disks_raw_info[businfo] = full_descr
- except Exception:
- pass
-
- return res
diff --git a/wally/main.py b/wally/main.py
index e43f1f9..66e39db 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -31,14 +31,15 @@
from cephlib.common import setup_logging
from cephlib.storage import make_storage
+from cephlib.ssh import set_ssh_key_passwd
+from cephlib.node import log_nodes_statistic
+from cephlib.node_impl import get_rpc_server_code
-from . import utils, node, report_profiles, report
-from .node_utils import log_nodes_statistic
+from . import utils, report_profiles, report
from .config import Config
from .stage import Stage
from .test_run_class import TestRun
-from .ssh import set_ssh_key_passwd
-from .result_storage import ResultStorage
+from .result_storage import WallyStorage
# stages
from .ceph import DiscoverCephStage, CollectCephInfoStage
@@ -280,7 +281,6 @@
elif opts.subparser_name == 'ls':
tab = Texttable(max_width=200)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
tab.set_cols_align(["l", "l", "l", "l"])
tab.header(["Name", "Tests", "Run at", "Comment"])
tab.add_rows(list_results(opts.result_storage))
@@ -317,7 +317,7 @@
return 0
elif opts.subparser_name == 'ipython':
storage = make_storage(opts.storage_dir, existing=True)
- rstorage = ResultStorage(storage=storage)
+ rstorage = WallyStorage(storage=storage)
import IPython
IPython.embed()
@@ -343,8 +343,8 @@
logger.info("All info would be stored into %r", config.storage_url)
- ctx = TestRun(config, storage, ResultStorage(storage))
- ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
+ ctx = TestRun(config, storage, WallyStorage(storage))
+ ctx.rpc_code, ctx.default_rpc_plugins = get_rpc_server_code()
if opts.ssh_key_passwd is not None:
set_ssh_key_passwd(opts.ssh_key_passwd)
@@ -398,8 +398,14 @@
logger.info("All info is stored into %r", config.storage_url)
if failed or cleanup_failed:
- logger.error("Tests are failed. See error details in log above")
+ if opts.subparser_name == 'report':
+ logger.error("Report generation failed. See error details in log above")
+ else:
+ logger.error("Tests are failed. See error details in log above")
return 1
else:
- logger.info("Tests finished successfully")
+ if opts.subparser_name == 'report':
+ logger.info("Report successfully generated")
+ else:
+ logger.info("Tests finished successfully")
return 0
diff --git a/wally/node.py b/wally/node.py
deleted file mode 100644
index a662f76..0000000
--- a/wally/node.py
+++ /dev/null
@@ -1,359 +0,0 @@
-import os
-import zlib
-import time
-import json
-import socket
-import logging
-import tempfile
-import subprocess
-from typing import Union, cast, Optional, Tuple, Dict
-
-from agent import agent
-import paramiko
-
-from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
-from .ssh import connect as ssh_connect
-
-
-logger = logging.getLogger("wally")
-
-
-class SSHHost(ISSHHost):
- def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
- self.conn = conn
- self.info = info
-
- def __str__(self) -> str:
- return self.node_id
-
- @property
- def node_id(self) -> str:
- return self.info.node_id
-
- def put_to_file(self, path: Optional[str], content: bytes) -> str:
- if path is None:
- path = self.run("mktemp", nolog=True).strip()
-
- logger.debug("PUT %s bytes to %s", len(content), path)
-
- with self.conn.open_sftp() as sftp:
- with sftp.open(path, "wb") as fd:
- fd.write(content)
-
- return path
-
- def disconnect(self):
- self.conn.close()
-
- def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- if not nolog:
- logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
-
- transport = self.conn.get_transport()
- session = transport.open_session()
-
- try:
- session.set_combine_stderr(True)
- stime = time.time()
-
- session.exec_command(cmd)
- session.settimeout(1)
- session.shutdown_write()
- output = ""
-
- while True:
- try:
- ndata = session.recv(1024).decode("utf-8")
- if not ndata:
- break
- output += ndata
- except socket.timeout:
- pass
-
- if time.time() - stime > timeout:
- raise OSError(output + "\nExecution timeout")
-
- code = session.recv_exit_status()
- finally:
- found = False
-
- if found:
- session.close()
-
- if code != 0:
- templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
- raise OSError(templ.format(self, cmd, code, output))
-
- return output
-
-
-class LocalHost(ISSHHost):
- def __str__(self):
- return "<Local>"
-
- def get_ip(self) -> str:
- return 'localhost'
-
- def put_to_file(self, path: Optional[str], content: bytes) -> str:
- if path is None:
- fd, path = tempfile.mkstemp(text=False)
- os.close(fd)
- else:
- dir_name = os.path.dirname(path)
- os.makedirs(dir_name, exist_ok=True)
-
- with open(path, "wb") as fd2:
- fd2.write(content)
-
- return path
-
- def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- proc = subprocess.Popen(cmd, shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
-
- stdout_data_b, _ = proc.communicate()
- stdout_data = stdout_data_b.decode("utf8")
-
- if proc.returncode != 0:
- templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
- raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
-
- return stdout_data
-
- def disconnect(self):
- pass
-
-
-def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
- # setup rpc data
- if agent.__file__.endswith(".pyc"):
- path = agent.__file__[:-1]
- else:
- path = agent.__file__
-
- master_code = open(path, "rb").read()
-
- plugins = {} # type: Dict[str, bytes]
- cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
- plugins["cli"] = open(cli_path, "rb").read()
-
- fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
- plugins["fs"] = open(fs_path, "rb").read()
-
- return master_code, plugins
-
-
-def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
- if info == 'local':
- return LocalHost()
- else:
- info_c = cast(NodeInfo, info)
- return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
-
-
-class RPCNode(IRPCNode):
- """Node object"""
-
- def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
- self.info = info
- self.conn = conn
-
- def __str__(self) -> str:
- return "Node({!r})".format(self.info)
-
- def __repr__(self) -> str:
- return str(self)
-
- @property
- def node_id(self) -> str:
- return self.info.node_id
-
- def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
- logger.debug("GET %s from %s", path, self.info)
- if expanduser:
- path = self.conn.fs.expanduser(path)
- res = self.conn.fs.get_file(path, compress)
- logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
- if compress:
- res = zlib.decompress(res)
- return res
-
- def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
- if not nolog:
- logger.debug("Node %s - run %s", self.node_id, cmd)
-
- cmd_b = cmd.encode("utf8")
- proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
- out = ""
-
- while True:
- code, outb, _ = self.conn.cli.get_updates(proc_id)
- out += outb.decode("utf8")
- if code is not None:
- break
- time.sleep(check_timeout)
-
- if code != 0:
- templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
- raise OSError(templ.format(self.node_id, cmd, code, out))
-
- return out
-
- def copy_file(self, local_path: str, remote_path: str = None,
- expanduser: bool = False,
- compress: bool = False) -> str:
-
- if expanduser:
- remote_path = self.conn.fs.expanduser(remote_path)
-
- data = open(local_path, 'rb').read() # type: bytes
- return self.put_to_file(remote_path, data, compress=compress)
-
- def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
- if expanduser:
- path = self.conn.fs.expanduser(path)
- if compress:
- content = zlib.compress(content)
- return self.conn.fs.store_file(path, content, compress)
-
- def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
- if expanduser:
- path = self.conn.fs.expanduser(path)
- return self.conn.fs.file_stat(path)
-
- def __exit__(self, x, y, z) -> bool:
- self.disconnect(stop=True)
- return False
-
- def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
- self.conn.server.load_module(name, version, code)
-
- def disconnect(self, stop: bool = False) -> None:
- if stop:
- logger.debug("Stopping RPC server on %s", self.info)
- self.conn.server.stop()
-
- logger.debug("Disconnecting from %s", self.info)
- self.conn.disconnect()
- self.conn = None
-
-
-def get_node_python_27(node: ISSHHost) -> Optional[str]:
- python_cmd = None # type: Optional[str]
- try:
- python_cmd = node.run('which python2.7').strip()
- except Exception as exc:
- pass
-
- if python_cmd is None:
- try:
- if '2.7' in node.run('python --version'):
- python_cmd = node.run('which python').strip()
- except Exception as exc:
- pass
-
- return python_cmd
-
-
-def setup_rpc(node: ISSHHost,
- rpc_server_code: bytes,
- plugins: Dict[str, bytes] = None,
- port: int = 0,
- log_level: str = None) -> IRPCNode:
-
- logger.debug("Setting up RPC connection to {}".format(node.info))
- python_cmd = get_node_python_27(node)
- if python_cmd:
- logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))
- else:
- logger.error(("Can't find python2.7 on node {}. " +
- "Install python2.7 and rerun test").format(node.info))
- raise ValueError("Python not found")
-
- code_file = node.put_to_file(None, rpc_server_code)
- ip = node.info.ssh_creds.addr.host
-
- log_file = None # type: Optional[str]
- if log_level:
- log_file = node.run("mktemp", nolog=True).strip()
- cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
- cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
- logger.info("Agent logs for node {} stored remotely in file {}, log level is {}".format(
- node.node_id, log_file, log_level))
- else:
- cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
- cmd = cmd.format(python_cmd, code_file, ip, port)
-
- params_js = node.run(cmd).strip()
- params = json.loads(params_js)
-
- node.info.params.update(params)
-
- port = int(params['addr'].split(":")[1])
- rpc_conn = agent.connect((ip, port))
-
- rpc_node = RPCNode(rpc_conn, node.info)
- rpc_node.rpc_log_file = log_file
-
- if plugins is not None:
- try:
- for name, code in plugins.items():
- rpc_node.upload_plugin(name, code)
- except Exception:
- rpc_node.disconnect(True)
- raise
-
- return rpc_node
-
-
- # class RemoteNode(node_interfaces.IRPCNode):
-# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
-# self.info = node_info
-# self.rpc = rpc_conn
-#
- # def get_interface(self, ip: str) -> str:
- # """Get node external interface for given IP"""
- # data = self.run("ip a", nolog=True)
- # curr_iface = None
- #
- # for line in data.split("\n"):
- # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
- # if match1 is not None:
- # curr_iface = match1.group('name')
- #
- # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
- # if match2 is not None:
- # if match2.group('ip') == ip:
- # assert curr_iface is not None
- # return curr_iface
- #
- # raise KeyError("Can't found interface for ip {0}".format(ip))
- #
- # def get_user(self) -> str:
- # """"get ssh connection username"""
- # if self.ssh_conn_url == 'local':
- # return getpass.getuser()
- # return self.ssh_cred.user
- #
- #
- # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
- # """Run command on node. Will use rpc connection, if available"""
- #
- # if self.rpc_conn is None:
- # return run_over_ssh(self.ssh_conn, cmd,
- # stdin_data=stdin_data, timeout=timeout,
- # nolog=nolog, node=self)
- # assert not stdin_data
- # proc_id = self.rpc_conn.cli.spawn(cmd)
- # exit_code = None
- # output = ""
- #
- # while exit_code is None:
- # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
- # output += stdout_data + stderr_data
- #
- # return exit_code, output
-
-
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
deleted file mode 100644
index 703ef71..0000000
--- a/wally/node_interfaces.py
+++ /dev/null
@@ -1,144 +0,0 @@
-import abc
-import logging
-from typing import Any, Set, Dict, Optional, NamedTuple
-
-from cephlib.istorage import IStorable
-
-from .ssh_utils import ConnCreds
-from .common_types import IPAddr
-
-
-RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
-logger = logging.getLogger("wally")
-
-
-class NodeInfo(IStorable):
- """Node information object, result of discovery process or config parsing"""
- def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
- # ssh credentials
- self.ssh_creds = ssh_creds
-
- # credentials for RPC connection
- self.rpc_creds = None # type: Optional[RPCCreds]
-
- self.roles = roles
- self.os_vm_id = None # type: Optional[int]
- self.params = {} # type: Dict[str, Any]
- if params is not None:
- self.params = params
-
- @property
- def node_id(self) -> str:
- return "{0.host}:{0.port}".format(self.ssh_creds.addr)
-
- def __str__(self) -> str:
- return self.node_id
-
- def __repr__(self) -> str:
- return str(self)
-
- def raw(self) -> Dict[str, Any]:
- dct = self.__dict__.copy()
-
- if self.rpc_creds is not None:
- dct['rpc_creds'] = list(self.rpc_creds)
-
- dct['ssh_creds'] = self.ssh_creds.raw()
- dct['roles'] = list(self.roles)
- return dct
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'NodeInfo':
- data = data.copy()
- if data['rpc_creds'] is not None:
- data['rpc_creds'] = RPCCreds(*data['rpc_creds'])
-
- data['ssh_creds'] = ConnCreds.fromraw(data['ssh_creds'])
- data['roles'] = set(data['roles'])
- obj = cls.__new__(cls) # type: ignore
- obj.__dict__.update(data)
- return obj
-
-
-class ISSHHost(metaclass=abc.ABCMeta):
- """Minimal interface, required to setup RPC connection"""
- info = None # type: NodeInfo
-
- @abc.abstractmethod
- def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- pass
-
- @abc.abstractmethod
- def __str__(self) -> str:
- pass
-
- @abc.abstractmethod
- def disconnect(self) -> None:
- pass
-
- @abc.abstractmethod
- def put_to_file(self, path: Optional[str], content: bytes) -> str:
- pass
-
- def __enter__(self) -> 'ISSHHost':
- return self
-
- def __exit__(self, x, y, z) -> bool:
- self.disconnect()
- return False
-
- @property
- def node_id(self) -> str:
- return self.info.node_id
-
-
-class IRPCNode(metaclass=abc.ABCMeta):
- """Remote filesystem interface"""
- info = None # type: NodeInfo
- conn = None # type: Any
- rpc_log_file = None # type: str
-
- @property
- def node_id(self) -> str:
- return self.info.node_id
-
- @abc.abstractmethod
- def __str__(self) -> str:
- pass
-
- @abc.abstractmethod
- def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
- pass
-
- @abc.abstractmethod
- def copy_file(self, local_path: str, remote_path: str = None,
- expanduser: bool = False, compress: bool = False) -> str:
- pass
-
- @abc.abstractmethod
- def get_file_content(self, path: str, expanduser: bool = False, compress: bool = False) -> bytes:
- pass
-
- @abc.abstractmethod
- def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
- pass
-
- @abc.abstractmethod
- def stat_file(self, path:str, expanduser: bool = False) -> Any:
- pass
-
- @abc.abstractmethod
- def disconnect(self) -> None:
- pass
-
- @abc.abstractmethod
- def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
- pass
-
- def __enter__(self) -> 'IRPCNode':
- return self
-
- def __exit__(self, x, y, z) -> bool:
- self.disconnect()
- return False
-
diff --git a/wally/node_utils.py b/wally/node_utils.py
deleted file mode 100644
index e66ba44..0000000
--- a/wally/node_utils.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import logging
-import collections
-from typing import Dict, Sequence, NamedTuple
-
-from .node_interfaces import IRPCNode
-
-logger = logging.getLogger("wally")
-
-
-def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
- logger.info("Found {0} nodes total".format(len(nodes)))
-
- per_role = collections.defaultdict(int) # type: Dict[str, int]
- for node in nodes:
- for role in node.info.roles:
- per_role[role] += 1
-
- for role, count in sorted(per_role.items()):
- logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
-
-OSRelease = NamedTuple("OSRelease",
- [("distro", str),
- ("release", str),
- ("arch", str)])
-
-
-def get_os(node: IRPCNode) -> OSRelease:
- """return os type, release and architecture for node.
- """
- arch = node.run("arch", nolog=True).strip()
-
- try:
- node.run("ls -l /etc/redhat-release", nolog=True)
- return OSRelease('redhat', None, arch)
- except:
- pass
-
- try:
- node.run("ls -l /etc/debian_version", nolog=True)
-
- release = None
- for line in node.run("lsb_release -a", nolog=True).split("\n"):
- if ':' not in line:
- continue
- opt, val = line.split(":", 1)
-
- if opt == 'Codename':
- release = val.strip()
-
- return OSRelease('ubuntu', release, arch)
- except:
- pass
-
- raise RuntimeError("Unknown os")
diff --git a/wally/openstack.py b/wally/openstack.py
index 4048207..16b9db6 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -1,13 +1,13 @@
import os.path
import socket
import logging
-from typing import Dict, Any, List, Tuple, cast
+from typing import Dict, Any, List, Tuple, cast, Optional
from cephlib.common import to_ip
+from cephlib.node import NodeInfo
+from cephlib.ssh import ConnCreds
-from .node_interfaces import NodeInfo
from .config import ConfigBlock, Config
-from .ssh_utils import ConnCreds
from .openstack_api import (os_connect, find_vms,
OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
from .test_run_class import TestRun
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 302d898..257a7de 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -17,9 +17,8 @@
from glanceclient import Client as GlanceClient
from cephlib.common import Timeout, to_ip
-
-from .node_interfaces import NodeInfo
-from .ssh_utils import ConnCreds
+from cephlib.node import NodeInfo
+from cephlib.ssh import ConnCreds
__doc__ = """
diff --git a/wally/ops_log.py b/wally/ops_log.py
deleted file mode 100644
index 48f2b61..0000000
--- a/wally/ops_log.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from typing import Any
-
-
-log = []
-
-
-def log_op(name: str, *params: Any) -> None:
- log.append([name] + list(params))
-
diff --git a/wally/plot.py b/wally/plot.py
index 857a594..f809d19 100644
--- a/wally/plot.py
+++ b/wally/plot.py
@@ -3,16 +3,9 @@
import numpy
-# to make seaborn styles available
-import warnings
-with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- import seaborn
-
from cephlib.units import unit_conversion_coef_f
from cephlib.plot import PlotParams, provide_plot
-from .report_profiles import StyleProfile
from .resources import IOSummary
@@ -143,7 +136,7 @@
# override some styles
pp.fig.set_size_inches(*pp.style.qd_chart_inches)
- pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
+ pp.fig.subplots_adjust(right=pp.style.subplot_adjust_r)
if pp.style.extra_io_spine:
ax3.grid(False)
diff --git a/wally/report.py b/wally/report.py
index d37ac60..9615461 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -2,7 +2,7 @@
import abc
import logging
from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union
+from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union, Type
import numpy
from statsmodels.tsa.stattools import adfuller
@@ -16,11 +16,13 @@
from cephlib.statistic import calc_norm_stat_props
from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
from cephlib.wally_storage import find_nodes_by_roles
+from cephlib.plot import (plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
+ plot_histo_heatmap, plot_v_over_time, plot_hist)
from .utils import STORAGE_ROLES
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .result_classes import IResultStorage
+from .result_classes import IWallyStorage
from .result_classes import DataSource, TimeSeries, SuiteConfig
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
@@ -28,9 +30,10 @@
from .data_selectors import get_aggregated, AGG_TAG
from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
default_format, io_chart_format)
-from .plot import (io_chart, plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
- plot_histo_heatmap, plot_v_over_time, plot_hist)
+from .plot import io_chart
from .resources import ResourceNames, get_resources_usage, make_iosum, IOSummary, get_cluster_cpu_load
+
+
logger = logging.getLogger("wally")
@@ -111,12 +114,12 @@
class Table:
def __init__(self, header: List[str]) -> None:
self.header = header
- self.data = []
+ self.data = [] # type: List[List[str]]
def add_line(self, values: List[str]) -> None:
self.data.append(values)
- def html(self):
+ def html(self) -> str:
return html.table("", self.header, self.data)
@@ -143,7 +146,7 @@
# -------------------- REPORTS --------------------------------------------------------------------------------------
class ReporterBase:
- def __init__(self, rstorage: IResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
+ def __init__(self, rstorage: IWallyStorage, style: StyleProfile, colors: ColorProfile) -> None:
self.style = style
self.colors = colors
self.rstorage = rstorage
@@ -193,7 +196,7 @@
def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
ts_map = defaultdict(list) # type: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]]
- str_summary = {} # type: Dict[FioJobParams, List[IOSummary]]
+ str_summary = {} # type: Dict[FioJobParams, Tuple[str, str]]
for job in self.rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
@@ -389,10 +392,10 @@
(ResourceNames.storage_read_iop, ResourceNames.storage_read),
(ResourceNames.storage_iop, ResourceNames.storage_rw),
(ResourceNames.storage_cpu_s, ResourceNames.storage_cpu_s_b),
- ] # type: List[Union[str, Tuple[Optional[str], ...]]
+ ] # type: List[Union[str, Tuple[Optional[str], ...]]]
if not iops_ok:
- table_structure2 = []
+ table_structure2 = [] # type: List[Union[Tuple[str, ...], str]]
for line in table_structure:
if isinstance(line, str):
table_structure2.append(line)
@@ -483,7 +486,8 @@
bytes_names = [ResourceNames.test_write, ResourceNames.test_read, ResourceNames.test_rw,
ResourceNames.test_send, ResourceNames.test_recv, ResourceNames.test_net,
ResourceNames.storage_write, ResourceNames.storage_read, ResourceNames.storage_rw,
- ResourceNames.storage_send, ResourceNames.storage_recv, ResourceNames.storage_net]
+ ResourceNames.storage_send, ResourceNames.storage_recv,
+ ResourceNames.storage_net] # type: List[str]
net_pkt_names = [ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, ResourceNames.test_net_pkt,
ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, ResourceNames.storage_net_pkt]
@@ -497,9 +501,9 @@
HTMLBlock(html.H2(html.center("Resource consumption per service provided")))
for tp, names in pairs:
- vals = []
- devs = []
- avail_names = []
+ vals = [] # type: List[float]
+ devs = [] # type: List[float]
+ avail_names = [] # type: List[str]
for name in names:
if name in records:
avail_names.append(name)
@@ -512,7 +516,7 @@
devs.append(dev)
# synchronously sort values and names, values is a key
- vals, names, devs = map(list, zip(*sorted(zip(vals, names, devs))))
+ vals, names, devs = map(list, zip(*sorted(zip(vals, names, devs)))) # type: ignore
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
@@ -587,18 +591,19 @@
storage_devs = None
test_nodes_devs = ['rbd0']
- for node in self.rstorage.find_nodes(STORAGE_ROLES):
- cjd = set(node.params['ceph_journal_devs'])
- if journal_devs is None:
- journal_devs = cjd
- else:
- assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+ for node in self.rstorage.load_nodes():
+ if node.roles.intersection(STORAGE_ROLES):
+ cjd = set(node.params['ceph_journal_devs'])
+ if journal_devs is None:
+ journal_devs = cjd
+ else:
+ assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
- csd = set(node.params['ceph_storage_devs'])
- if storage_devs is None:
- storage_devs = csd
- else:
- assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+ csd = set(node.params['ceph_storage_devs'])
+ if storage_devs is None:
+ storage_devs = csd
+ else:
+ assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
@@ -611,8 +616,8 @@
# QD heatmap
nodes = find_nodes_by_roles(self.rstorage.storage, roles)
- ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', devs=devs,
- node_id=nodes, metric='io_queue', )
+ ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', dev=devs,
+ node_id=nodes, metric='io_queue')
ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
@@ -621,10 +626,10 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# Block size heatmap
- wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
metric='writes_completed')
wc2d[wc2d < 1E-3] = 1
- sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
metric='sectors_written')
data2d = sw2d / wc2d / 1024
fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
@@ -633,7 +638,7 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# iotime heatmap
- wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
metric='io_time')
fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
xlabel='Time', ylabel="IO time (ms) per second",
@@ -765,10 +770,12 @@
def run(self, ctx: TestRun) -> None:
job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
- job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
+ job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
+ for rcls in job_reporters_cls] # type: ignore
- suite_reporters_cls = [IOQD, ResourceQD]
- suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
+ suite_reporters_cls = [IOQD, ResourceQD] # type: List[Type[SuiteReporter]]
+ suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
+ for rcls in suite_reporters_cls] # type: ignore
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -800,7 +807,7 @@
for job in all_jobs:
try:
- for reporter in job_reporters:
+ for reporter in job_reporters: # type: JobReporter
logger.debug("Start reporter %s on job %s suite %s",
reporter.__class__.__name__, job.summary, suite.test_type)
for block, item, html in reporter.get_divs(suite, job):
@@ -810,13 +817,13 @@
except Exception:
logger.exception("Failed to generate report for %s", job.summary)
- for reporter in suite_reporters:
+ for sreporter in suite_reporters: # type: SuiteReporter
try:
- logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
- for block, item, html in reporter.get_divs(suite):
+ logger.debug("Start reporter %s on suite %s", sreporter.__class__.__name__, suite.test_type)
+ for block, item, html in sreporter.get_divs(suite):
items[block][item].append(html)
- except Exception as exc:
- logger.exception("Failed to generate report")
+ except Exception:
+ logger.exception("Failed to generate report for suite %s", suite)
if DEBUG:
break
diff --git a/wally/resources.py b/wally/resources.py
index d867a1a..dca9dcc 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -9,13 +9,13 @@
from cephlib.numeric_types import TimeSeries
from cephlib.wally_storage import find_nodes_by_roles
from cephlib.storage_selectors import summ_sensors
+from cephlib.node import HWInfo
-from .result_classes import IResultStorage, SuiteConfig
+from .result_classes import IWallyStorage, SuiteConfig
from .utils import STORAGE_ROLES
from .suits.io.fio import FioJobConfig
from .suits.job import JobConfig
from .data_selectors import get_aggregated
-from .hw_info import HWInfo
logger = logging.getLogger('wally')
@@ -82,10 +82,10 @@
return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
-iosum_cache = {} # type: Dict[Tuple[str, str]]
+iosum_cache = {} # type: Dict[Tuple[str, str], IOSummary]
-def make_iosum(rstorage: IResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
+def make_iosum(rstorage: IWallyStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
nc: bool = False) -> IOSummary:
key = (suite.storage_id, job.storage_id)
@@ -110,7 +110,7 @@
cpu_load_cache = {} # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
-def get_cluster_cpu_load(rstorage: IResultStorage, roles: List[str],
+def get_cluster_cpu_load(rstorage: IWallyStorage, roles: List[str],
time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
key = (id(rstorage), tuple(roles), time_range)
@@ -121,7 +121,7 @@
cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
nodes = find_nodes_by_roles(rstorage.storage, roles)
for name in cpu_metrics:
- cpu_ts[name] = summ_sensors(rstorage, time_range, nodes=nodes, sensor='system-cpu', metric=name)
+ cpu_ts[name] = summ_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)
it = iter(cpu_ts.values())
total_over_time = next(it).data.copy() # type: numpy.ndarray
@@ -141,11 +141,12 @@
def get_resources_usage(suite: SuiteConfig,
job: JobConfig,
- rstorage: IResultStorage,
+ rstorage: IWallyStorage,
large_block: int = 256,
hist_boxes: int = 10,
nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
+ records = {} # type: Dict[str, Tuple[str, float, float]]
if not nc:
records = rstorage.get_job_info(suite, job, "resource_usage")
if records is not None:
@@ -163,7 +164,7 @@
records = {
ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
- } # type: Dict[str, Tuple[str, float, float]]
+ }
if iops_ok:
ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
@@ -200,7 +201,7 @@
continue
nodes = find_nodes_by_roles(rstorage.storage, roles)
- res_ts = summ_sensors(rstorage, job.reliable_info_range_s, nodes=nodes, sensor=sensor, metric=metric)
+ res_ts = summ_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
if res_ts is None:
continue
@@ -219,10 +220,10 @@
all_stor_nodes = list(find_nodes_by_roles(rstorage.storage, STORAGE_ROLES))
for node in all_stor_nodes:
try:
- node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node.node_id)
+ node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node)
except KeyError:
logger.warning("No hw_info available for node %s. Using 'NODE time' instead of " +
- "CPU core time for CPU consumption metrics")
+ "CPU core time for CPU consumption metrics", node)
stor_cores_count = len(all_stor_nodes)
break
stor_cores_count += sum(cores for _, cores in node_hw_info.cores)
@@ -277,4 +278,3 @@
rstorage.put_job_info(suite, job, "resource_usage", srecords)
return records, iops_ok
-
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 207ba71..d1fd104 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -4,9 +4,10 @@
from cephlib.numeric_types import TimeSeries, DataSource
from cephlib.statistic import StatProps
from cephlib.istorage import IImagesStorage, Storable, ISensorStorage
+from cephlib.node import NodeInfo
+from cephlib.node_impl import IRPCNode
from .suits.job import JobConfig
-from .node_interfaces import IRPCNode, NodeInfo
class SuiteConfig(Storable):
@@ -72,7 +73,7 @@
self.processed = None # type: JobStatMetrics
-class IResultStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
+class IWallyStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
@abc.abstractmethod
def put_or_check_suite(self, suite: SuiteConfig) -> None:
@@ -128,5 +129,5 @@
pass
@abc.abstractmethod
- def find_nodes(self, roles: Union[str, List[str]]) -> List[NodeInfo]:
+ def load_nodes(self) -> List[NodeInfo]:
pass
\ No newline at end of file
diff --git a/wally/result_storage.py b/wally/result_storage.py
index 282bdb5..70c46d7 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -1,17 +1,19 @@
import os
+import json
import pprint
import logging
-from typing import cast, Iterator, Tuple, Type, Optional, Any
+from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List
import numpy
from cephlib.wally_storage import WallyDB
-from cephlib.hlstorage import SensorStorageBase
+from cephlib.sensor_storage import SensorStorageBase
from cephlib.statistic import StatProps
from cephlib.numeric_types import DataSource, TimeSeries
+from cephlib.node import NodeInfo
from .suits.job import JobConfig
-from .result_classes import SuiteConfig, IResultStorage
+from .result_classes import SuiteConfig, IWallyStorage
from .utils import StopTestError
from .suits.all_suits import all_suits
@@ -27,9 +29,10 @@
return path
-class ResultStorage(IResultStorage, SensorStorageBase):
+class WallyStorage(IWallyStorage, SensorStorageBase):
def __init__(self, storage: Storage) -> None:
SensorStorageBase.__init__(self, storage, WallyDB)
+ self.cached_nodes = None
# ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
def check_plot_file(self, source: DataSource) -> Optional[str]:
@@ -74,18 +77,21 @@
def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
- self.storage.put(data, path)
+ if isinstance(data, bytes):
+ self.storage.put_raw(data, path)
+ else:
+ self.storage.put(data, path)
# ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__))
-
def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
if path in self.storage:
return self.storage.get_raw(path).decode('utf8')
+ return None
def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
@@ -110,11 +116,19 @@
assert job.storage_id == groups['job_id']
yield job
+ def load_nodes(self) -> List[NodeInfo]:
+ if not self.cached_nodes:
+ self.cached_nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
+ if WallyDB.nodes_params in self.storage:
+ params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8'))
+ for node in self.cached_nodes:
+ node.params = params.get(node.node_id, {})
+ return self.cached_nodes
+
# ----------------- TS ------------------------------------------------------------------------------------------
def get_ts(self, ds: DataSource) -> TimeSeries:
path = self.db_paths.ts.format_map(ds.__dict__)
(units, time_units), header2, content = self.storage.get_array(path)
-
times = content[:,0].copy()
data = content[:,1:]
diff --git a/wally/run_test.py b/wally/run_test.py
index 4e02a75..6fbefc4 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,10 +5,13 @@
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast
-from . import utils, ssh_utils, hw_info
+from cephlib.wally_storage import WallyDB
+from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
+from cephlib.ssh import parse_ssh_uri
+from cephlib.node_impl import setup_rpc, connect
+
+from . import utils
from .config import ConfigBlock
-from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
from .suits.all_suits import all_suits
@@ -103,7 +106,7 @@
node.conn.cli.killall()
if node.rpc_log_file is not None:
nid = node.node_id
- path = "rpc_logs/{}.txt".format(nid)
+ path = WallyDB.rpc_logs.format(node_id=nid)
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
if path in ctx.storage:
@@ -133,9 +136,9 @@
# can't make next RPC request until finish with previous
for node in ctx.nodes:
nid = node.node_id
- hw_info_path = "hw_info/{}".format(nid)
+ hw_info_path = WallyDB.hw_info.format(node_id=nid)
if hw_info_path not in ctx.storage:
- futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
+ futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)
for (path, nid), future in futures.items():
try:
@@ -147,9 +150,9 @@
futures.clear()
for node in ctx.nodes:
nid = node.node_id
- sw_info_path = "sw_info/{}".format(nid)
+ sw_info_path = WallyDB.sw_info.format(node_id=nid)
if sw_info_path not in ctx.storage:
- futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
+ futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)
for (path, nid), future in futures.items():
try:
@@ -166,12 +169,12 @@
config_block = 'nodes'
def run(self, ctx: TestRun) -> None:
- if 'all_nodes' in ctx.storage:
+ if WallyDB.all_nodes in ctx.storage:
logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
return
for url, roles in ctx.config.get('nodes', {}).raw().items():
- ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
+ ctx.merge_node(parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
logger.debug("Add node %s with roles %s", url, roles)
@@ -293,8 +296,6 @@
class SaveNodesStage(Stage):
"""Save nodes list to file"""
- nodes_path = 'all_nodes'
- params_path = 'all_nodes_params.js'
priority = StepOrder.UPDATE_NODES_INFO + 1
def run(self, ctx: TestRun) -> None:
@@ -302,27 +303,20 @@
params = {node.node_id: node.params for node in infos}
ninfos = [copy.copy(node) for node in infos]
for node in ninfos:
- node.params = "in {!r} file".format(self.params_path)
- ctx.storage.put_list(ninfos, self.nodes_path)
- ctx.storage.put_raw(json.dumps(params).encode('utf8'), self.params_path)
+ node.params = {"in file": WallyDB.nodes_params}
+ ctx.storage.put_list(ninfos, WallyDB.all_nodes)
+ ctx.storage.put_raw(json.dumps(params).encode('utf8'), WallyDB.nodes_params)
class LoadStoredNodesStage(Stage):
priority = StepOrder.DISCOVER
def run(self, ctx: TestRun) -> None:
- if SaveNodesStage.nodes_path in ctx.storage:
+ if WallyDB.all_nodes in ctx.storage:
if ctx.nodes_info:
logger.error("Internal error: Some nodes already stored in " +
"nodes_info before LoadStoredNodesStage stage")
raise utils.StopTestError()
- nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
-
- if SaveNodesStage.params_path in ctx.storage:
- params = json.loads(ctx.storage.get_raw(SaveNodesStage.params_path).decode('utf8'))
- for node_id, node in nodes.items():
- node.params = params.get(node_id, {})
-
- ctx.nodes_info = nodes
+ ctx.nodes_info = {node.node_id: node for node in ctx.rstorage.load_nodes()}
logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/ssh.py b/wally/ssh.py
deleted file mode 100644
index ddf3e58..0000000
--- a/wally/ssh.py
+++ /dev/null
@@ -1,141 +0,0 @@
-import time
-import errno
-import socket
-import logging
-import os.path
-import selectors
-from io import StringIO
-from typing import cast, Dict, List, Set, Optional
-
-import paramiko
-
-from cephlib.common import Timeout
-
-from .common_types import ConnCreds, IPAddr
-
-logger = logging.getLogger("wally")
-NODE_KEYS = {} # type: Dict[IPAddr, paramiko.RSAKey]
-SSH_KEY_PASSWD = None # type: Optional[str]
-
-
-def set_ssh_key_passwd(passwd: str) -> None:
- global SSH_KEY_PASSWD
- SSH_KEY_PASSWD = passwd
-
-
-def set_key_for_node(host_port: IPAddr, key: bytes) -> None:
- with StringIO(key.decode("utf8")) as sio:
- NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio) # type: ignore
-
-
-def connect(creds: ConnCreds,
- conn_timeout: int = 60,
- tcp_timeout: int = 15,
- default_banner_timeout: int = 30) -> paramiko.SSHClient:
-
- ssh = paramiko.SSHClient()
- ssh.load_host_keys('/dev/null')
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.known_hosts = None
-
- end_time = time.time() + conn_timeout # type: float
-
- logger.debug("SSH connecting to %s", creds)
-
- while True:
- try:
- time_left = end_time - time.time()
- c_tcp_timeout = min(tcp_timeout, time_left)
-
- banner_timeout_arg = {} # type: Dict[str, int]
- if paramiko.__version_info__ >= (1, 15, 2):
- banner_timeout_arg['banner_timeout'] = int(min(default_banner_timeout, time_left))
-
- if creds.passwd is not None:
- ssh.connect(creds.addr.host,
- timeout=c_tcp_timeout,
- username=creds.user,
- password=cast(str, creds.passwd),
- port=creds.addr.port,
- allow_agent=False,
- look_for_keys=False,
- **banner_timeout_arg)
- elif creds.key_file is not None:
- ssh.connect(creds.addr.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- pkey=paramiko.RSAKey.from_private_key_file(creds.key_file, password=SSH_KEY_PASSWD),
- look_for_keys=False,
- port=creds.addr.port,
- **banner_timeout_arg)
- elif creds.key is not None:
- with StringIO(creds.key.decode("utf8")) as sio:
- ssh.connect(creds.addr.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- pkey=paramiko.RSAKey.from_private_key(sio, password=SSH_KEY_PASSWD), # type: ignore
- look_for_keys=False,
- port=creds.addr.port,
- **banner_timeout_arg)
- elif (creds.addr.host, creds.addr.port) in NODE_KEYS:
- ssh.connect(creds.addr.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- pkey=NODE_KEYS[creds.addr],
- look_for_keys=False,
- port=creds.addr.port,
- **banner_timeout_arg)
- else:
- key_file = os.path.expanduser('~/.ssh/id_rsa')
- ssh.connect(creds.addr.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- key_filename=key_file,
- look_for_keys=False,
- port=creds.addr.port,
- **banner_timeout_arg)
- return ssh
- except (socket.gaierror, paramiko.PasswordRequiredException):
- raise
- except socket.error:
- if time.time() > end_time:
- raise
- time.sleep(1)
-
-
-def wait_ssh_available(addrs: List[IPAddr],
- timeout: int = 300,
- tcp_timeout: float = 1.0) -> None:
-
- addrs_set = set(addrs) # type: Set[IPAddr]
-
- for _ in Timeout(timeout):
- selector = selectors.DefaultSelector() # type: selectors.BaseSelector
- with selector:
- for addr in addrs_set:
- sock = socket.socket()
- sock.setblocking(False)
- try:
- sock.connect(addr)
- except BlockingIOError:
- pass
- selector.register(sock, selectors.EVENT_READ, data=addr)
-
- etime = time.time() + tcp_timeout
- ltime = etime - time.time()
- while ltime > 0:
- # convert to greater or equal integer
- for key, _ in selector.select(timeout=int(ltime + 0.99999)):
- selector.unregister(key.fileobj)
- try:
- key.fileobj.getpeername() # type: ignore
- addrs_set.remove(key.data)
- except OSError as exc:
- if exc.errno == errno.ENOTCONN:
- pass
- ltime = etime - time.time()
-
- if not addrs_set:
- break
-
-
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
deleted file mode 100644
index 56d317f..0000000
--- a/wally/ssh_utils.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import re
-import getpass
-import logging
-from typing import List, Dict
-
-
-from cephlib.common import to_ip
-from .common_types import ConnCreds
-
-
-logger = logging.getLogger("wally")
-
-
-class URIsNamespace:
- class ReParts:
- user_rr = "[^:]*?"
- host_rr = "[^:@]*?"
- port_rr = "\\d+"
- key_file_rr = "[^:@]*"
- passwd_rr = ".*?"
-
- re_dct = ReParts.__dict__
-
- for attr_name, val in re_dct.items():
- if attr_name.endswith('_rr'):
- new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
- setattr(ReParts, attr_name, new_rr)
-
- re_dct = ReParts.__dict__
-
- templs = [
- "^{host_rr}$",
- "^{host_rr}:{port_rr}$",
- "^{host_rr}::{key_file_rr}$",
- "^{host_rr}:{port_rr}:{key_file_rr}$",
- "^{user_rr}@{host_rr}$",
- "^{user_rr}@{host_rr}:{port_rr}$",
- "^{user_rr}@{host_rr}::{key_file_rr}$",
- "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
- "^{user_rr}:{passwd_rr}@{host_rr}$",
- "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
- ]
-
- uri_reg_exprs = [] # type: List[str]
- for templ in templs:
- uri_reg_exprs.append(templ.format(**re_dct))
-
-
-def parse_ssh_uri(uri: str) -> ConnCreds:
- """Parse ssh connection URL from one of following form
- [ssh://]user:passwd@host[:port]
- [ssh://][user@]host[:port][:key_file]
- """
-
- if uri.startswith("ssh://"):
- uri = uri[len("ssh://"):]
-
- for rr in URIsNamespace.uri_reg_exprs:
- rrm = re.match(rr, uri)
- if rrm is not None:
- params = {"user": getpass.getuser()} # type: Dict[str, str]
- params.update(rrm.groupdict())
- params['host'] = to_ip(params['host'])
- return ConnCreds(**params) # type: ignore
-
- raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-
-
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 1895b6c..374e5a4 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,13 +1,14 @@
import os.path
import logging
-from typing import cast, Any, List, Union
+from typing import cast, Any, List, Union, Tuple, Optional
import numpy
+from cephlib.units import ssize2b, b2ssize
+from cephlib.node import IRPCNode, get_os
+
import wally
-from ...utils import StopTestError, ssize2b, b2ssize
-from ...node_interfaces import IRPCNode
-from ...node_utils import get_os
+from ...utils import StopTestError
from ..itest import ThreadedTest
from ...result_classes import TimeSeries, DataSource
from ..job import JobConfig
@@ -189,7 +190,7 @@
node.conn.fs.unlink(self.remote_output_file)
files = [name for name in node.conn.fs.listdir(self.exec_folder)]
- result = []
+ result = [] # type: List[TimeSeries]
for name, file_path, units in get_log_files(cast(FioJobConfig, job)):
log_files = [fname for fname in files if fname.startswith(file_path)]
if len(log_files) != 1:
@@ -235,19 +236,16 @@
logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
raise StopTestError()
- if not self.suite.keep_raw_files:
- raw_result = None
+ assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
-
- result.append(TimeSeries(name=name,
- raw=raw_result,
- data=numpy.array(parsed, dtype='uint64'),
- units=units,
- times=numpy.array(times, dtype='uint64'),
- time_units='ms',
- source=path(metric=name, tag='csv'),
- histo_bins=histo_bins))
+ ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
+ units=units,
+ times=numpy.array(times, dtype='uint64'),
+ time_units='ms',
+ source=path(metric=name, tag='csv'),
+ histo_bins=histo_bins)
+ result.append(ts)
return result
def format_for_console(self, data: Any) -> str:
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 39715ef..3d4e886 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,10 +1,9 @@
-import abc
import copy
from collections import OrderedDict
from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from cephlib.units import ssize2b, b2ssize
-from ...utils import ssize2b, b2ssize
from ..job import JobConfig, JobParams
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 222589b..9ffeed8 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -9,8 +9,9 @@
from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
from collections import OrderedDict
+from cephlib.units import ssize2b
+from cephlib.common import flatmap, sec_to_str
-from ...utils import sec_to_str, ssize2b, flatmap
from .fio_job import Var, FioJobConfig
SECTION = 0
@@ -87,16 +88,12 @@
fname, lineno, oline)
if line.startswith('['):
- yield CfgLine(fname, lineno, oline, SECTION,
- line[1:-1].strip(), None)
+ yield CfgLine(fname, lineno, oline, SECTION, line[1:-1].strip(), None)
elif '=' in line:
opt_name, opt_val = line.split('=', 1)
- yield CfgLine(fname, lineno, oline, SETTING,
- opt_name.strip(),
- parse_value(opt_val.strip()))
+ yield CfgLine(fname, lineno, oline, SETTING, opt_name.strip(), parse_value(opt_val.strip()))
elif line.startswith("include "):
- yield CfgLine(fname, lineno, oline, INCLUDE,
- line.split(" ", 1)[1], None)
+ yield CfgLine(fname, lineno, oline, INCLUDE, line.split(" ", 1)[1], None)
else:
yield CfgLine(fname, lineno, oline, SETTING, line, '1')
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 39ed5cc..1c7fee0 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -42,10 +42,13 @@
assert size % 4 == 0, "File size must be proportional to 4M"
- cmd_templ = "{} --name=xxx --filename={} --direct=1 --bs=4m --size={}m --rw=write"
+ cmd_templ = "{0} --name=xxx --filename={1} --direct=1 --bs=4m --size={2}m --rw=write"
run_time = time.time()
- subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+ try:
+ subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+ except subprocess.CalledProcessError as exc:
+ raise RuntimeError("{0!s}.\nOutput: {1}".format(exc, exc.output))
run_time = time.time() - run_time
prefill_bw = None if run_time < 1.0 else int(size / run_time)
@@ -55,6 +58,6 @@
def rpc_install(name, binary):
try:
- subprocess.check_output("which {}".format(binary), shell=True)
+ subprocess.check_output("which {0}".format(binary), shell=True)
except:
- subprocess.check_output("apt-get install -y {}".format(name), shell=True)
+ subprocess.check_output("apt-get install -y {0}".format(name), shell=True)
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index ae2d960..b013796 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -8,27 +8,27 @@
rw=randread
iodepth=1
-[test_{TEST_SUMM}]
-iodepth=16
-blocksize=60k
-rw=randread
+#[test_{TEST_SUMM}]
+#iodepth=16
+#blocksize=60k
+#rw=randread
-[test_{TEST_SUMM}]
-blocksize=60k
-rw=randwrite
-iodepth=1
+#[test_{TEST_SUMM}]
+#blocksize=60k
+#rw=randwrite
+#iodepth=1
-[test_{TEST_SUMM}]
-iodepth=16
-blocksize=60k
-rw=randwrite
+#[test_{TEST_SUMM}]
+#iodepth=16
+#blocksize=60k
+#rw=randwrite
-[test_{TEST_SUMM}]
-iodepth=1
-blocksize=1m
-rw=write
+#[test_{TEST_SUMM}]
+#iodepth=1
+#blocksize=1m
+#rw=write
-[test_{TEST_SUMM}]
-iodepth=1
-blocksize=1m
-rw=read
+#[test_{TEST_SUMM}]
+#iodepth=1
+#blocksize=1m
+#rw=read
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index e848442..b2b3a54 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,13 +2,14 @@
import time
import logging
import os.path
-from typing import Any, List, Optional, Callable, Iterable, cast
+from typing import Any, List, Optional, Callable, Iterable, cast, Tuple
from concurrent.futures import ThreadPoolExecutor, wait
+from cephlib.node import IRPCNode
+
from ..utils import StopTestError, get_time_interval_printable_info
-from ..node_interfaces import IRPCNode
-from ..result_classes import SuiteConfig, JobConfig, TimeSeries, IResultStorage
+from ..result_classes import SuiteConfig, JobConfig, TimeSeries, IWallyStorage
logger = logging.getLogger("wally")
@@ -24,7 +25,7 @@
retry_time = 30
job_config_cls = None # type: type
- def __init__(self, storage: IResultStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+ def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
self.suite = suite
self.stop_requested = False
self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 5f3c764..8ef1093 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -2,7 +2,7 @@
from typing import Dict, Any, Tuple, cast, Union
from collections import OrderedDict
-from cephlib.storage import Storable
+from cephlib.istorage import Storable
class JobParams(metaclass=abc.ABCMeta):
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index b67035e..a7721e6 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -2,18 +2,18 @@
from concurrent.futures import ThreadPoolExecutor
from cephlib.istorage import IStorage
+from cephlib.node import NodeInfo, IRPCNode
+from cephlib.ssh import ConnCreds
-from .node_interfaces import NodeInfo, IRPCNode
from .openstack_api import OSCreds, OSConnection
from .config import Config
from .fuel_rest_api import Connection
-from .ssh_utils import ConnCreds
-from .result_classes import IResultStorage
+from .result_classes import IWallyStorage
class TestRun:
"""Test run information"""
- def __init__(self, config: Config, storage: IStorage, rstorage: IResultStorage) -> None:
+ def __init__(self, config: Config, storage: IStorage, rstorage: IWallyStorage) -> None:
# NodesInfo list
self.nodes_info = {} # type: Dict[str, NodeInfo]
diff --git a/wally/utils.py b/wally/utils.py
index dfd0e8c..754734d 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -19,7 +19,7 @@
logger = logging.getLogger("wally")
-STORAGE_ROLES = {'ceph-osd'}
+STORAGE_ROLES = ['ceph-osd']
class StopTestError(RuntimeError):
@@ -121,9 +121,3 @@
return exec_time_s, "{:%H:%M:%S}".format(end_dt)
-def shape2str(shape: Iterable[int]) -> str:
- return "*".join(map(str, shape))
-
-
-def str2shape(shape: str) -> Tuple[int, ...]:
- return tuple(map(int, shape.split('*')))