Skeleton and sensors work
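
Main changes:

* default.yaml: rename var_dir_root to results_dir; add connect_timeout,
  download_rpc_logs and rpc_log_level options; roles_mapping entries may
  now carry per-sensor regex filters and extra sensor parameters (e.g. a
  list of ceph counters).
* ceph discovery collects per-OSD journal/storage paths (OSDInfo) and puts
  them into node params; discovery stages honor the new 'discovery' option
  and merge nodes via ctx.merge_node instead of separate storage lists.
* RPC agent log level is configurable; agent logs are downloaded into
  storage on disconnect when download_rpc_logs is set.
* sensors RPC plugin takes per-sensor config, gains a ceph perf-counter
  sensor, and ships results as packed array('L') bytes.
* all loggers unified under "wally"; logging is optionally configured from
  a YAML file via logging.config.dictConfig.
* new SleepStage, --ssh-key-passwd CLI option (password is stripped from
  the stored command line), stages are instantiated instead of passed as
  classes.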
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index de17d30..af4ca5f 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -1,11 +1,14 @@
# ------------------------------------ CONFIGS -------------------------------------------------------------------
collect_info: true
-var_dir_root: /tmp/perf_tests
+results_dir: /tmp/perf_tests
settings_dir: ~/.wally
+connect_timeout: 30
+download_rpc_logs: true
+rpc_log_level: DEBUG
logging:
- extra_logs: 1
level: DEBUG
+# config: logger.yaml
vm_configs:
keypair_file_private: wally_vm_key_perf3.pem
@@ -38,11 +41,20 @@
online: true
roles_mapping:
testnode: system-cpu, block-io, net-io
- ceph-osd: system-cpu, block-io, net-io, ceph
+ ceph-osd:
+ system-cpu: ".*"
+ block-io: ".*"
+ net-io: ".*"
+ ceph:
+ counters:
+ - osd/op_r
+ - osd/op_w
+ - osd/op_r_out_bytes
+ - osd/op_w_in_bytes
compute:
- system-cpu: *
- block-io: sd*
- net-io: *
+ system-cpu: ".*"
+ block-io: "sd?"
+ net-io: ".*"
#---------------------------------- TEST PROFILES --------------------------------------------------------------------
profiles:
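For reference, a minimal sketch (with assumed sample roles) of how the per-role
mapping above is expanded into a per-node sensor config - it mirrors the
StartSensorsStage logic in wally/sensors.py further down: a plain string is
shorthand for {sensor: ".*"}, and a node gets the union over all of its roles.

```python
import re

roles_mapping = {
    "testnode": "system-cpu, block-io, net-io",
    "compute": {"system-cpu": ".*", "block-io": "sd.*", "net-io": ".*"},
}

# string values are shorthand for "these sensors, match every device"
per_role_config = {}
for role, val in roles_mapping.items():
    if isinstance(val, str):
        val = {name.strip(): ".*" for name in val.split(",")}
    per_role_config[role] = val

# a node's sensor config is the union over all of its roles
node_cfg = {}
for role in ("testnode", "compute"):
    node_cfg.update(per_role_config.get(role, {}))

assert re.match(node_cfg["block-io"], "sda")
```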
diff --git a/configs-examples/local_lxc_ceph.yaml b/configs-examples/local_lxc_ceph.yaml
index 03b5185..23ca523 100644
--- a/configs-examples/local_lxc_ceph.yaml
+++ b/configs-examples/local_lxc_ceph.yaml
@@ -1,4 +1,7 @@
include: default.yaml
+collect_info: false
ceph:
- root_node: local
+ root_node: localhost
+
+sleep: 60
diff --git a/configs-examples/logger.yaml b/configs-examples/logger.yaml
new file mode 100644
index 0000000..6d833c3
--- /dev/null
+++ b/configs-examples/logger.yaml
@@ -0,0 +1,24 @@
+version: 1
+
+formatters:
+ default:
+ format: '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
+        datefmt: '%H:%M:%S'
+ custom:
+ (): my.package.customFormatterFactory
+ bar: baz
+ spam: 99.9
+ answer: 42
+
+handlers:
+ email:
+ class: logging.handlers.SMTPHandler
+ mailhost: localhost
+ fromaddr: my_app@domain.tld
+ toaddrs:
+ - support_team@domain.tld
+ - dev_team@domain.tld
+ subject: Houston, we have a problem.
+
+loggers:
+ foo.bar.baz:
+ # other configuration for logger 'foo.bar.baz'
+ handlers: [h1, h2]
\ No newline at end of file
diff --git a/v2_plans.md b/v2_plans.md
index edce540..967256d 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,24 +1,22 @@
* Code:
+    * store more information for each node - OSD settings, etc.
* use overloading module
* Make storage class with dict-like interface
- - map path to value, e.g. 'cluster_info': yaml
- - should support both binary and text(yaml) formats, maybe
- store in both
- - store all results in it
+ - should support both binary and text(yaml) formats, maybe store in both
- Results stored in archived binary format for fast parsing
* Collect and store cluster info
- * Simplify settings
* Unit-tests
- * 'perf' sensor
- * ftrace, [bcc](https://github.com/iovisor/bcc), etc
+ * Sensors
+ - perf
+ - ftrace, [bcc](https://github.com/iovisor/bcc)
+ - ceph ops
* Config revised:
- * Full config is a set of independent sections, each related to one plugin or 'step'
* Simple user config get compiled into "full" config with variable substitution
* Result config then validated
- * Each plugin defines config sections tructure and validation
* Add sync 4k write with small set of thcount
* White-box event logs for UT
* Result-to-yaml for UT
+    * Flexible SSH connection creds - use agent, default ssh settings or creds from config
* Infra:
* Add script to download fio from git and build it
diff --git a/wally/ceph.py b/wally/ceph.py
index 1e79126..cbb4740 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,7 +1,7 @@
""" Collect data about ceph nodes"""
import json
import logging
-from typing import Set, Dict, cast
+from typing import Dict, cast, List, Set, Optional
from .node_interfaces import NodeInfo, IRPCNode
@@ -11,22 +11,30 @@
from .test_run_class import TestRun
from .ssh_utils import parse_ssh_uri
from .node import connect, setup_rpc
+from .utils import StopTestError
-logger = logging.getLogger("wally.discover")
+logger = logging.getLogger("wally")
-def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+class OSDInfo:
+    def __init__(self, id: int, journal: Optional[str] = None, storage: Optional[str] = None) -> None:
+ self.id = id
+ self.journal = journal
+ self.storage = storage
+
+
+def get_osds_info(node: IRPCNode, conf: str, key: str) -> Dict[IP, List[OSDInfo]]:
"""Get set of osd's ip"""
data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
jdata = json.loads(data)
- ips = set() # type: Set[IP]
+ ips = {} # type: Dict[IP, List[OSDInfo]]
first_error = True
for osd_data in jdata["osds"]:
+ osd_id = int(osd_data["osd"])
if "public_addr" not in osd_data:
if first_error:
- osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
"(all subsequent errors omitted)", osd_id)
first_error = False
@@ -34,7 +42,26 @@
ip_port = osd_data["public_addr"]
if '/' in ip_port:
ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
+ ip = IP(ip_port.split(":")[0])
+
+ osd_journal_path = None # type: Optional[str]
+ osd_data_path = None # type: Optional[str]
+
+ # TODO: parallelize this!
+ osd_cfg = node.run("ceph -n osd.{} --show-config".format(osd_id))
+ for line in osd_cfg.split("\n"):
+ if line.startswith("osd_journal ="):
+ osd_journal_path = line.split("=")[1].strip()
+ elif line.startswith("osd_data ="):
+ osd_data_path = line.split("=")[1].strip()
+
+ if osd_data_path is None or osd_journal_path is None:
+ logger.error("Can't detect osd %s journal or storage path", osd_id)
+ raise StopTestError()
+
+ ips.setdefault(ip, []).append(OSDInfo(osd_id,
+ journal=osd_journal_path,
+ storage=osd_data_path))
return ips
@@ -68,45 +95,71 @@
def run(self, ctx: TestRun) -> None:
"""Return list of ceph's nodes NodeInfo"""
- if 'ceph_nodes' in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'ceph_nodes'))
- else:
- ceph = ctx.config.ceph
- root_node_uri = cast(str, ceph.root_node)
- cluster = ceph.get("cluster", "ceph")
- conf = ceph.get("conf")
- key = ceph.get("key")
- info = NodeInfo(parse_ssh_uri(root_node_uri), set())
- ceph_nodes = {} # type: Dict[IP, NodeInfo]
+ discovery = ctx.config.get("discovery")
+ if discovery == 'disable' or discovery == 'metadata':
+ logger.info("Skip ceph discovery due to config setting")
+ return
- if conf is None:
- conf = "/etc/ceph/{}.conf".format(cluster)
+ if 'all_nodes' in ctx.storage:
+ logger.debug("Skip ceph discovery, use previously discovered nodes")
+ return
- if key is None:
- key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+ ceph = ctx.config.ceph
+ root_node_uri = cast(str, ceph.root_node)
+ cluster = ceph.get("cluster", "ceph")
+ conf = ceph.get("conf")
+ key = ceph.get("key")
- with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins) as node:
+ logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
+ logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key)
- # new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
- ssh_key = node.get_file_content("~/.ssh/id_rsa")
+ info = NodeInfo(parse_ssh_uri(root_node_uri), set())
- try:
- for ip in get_osds_ips(node, conf, key):
- if ip in ceph_nodes:
- ceph_nodes[ip].roles.add("ceph-osd")
- else:
- ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-osd"})
- except Exception as exc:
- logger.error("OSD discovery failed: %s", exc)
+ if conf is None:
+ conf = "/etc/ceph/{}.conf".format(cluster)
- try:
- for ip in get_mons_ips(node, conf, key):
- if ip in ceph_nodes:
- ceph_nodes[ip].roles.add("ceph-mon")
- else:
- ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-mon"})
- except Exception as exc:
- logger.error("MON discovery failed: %s", exc)
+ if key is None:
+ key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
- ctx.nodes_info.extend(ceph_nodes.values())
- ctx.storage['ceph-nodes'] = list(ceph_nodes.values())
+ ceph_params = {"cluster": cluster, "conf": conf, "key": key}
+
+ with setup_rpc(connect(info),
+ ctx.rpc_code,
+ ctx.default_rpc_plugins,
+ log_level=ctx.config.rpc_log_level) as node:
+
+ ssh_key = node.get_file_content("~/.ssh/id_rsa")
+
+
+ try:
+ ips = set()
+ for ip, osds_info in get_osds_info(node, conf, key).items():
+ ips.add(ip)
+ creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
+ info = ctx.merge_node(creds, {'ceph-osd'})
+ info.params.setdefault('ceph-osds', []).extend(osds_info)
+ assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
+ info.params['ceph'] = ceph_params
+
+ logger.debug("Found %s nodes with ceph-osd role", len(ips))
+ except Exception as exc:
+ if discovery != 'ignore_errors':
+ logger.exception("OSD discovery failed")
+ raise StopTestError()
+ else:
+ logger.warning("OSD discovery failed %s", exc)
+
+ try:
+ counter = 0
+            for counter, ip in enumerate(get_mons_ips(node, conf, key), start=1):
+ creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
+ info = ctx.merge_node(creds, {'ceph-mon'})
+ assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
+ info.params['ceph'] = ceph_params
+ logger.debug("Found %s nodes with ceph-mon role", counter + 1)
+ except Exception as exc:
+ if discovery != 'ignore_errors':
+ logger.exception("MON discovery failed")
+ raise StopTestError()
+ else:
+ logger.warning("MON discovery failed %s", exc)
diff --git a/wally/config.py b/wally/config.py
index e96a03e..46669f0 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,30 +1,45 @@
-from typing import Any, Dict
+from typing import Any, Dict, Optional
from .storage import IStorable
-ConfigBlock = Dict[str, Any]
+ConfigBlock = Any
class Config(IStorable):
- # make mypy happy
- run_uuid = None # type: str
- storage_url = None # type: str
- comment = None # type: str
- keep_vm = None # type: bool
- dont_discover_nodes = None # type: bool
- build_id = None # type: str
- build_description = None # type: str
- build_type = None # type: str
- default_test_local_folder = None # type: str
- settings_dir = None # type: str
- connect_timeout = 30 # type: int
- no_tests = False # type: bool
-
def __init__(self, dct: ConfigBlock) -> None:
- self.__dict__['_dct'] = dct
+ # make mypy happy, set fake dict
+ self.__dict__['_dct'] = {}
+ self.run_uuid = None # type: str
+ self.storage_url = None # type: str
+ self.comment = None # type: str
+ self.keep_vm = None # type: bool
+ self.dont_discover_nodes = None # type: bool
+ self.build_id = None # type: str
+ self.build_description = None # type: str
+ self.build_type = None # type: str
+ self.default_test_local_folder = None # type: str
+ self.settings_dir = None # type: str
+ self.connect_timeout = None # type: int
+ self.no_tests = False # type: bool
+ self.debug_agents = False # type: bool
+
+ # None, disabled, enabled, metadata, ignore_errors
+ self.discovery = None # type: Optional[str]
+
+ self.logging = None # type: ConfigBlock
+ self.ceph = None # type: ConfigBlock
+ self.openstack = None # type: ConfigBlock
+ self.fuel = None # type: ConfigBlock
+ self.test = None # type: ConfigBlock
+ self.sensors = None # type: ConfigBlock
+
+ self._dct.clear()
+ self._dct.update(dct)
+
+ def raw(self) -> ConfigBlock:
+ return self._dct
def get(self, path: str, default: Any = None) -> Any:
curr = self
-
while path:
if '/' in path:
name, path = path.split('/', 1)
@@ -41,7 +56,7 @@
def __getattr__(self, name: str) -> Any:
try:
- val = self.__dct[name]
+ val = self._dct[name]
except KeyError:
raise AttributeError(name)
@@ -51,7 +66,7 @@
return val
def __setattr__(self, name: str, val: Any):
- self.__dct[name] = val
+ self._dct[name] = val
def __contains__(self, name: str) -> bool:
return self.get(name) is not None
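Usage sketch for the path-based lookup (assuming the elided part of get()
resolves each '/'-separated component the way main.py relies on, e.g.
config.get('logging/log_level', 'INFO')):

```python
cfg = Config({"logging": {"level": "DEBUG"}})

assert cfg.get("logging/level") == "DEBUG"    # '/' walks nested blocks
assert cfg.get("logging/config", "-") == "-"  # default for missing keys
assert "logging" in cfg                       # __contains__ is get() is not None
```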
diff --git a/wally/fuel.py b/wally/fuel.py
index 1680d29..bcb76e8 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -14,7 +14,7 @@
from .openstack_api import OSCreds
-logger = logging.getLogger("wally.discover")
+logger = logging.getLogger("wally")
FuelNodeInfo = NamedTuple("FuelNodeInfo",
@@ -38,68 +38,82 @@
pass
def run(self, ctx: TestRun) -> None:
- if 'fuel' in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'fuel/nodes'))
- ctx.fuel_openstack_creds = ctx.storage['fuel/os_creds'] # type: ignore
- ctx.fuel_version = ctx.storage['fuel/version'] # type: ignore
+ discovery = ctx.config.get("discovery")
+ if discovery == 'disable':
+ logger.info("Skip FUEL discovery due to config setting")
+ return
+
+ if 'all_nodes' in ctx.storage:
+ logger.debug("Skip FUEL discovery, use previously discovered nodes")
+ ctx.fuel_openstack_creds = ctx.storage['fuel_os_creds'] # type: ignore
+ ctx.fuel_version = ctx.storage['fuel_version'] # type: ignore
+ return
+
+ fuel = ctx.config.fuel
+ fuel_node_info = ctx.merge_node(fuel.ssh_creds, {'fuel_master'})
+ creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
+ fuel_conn = KeystoneAuth(fuel.url, creds)
+
+ # get cluster information from REST API
+ if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
+ ctx.fuel_openstack_creds = ctx.storage['fuel_os_creds'] # type: ignore
+ ctx.fuel_version = ctx.storage['fuel_version'] # type: ignore
+ return
+
+ cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
+ cluster = reflect_cluster(fuel_conn, cluster_id)
+ ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+ ctx.storage["fuel_version"] = ctx.fuel_version
+
+ logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
+ openrc = cluster.get_openrc()
+
+ if openrc:
+ auth_url = cast(str, openrc['os_auth_url'])
+ if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+ logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+ auth_url = auth_url.replace("https", "http", 1)
+
+ os_creds = OSCreds(name=cast(str, openrc['username']),
+ passwd=cast(str, openrc['password']),
+ tenant=cast(str, openrc['tenant_name']),
+ auth_url=cast(str, auth_url),
+ insecure=cast(bool, openrc['insecure']))
+
+ ctx.fuel_openstack_creds = os_creds
else:
- fuel = ctx.config.fuel
- discover_nodes = (fuel.discover != "fuel_openrc_only")
- fuel_node_info = NodeInfo(parse_ssh_uri(fuel.ssh_creds), {'fuel_master'})
- fuel_nodes = [fuel_node_info]
+ ctx.fuel_openstack_creds = None
+ ctx.storage["fuel_os_creds"] = ctx.fuel_openstack_creds
- creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
- fuel_conn = KeystoneAuth(fuel.url, creds)
+ if discovery == 'metadata':
+ logger.debug("Skip FUEL nodes discovery due to discovery settings")
+ return
- # get cluster information from REST API
- cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
- cluster = reflect_cluster(fuel_conn, cluster_id)
- ctx.fuel_version = FuelInfo(fuel_conn).get_version()
- logger.info("Found fuel {0}".format(".".join(map(str, ctx.fuel_version))))
- openrc = cluster.get_openrc()
+ try:
+ fuel_rpc = setup_rpc(connect(fuel_node_info),
+ ctx.rpc_code,
+ ctx.default_rpc_plugins,
+ log_level=ctx.config.rpc_log_level)
+ except AuthenticationException:
+ msg = "FUEL nodes discovery failed - wrong FUEL master SSH credentials"
+ if discovery != 'ignore_errors':
+ raise StopTestError(msg)
+ logger.warning(msg)
+ return
+ except Exception as exc:
+ if discovery != 'ignore_errors':
+ logger.exception("While connection to FUEL")
+ raise StopTestError("Failed to connect to FUEL")
+ logger.warning("Failed to connect to FUEL - %s", exc)
+ return
- if openrc:
- auth_url = cast(str, openrc['os_auth_url'])
- if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
- logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
- auth_url = auth_url.replace("https", "http", 1)
+ logger.debug("Downloading FUEL node ssh master key")
+ fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
+ network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
- os_creds = OSCreds(name=cast(str, openrc['username']),
- passwd=cast(str, openrc['password']),
- tenant=cast(str, openrc['tenant_name']),
- auth_url=cast(str, auth_url),
- insecure=cast(bool, openrc['insecure']))
+ count = 0
+        for count, fuel_node in enumerate(list(cluster.get_nodes()), start=1):
+ ip = str(fuel_node.get_ip(network))
+ ctx.merge_node(ConnCreds(ip, "root", key=fuel_key), set(fuel_node.get_roles()))
- ctx.fuel_openstack_creds = os_creds
- else:
- ctx.fuel_openstack_creds = None
-
- if discover_nodes:
-
- try:
- fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code, ctx.default_rpc_plugins)
- except AuthenticationException:
- raise StopTestError("Wrong fuel credentials")
- except Exception:
- logger.exception("While connection to FUEL")
- raise StopTestError("Failed to connect to FUEL")
-
- logger.debug("Downloading FUEL node ssh master key")
- fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
- network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
-
- for fuel_node in list(cluster.get_nodes()):
- ip = str(fuel_node.get_ip(network))
- fuel_nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key),
- roles=set(fuel_node.get_roles())))
-
- ctx.storage['fuel_nodes'] = fuel_nodes
- ctx.nodes_info.extend(fuel_nodes)
- ctx.nodes_info.append(fuel_node_info)
- logger.debug("Found {} FUEL nodes for env {}".format(len(fuel_nodes) - 1, fuel.openstack_env))
- else:
- logger.debug("Skip FUEL nodes discovery, as 'fuel_openrc_only' is set to fuel.discover option")
-
- ctx.storage["fuel/nodes"] = fuel_nodes
- ctx.storage["fuel/os_creds"] = ctx.fuel_openstack_creds
- ctx.storage["fuel/version"] = ctx.fuel_version
+ logger.debug("Found {} FUEL nodes for env {}".format(count, fuel.openstack_env))
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 6117606..8907b81 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -13,7 +13,7 @@
from keystoneclient.v2_0 import Client as keystoneclient
-logger = logging.getLogger("wally.fuel_api")
+logger = logging.getLogger("wally")
class Connection(metaclass=abc.ABCMeta):
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 764d126..2a5c1e5 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,4 +1,5 @@
import re
+import logging
from typing import Dict, Iterable
import xml.etree.ElementTree as ET
from typing import List, Tuple, cast, Optional
@@ -7,6 +8,9 @@
from .node_interfaces import IRPCNode
+logger = logging.getLogger("wally")
+
+
def get_data(rr: str, data: str) -> str:
match_res = re.search("(?ims)" + rr, data)
return match_res.group(0)
@@ -105,12 +109,12 @@
class SWInfo:
def __init__(self) -> None:
- self.partitions = None # type: str
+ self.mtab = None # type: str
self.kernel_version = None # type: str
- self.libvirt_version = None # type: str
- self.qemu_version = None # type: str
+ self.libvirt_version = None # type: Optional[str]
+ self.qemu_version = None # type: Optional[str]
self.OS_version = None # type: utils.OSRelease
- self.ceph_version = None # type: str
+ self.ceph_version = None # type: Optional[str]
def get_sw_info(node: IRPCNode) -> SWInfo:
@@ -118,18 +122,35 @@
res.OS_version = utils.get_os(node)
res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
- res.partitions = node.get_file_content('/etc/mtab').decode('utf8').strip()
- res.libvirt_version = node.run("virsh -v", nolog=True).strip()
- res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
- res.ceph_version = node.run("ceph --version", nolog=True).strip()
+ res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
+
+ try:
+ res.libvirt_version = node.run("virsh -v", nolog=True).strip()
+ except OSError:
+ res.libvirt_version = None
+
+ try:
+ res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
+ except OSError:
+ res.qemu_version = None
+
+ try:
+ res.ceph_version = node.run("ceph --version", nolog=True).strip()
+ except OSError:
+ res.ceph_version = None
return res
-def get_hw_info(node: IRPCNode) -> HWInfo:
- res = HWInfo()
- lshw_out = node.run('sudo lshw -xml 2>/dev/null', nolog=True)
+def get_hw_info(node: IRPCNode) -> Optional[HWInfo]:
+ try:
+ lshw_out = node.run('sudo lshw -xml 2>/dev/null')
+ except Exception as exc:
+ logger.warning("lshw failed on node %s: %s", node.info.node_id(), exc)
+ return None
+
+ res = HWInfo()
res.raw = lshw_out
lshw_et = ET.fromstring(lshw_out)
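The try/except OSError blocks in get_sw_info work because RPCNode.run
(wally/node.py below) raises OSError on a non-zero exit code, so a missing
tool simply leaves the version as None. Self-contained sketch of the pattern
with a stub node:

```python
class FakeNode:
    """Stub standing in for IRPCNode: run() fails like a missing binary."""
    def run(self, cmd, nolog=False):
        raise OSError("cmd {!r} failed with code 127".format(cmd))

def probe(node, cmd):
    try:
        return node.run(cmd, nolog=True).strip()
    except OSError:
        return None   # tool is not installed, keep the field as None

assert probe(FakeNode(), "virsh -v") is None
```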
diff --git a/wally/logger.py b/wally/logger.py
index e8c916d..9ebc425 100644
--- a/wally/logger.py
+++ b/wally/logger.py
@@ -1,4 +1,6 @@
+import yaml
import logging
+import logging.config
from typing import Callable, IO, Optional
@@ -48,35 +50,41 @@
return res
-def setup_loggers(def_level: int = logging.DEBUG, log_fname: str = None, log_fd: IO = None) -> None:
+def setup_loggers(def_level: int = logging.DEBUG,
+ log_fname: str = None,
+ log_fd: IO = None,
+ config_file: str = None) -> None:
- log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
- colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
+ # TODO: need to better combine file with custom settings
+ if config_file is not None:
+        data = yaml.safe_load(open(config_file).read())
+ logging.config.dictConfig(data)
+ else:
+ log_format = '%(asctime)s - %(levelname)8s - %(name)-10s - %(message)s'
+ colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
- sh = logging.StreamHandler()
- sh.setLevel(def_level)
- sh.setFormatter(colored_formatter)
+ sh = logging.StreamHandler()
+ sh.setLevel(def_level)
+ sh.setFormatter(colored_formatter)
- logger = logging.getLogger('wally')
- logger.setLevel(logging.DEBUG)
- logger.addHandler(sh)
+ logger = logging.getLogger('wally')
+ logger.setLevel(logging.DEBUG)
- logger_api = logging.getLogger("wally.fuel_api")
- logger_api.setLevel(logging.WARNING)
- logger_api.addHandler(sh)
+ root_logger = logging.getLogger()
+ root_logger.handlers = []
+ root_logger.addHandler(sh)
+ root_logger.setLevel(logging.DEBUG)
- if log_fname or log_fd:
- if log_fname:
- handler = logging.FileHandler(log_fname) # type: Optional[logging.Handler]
- else:
- handler = logging.StreamHandler(log_fd)
+ if log_fname or log_fd:
+ if log_fname:
+ handler = logging.FileHandler(log_fname) # type: Optional[logging.Handler]
+ else:
+ handler = logging.StreamHandler(log_fd)
- log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
- formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
- handler.setFormatter(formatter)
- handler.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
+ handler.setFormatter(formatter)
+ handler.setLevel(logging.DEBUG)
- logger.addHandler(handler)
- logger_api.addHandler(handler)
+ root_logger.addHandler(handler)
- logging.getLogger('paramiko').setLevel(logging.WARNING)
+ logging.getLogger('paramiko').setLevel(logging.WARNING)
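A self-contained sketch of the new config_file branch: the YAML is parsed and
handed straight to the stdlib dictConfig (which requires the 'version' key -
hence the fix in configs-examples/logger.yaml above). The config below is
illustrative, not the shipped example:

```python
import logging
import logging.config

import yaml

LOG_CFG = """
version: 1
formatters:
  default:
    format: '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
    datefmt: '%H:%M:%S'
handlers:
  console:
    class: logging.StreamHandler
    formatter: default
root:
  level: DEBUG
  handlers: [console]
"""

logging.config.dictConfig(yaml.safe_load(LOG_CFG))
logging.getLogger("wally").info("configured from YAML")
```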
diff --git a/wally/main.py b/wally/main.py
index 9a453fb..f57e1a5 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -27,21 +27,20 @@
except ImportError:
faulthandler = None
-import agent
-
from . import utils, node
from .storage import make_storage, Storage
from .config import Config
from .logger import setup_loggers
from .stage import Stage
from .test_run_class import TestRun
+from .ssh import set_ssh_key_passwd
# stages
from .ceph import DiscoverCephStage
from .openstack import DiscoverOSStage
from .fuel import DiscoverFuelStage
-from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage
+from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage, ConnectStage, SleepStage
from .report import ConsoleReportStage, HtmlReportStage
from .sensors import StartSensorsStage, CollectSensorsStage
@@ -50,14 +49,15 @@
@contextlib.contextmanager
-def log_stage(stage: Stage) -> Iterator[None]:
- logger.info("Start " + stage.name())
+def log_stage(stage: Stage, cleanup: bool = False) -> Iterator[None]:
+ logger.info("Start " + stage.name() + ("::cleanup" if cleanup else ""))
try:
yield
except utils.StopTestError as exc:
- logger.error("Exception during %s: %r", stage.name(), exc)
+ raise
except Exception:
- logger.exception("During %s", stage.name())
+ logger.exception("During %s", stage.name() + ("::cleanup" if cleanup else ""))
+ raise
def list_results(path: str) -> List[Tuple[str, str, str, str]]:
@@ -93,6 +93,7 @@
descr = "Disk io performance test suite"
parser = argparse.ArgumentParser(prog='wally', description=descr)
parser.add_argument("-l", '--log-level', help="print some extra log info")
+ parser.add_argument("--ssh-key-passwd", default=None, help="Pass ssh key password")
parser.add_argument("-s", '--settings-dir', default=None,
help="Folder to store key/settings/history files")
@@ -125,9 +126,9 @@
test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
help="Don't connect/discover fuel nodes")
test_parser.add_argument('--no-report', action='store_true', help="Skip report stages")
- test_parser.add_argument('--result-dir', default=None, help="Save results to DIR", metavart="DIR")
+ test_parser.add_argument('--result-dir', default=None, help="Save results to DIR", metavar="DIR")
test_parser.add_argument("comment", help="Test information")
- test_parser.add_argument("config_file", help="Yaml config file", nargs='?', default=None)
+ test_parser.add_argument("config_file", help="Yaml config file")
# ---------------------------------------------------------------------
test_parser = subparsers.add_parser('resume', help='resume tests')
@@ -147,6 +148,39 @@
return os.path.abspath(os.path.expanduser(val))
+def find_cfg_file(name: str, included_from: str = None) -> str:
+ paths = [".", os.path.expanduser('~/.wally')]
+ if included_from is not None:
+ paths.append(os.path.dirname(included_from))
+
+ search_paths = set(os.path.abspath(path) for path in paths if os.path.isdir(path))
+
+ for folder in search_paths:
+ path = os.path.join(folder, name)
+ if os.path.exists(path):
+ return path
+
+ raise FileNotFoundError(name)
+
+
+def load_config(path: str) -> Config:
+ path = os.path.abspath(path)
+ cfg_dict = yaml_load(open(path).read())
+
+ while 'include' in cfg_dict:
+ inc = cfg_dict.pop('include')
+ if isinstance(inc, str):
+ inc = [inc]
+
+ for fname in inc:
+ inc_path = find_cfg_file(fname, path)
+ inc_dict = yaml_load(open(inc_path).read())
+ inc_dict.update(cfg_dict)
+ cfg_dict = inc_dict
+
+ return Config(cfg_dict)
+
+
def main(argv: List[str]) -> int:
if faulthandler is not None:
faulthandler.register(signal.SIGUSR1, all_threads=True)
@@ -159,9 +193,7 @@
storage = None # type: Storage
if opts.subparser_name == 'test':
- file_name = os.path.abspath(opts.config_file)
- with open(file_name) as fd:
- config = Config(yaml_load(fd.read())) # type: ignore
+ config = load_config(opts.config_file)
config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
config.comment = opts.comment
@@ -177,24 +209,32 @@
storage['config'] = config # type: ignore
- stages.append(DiscoverCephStage) # type: ignore
- stages.append(DiscoverOSStage) # type: ignore
- stages.append(DiscoverFuelStage) # type: ignore
- stages.append(ExplicitNodesStage) # type: ignore
- stages.append(SaveNodesStage) # type: ignore
- stages.append(StartSensorsStage) # type: ignore
- stages.append(RunTestsStage) # type: ignore
- stages.append(CollectSensorsStage) # type: ignore
+ stages.append(DiscoverCephStage())
+ stages.append(DiscoverOSStage())
+ stages.append(DiscoverFuelStage())
+ stages.append(ExplicitNodesStage())
+ stages.append(SaveNodesStage())
+ stages.append(StartSensorsStage())
+ stages.append(RunTestsStage())
+ stages.append(CollectSensorsStage())
+ stages.append(ConnectStage())
+ stages.append(SleepStage())
if not opts.dont_collect:
- stages.append(CollectInfoStage) # type: ignore
+ stages.append(CollectInfoStage())
- storage['cli'] = argv
+ argv2 = argv[:]
+ if '--ssh-key-passwd' in argv2:
+ # don't save ssh key password to storage
+ argv2[argv2.index("--ssh-key-passwd") + 1] = "<removed from output>"
+ storage['cli'] = argv2
elif opts.subparser_name == 'resume':
storage = make_storage(opts.storage_dir, existing=True)
config = storage.load(Config, 'config')
# TODO: fix this
+ # TODO: add node loading from storage
+ # TODO: fill nodes conncreds with keys
raise NotImplementedError("Resume in not fully implemented")
elif opts.subparser_name == 'ls':
@@ -219,8 +259,8 @@
report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
- report_stages.append(ConsoleReportStage) # type: ignore
- report_stages.append(HtmlReportStage) # type: ignore
+ report_stages.append(ConsoleReportStage())
+ report_stages.append(HtmlReportStage())
# log level is not a part of config
if opts.log_level is not None:
@@ -228,15 +268,26 @@
else:
str_level = config.get('logging/log_level', 'INFO')
- setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log', "w"))
+ log_config_file = config.get('logging/config', None)
+
+ if log_config_file is not None:
+ log_config_file = find_cfg_file(log_config_file, opts.config_file)
+
+ setup_loggers(getattr(logging, str_level),
+ log_fd=storage.get_stream('log', "w"),
+ config_file=log_config_file)
+
logger.info("All info would be stored into %r", config.storage_url)
ctx = TestRun(config, storage)
ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
+ if opts.ssh_key_passwd is not None:
+ set_ssh_key_passwd(opts.ssh_key_passwd)
+
stages.sort(key=lambda x: x.priority)
- # TODO: run only stages, which have configs
+ # TODO: run only stages, which have config
failed = False
cleanup_stages = []
for stage in stages:
@@ -248,7 +299,7 @@
try:
with log_stage(stage):
stage.run(ctx)
- except:
+ except (Exception, KeyboardInterrupt):
failed = True
break
@@ -256,7 +307,7 @@
cleanup_failed = False
for stage in cleanup_stages[::-1]:
try:
- with log_stage(stage):
+ with log_stage(stage, cleanup=True):
stage.cleanup(ctx)
except:
cleanup_failed = True
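How load_config resolves 'include': each included file is loaded and the
including file's keys are layered on top (inc_dict.update(cfg_dict)), so child
settings win - this is what lets local_lxc_ceph.yaml override default.yaml.
A sketch with inline YAML standing in for the files:

```python
import yaml

base = yaml.safe_load("collect_info: true\nsleep: 10")      # default.yaml stand-in
child = yaml.safe_load("include: default.yaml\nsleep: 60")  # including file

child.pop("include")
base.update(child)   # same order as inc_dict.update(cfg_dict)
assert base == {"collect_info": True, "sleep": 60}
```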
diff --git a/wally/node.py b/wally/node.py
index 54a6291..4d2dda8 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -3,8 +3,9 @@
import json
import socket
import logging
+import tempfile
import subprocess
-from typing import Union, cast, Any, Optional, Tuple, Dict, List
+from typing import Union, cast, Any, Optional, Tuple, Dict
import agent
@@ -28,7 +29,9 @@
def put_to_file(self, path: Optional[str], content: bytes) -> str:
if path is None:
- path = self.run("mktemp").strip()
+ path = self.run("mktemp", nolog=True).strip()
+
+ logger.debug("PUT %s bytes to %s", len(content), path)
with self.conn.open_sftp() as sftp:
with sftp.open(path, "wb") as fd:
@@ -40,17 +43,16 @@
self.conn.close()
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+ if not nolog:
+ logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
+
transport = self.conn.get_transport()
session = transport.open_session()
try:
session.set_combine_stderr(True)
-
stime = time.time()
- if not nolog:
- logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
-
session.exec_command(cmd)
session.settimeout(1)
session.shutdown_write()
@@ -89,12 +91,18 @@
def get_ip(self) -> str:
return 'localhost'
- def put_to_file(self, path: str, content: bytes) -> None:
- dir_name = os.path.dirname(path)
- os.makedirs(dir_name, exist_ok=True)
+ def put_to_file(self, path: Optional[str], content: bytes) -> str:
+ if path is None:
+ fd, path = tempfile.mkstemp(text=False)
+ os.close(fd)
+ else:
+ dir_name = os.path.dirname(path)
+ os.makedirs(dir_name, exist_ok=True)
- with open(path, "wb") as fd:
- fd.write(content)
+ with open(path, "wb") as fd2:
+ fd2.write(content)
+
+ return path
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
proc = subprocess.Popen(cmd, shell=True,
@@ -156,10 +164,29 @@
return str(self)
def get_file_content(self, path: str) -> bytes:
- raise NotImplementedError()
+ logger.debug("GET %s", path)
+ res = self.conn.fs.get_file(path, expanduser=True)
+ logger.debug("Receive %s bytes from %s", len(res), path)
+ return res
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- raise NotImplementedError()
+ if not nolog:
+ logger.debug("Node %s - run %s", self.info.node_id(), cmd)
+
+ cmd_b = cmd.encode("utf8")
+ proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
+ code = None
+ out = ""
+ while code is None:
+ code, outb, _ = self.conn.cli.get_updates(proc_id)
+ out += outb.decode("utf8")
+ time.sleep(0.01)
+
+ if code != 0:
+ templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
+ raise OSError(templ.format(self.info.node_id(), cmd, code, out))
+
+ return out
def copy_file(self, local_path: str, remote_path: str = None) -> str:
raise NotImplementedError()
@@ -173,38 +200,64 @@
def stat_file(self, path: str) -> Any:
raise NotImplementedError()
- def disconnect(self) -> str:
+ def __exit__(self, x, y, z) -> bool:
+ self.disconnect(stop=True)
+ return False
+
+ def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
+ self.conn.server.load_module(name, version, code)
+
+ def disconnect(self, stop: bool = False) -> None:
+ if stop:
+ logger.debug("Stopping RPC server on %s", self.info.node_id())
+ self.conn.server.stop()
+
+ logger.debug("Disconnecting from %s", self.info.node_id())
self.conn.disconnect()
self.conn = None
-def setup_rpc(node: ISSHHost, rpc_server_code: bytes, plugins: Dict[str, bytes] = None, port: int = 0) -> IRPCNode:
- log_file = node.run("mktemp").strip()
+def setup_rpc(node: ISSHHost,
+ rpc_server_code: bytes,
+ plugins: Dict[str, bytes] = None,
+ port: int = 0,
+ log_level: str = None) -> IRPCNode:
+
+ logger.debug("Setting up RPC connection to {}".format(node.info))
code_file = node.put_to_file(None, rpc_server_code)
ip = node.info.ssh_creds.addr.host
- cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
- "--show-settings --stdout-file={out_file}"
- cmd = cmd.format(code_file=code_file, listen_ip=ip, out_file=log_file, port=port)
+ log_file = None # type: Optional[str]
+ if log_level:
+ log_file = node.run("mktemp", nolog=True).strip()
+ cmd = "python {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
+ cmd = cmd.format(code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
+ logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
+ node.info.node_id(), log_file, log_level))
+ else:
+ cmd = "python {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
+ cmd = cmd.format(code_file, ip, port)
+
params_js = node.run(cmd).strip()
params = json.loads(params_js)
- params['log_file'] = log_file
+
node.info.params.update(params)
port = int(params['addr'].split(":")[1])
rpc_conn = agent.connect((ip, port))
+ rpc_node = RPCNode(rpc_conn, node.info)
+ rpc_node.rpc_log_file = log_file
+
if plugins is not None:
try:
for name, code in plugins.items():
- rpc_conn.server.load_module(name, None, code)
+ rpc_node.upload_plugin(name, code)
except Exception:
- rpc_conn.server.stop()
- rpc_conn.disconnect()
+ rpc_node.disconnect(True)
raise
- return RPCNode(rpc_conn, node.info)
-
+ return rpc_node
# class RemoteNode(node_interfaces.IRPCNode):
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index bc9ba28..8552a3d 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -9,7 +9,7 @@
class NodeInfo:
"""Node information object, result of discovery process or config parsing"""
- def __init__(self, ssh_creds: ConnCreds, roles: Set[str]) -> None:
+ def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
# ssh credentials
self.ssh_creds = ssh_creds
@@ -18,10 +18,18 @@
self.roles = roles
self.os_vm_id = None # type: Optional[int]
self.params = {} # type: Dict[str, Any]
+ if params is not None:
+ self.params = params
def node_id(self) -> str:
return "{0.host}:{0.port}".format(self.ssh_creds.addr)
+ def __str__(self) -> str:
+ return self.node_id()
+
+ def __repr__(self) -> str:
+ return str(self)
+
class ISSHHost(metaclass=abc.ABCMeta):
"""Minimal interface, required to setup RPC connection"""
@@ -55,6 +63,7 @@
"""Remote filesystem interface"""
info = None # type: NodeInfo
conn = None # type: Any
+    rpc_log_file = None  # type: Optional[str]
@abc.abstractmethod
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
@@ -84,6 +93,10 @@
def disconnect(self) -> str:
pass
+ @abc.abstractmethod
+ def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
+ pass
+
def __enter__(self) -> 'IRPCNode':
return self
diff --git a/wally/openstack.py b/wally/openstack.py
index cff6150..5e09d6d 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -13,7 +13,7 @@
from .utils import LogError, StopTestError, get_creds_openrc
-logger = logging.getLogger("wally.discover")
+logger = logging.getLogger("wally")
def get_floating_ip(vm: Any) -> str:
@@ -104,9 +104,14 @@
pass
def run(self, ctx: TestRun) -> None:
+ if 'all_nodes' in ctx.storage:
+ logger.debug("Skip openstack discovery, use previously discovered nodes")
+ return
+
+ ensure_connected_to_openstack(ctx)
+
cfg = ctx.config.openstack
os_nodes_auth = cfg.auth # type: str
-
if os_nodes_auth.count(":") == 2:
user, password, key_file = os_nodes_auth.split(":") # type: str, Optional[str], Optional[str]
if not password:
@@ -115,12 +120,7 @@
user, password = os_nodes_auth.split(":")
key_file = None
- ensure_connected_to_openstack(ctx)
-
- if 'openstack_nodes' in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "openstack_nodes"))
- else:
- openstack_nodes = [] # type: List[NodeInfo]
+ if ctx.config.discovery not in ('disabled', 'metadata'):
services = ctx.os_connection.nova.services.list() # type: List[Any]
host_services_mapping = {} # type: Dict[str, List[str]]
@@ -132,36 +132,33 @@
for host, services in host_services_mapping.items():
creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
- openstack_nodes.append(NodeInfo(creds, set(services)))
-
- ctx.nodes_info.extend(openstack_nodes)
- ctx.storage['openstack_nodes'] = openstack_nodes # type: ignore
-
- if "reused_os_nodes" in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "reused_nodes"))
+ ctx.merge_node(creds, set(services))
+ # TODO: log OS nodes discovery results
else:
- reused_nodes = [] # type: List[NodeInfo]
- private_key_path = get_vm_keypair_path(ctx.config)[0]
+ logger.info("Scip OS cluster discovery due to 'discovery' setting value")
- vm_creds = None # type: str
- for vm_creds in cfg.get("vms", []):
- user_name, vm_name_pattern = vm_creds.split("@", 1)
- msg = "Vm like {} lookup failed".format(vm_name_pattern)
+ private_key_path = get_vm_keypair_path(ctx.config)[0]
- with LogError(msg):
- msg = "Looking for vm with name like {0}".format(vm_name_pattern)
- logger.debug(msg)
+ vm_creds = None # type: str
+ for vm_creds in cfg.get("vms", []):
+ user_name, vm_name_pattern = vm_creds.split("@", 1)
+ msg = "Vm like {} lookup failed".format(vm_name_pattern)
- ensure_connected_to_openstack(ctx)
+ with LogError(msg):
+ msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+ logger.debug(msg)
- for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
- creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
- node_info = NodeInfo(creds, {'testnode'})
- node_info.os_vm_id = vm_id
- reused_nodes.append(node_info)
+ ensure_connected_to_openstack(ctx)
- ctx.nodes_info.extend(reused_nodes)
- ctx.storage["reused_os_nodes"] = reused_nodes # type: ignore
+ for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+ creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+ info = NodeInfo(creds, {'testnode'})
+ info.os_vm_id = vm_id
+ nid = info.node_id()
+ if nid in ctx.nodes_info:
+ logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
+ raise StopTestError()
+ ctx.nodes_info[nid] = info
class CreateOSVMSStage(Stage):
@@ -171,32 +168,44 @@
config_block = 'spawn_os_vms' # type: str
def run(self, ctx: TestRun) -> None:
+ if 'all_nodes' in ctx.storage:
+ ctx.os_spawned_nodes_ids = ctx.storage['os_spawned_nodes_ids'] # type: ignore
+ logger.info("Skipping OS VMS discovery/spawn as all data found in storage")
+ return
+
+ if 'os_spawned_nodes_ids' in ctx.storage:
+ logger.error("spawned_os_nodes_ids is found in storage, but no nodes_info is stored." +
+ "Fix this before continue")
+ raise StopTestError()
+
vm_spawn_config = ctx.config.spawn_os_vms
vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
- if 'spawned_os_nodes' in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "spawned_os_nodes"))
+ ensure_connected_to_openstack(ctx)
+ params = vm_image_config.copy()
+ params.update(vm_spawn_config)
+ params.update(get_vm_keypair_path(ctx.config))
+ params['group_name'] = ctx.config.run_uuid
+ params['keypair_name'] = ctx.config.vm_configs['keypair_name']
+
+ if not ctx.config.openstack.get("skip_preparation", False):
+ logger.info("Preparing openstack")
+ prepare_os(ctx.os_connection, params)
else:
- ensure_connected_to_openstack(ctx)
- params = vm_image_config.copy()
- params.update(vm_spawn_config)
- params.update(get_vm_keypair_path(ctx.config))
- params['group_name'] = ctx.config.run_uuid
- params['keypair_name'] = ctx.config.vm_configs['keypair_name']
+ logger.info("Scip openstack preparation as 'skip_preparation' is set")
- if not ctx.config.openstack.get("skip_preparation", False):
- logger.info("Preparing openstack")
- prepare_os(ctx.os_connection, params)
+ ctx.os_spawned_nodes_ids = []
+ with ctx.get_pool() as pool:
+ for info in launch_vms(ctx.os_connection, params, pool):
+ info.roles.add('testnode')
+ nid = info.node_id()
+ if nid in ctx.nodes_info:
+ logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
+ raise StopTestError()
+ ctx.nodes_info[nid] = info
+ ctx.os_spawned_nodes_ids.append(info.os_vm_id)
- new_nodes = []
- ctx.os_spawned_nodes_ids = []
- with ctx.get_pool() as pool:
- for node_info in launch_vms(ctx.os_connection, params, pool):
- node_info.roles.add('testnode')
- ctx.os_spawned_nodes_ids.append(node_info.os_vm_id)
- new_nodes.append(node_info)
-
- ctx.storage['spawned_os_nodes'] = new_nodes # type: ignore
+ ctx.storage['os_spawned_nodes_ids'] = ctx.os_spawned_nodes_ids # type: ignore
def cleanup(self, ctx: TestRun) -> None:
# keep nodes in case of error for future test restart
@@ -206,7 +215,7 @@
clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
-            del ctx.storage['spawned_os_nodes']
+            del ctx.storage['os_spawned_nodes_ids']
- logger.info("Nodes has been removed")
+ logger.info("OS spawned nodes has been successfully removed")
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 2e9ab63..aa9d7f3 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -35,7 +35,7 @@
"""
-logger = logging.getLogger("wally.vms")
+logger = logging.getLogger("wally")
OSCreds = NamedTuple("OSCreds",
diff --git a/wally/report.py b/wally/report.py
index ecf3ba7..d2bef7e 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -29,7 +29,7 @@
abbv_name_to_full)
-logger = logging.getLogger("wally.report")
+logger = logging.getLogger("wally")
def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
diff --git a/wally/run_test.py b/wally/run_test.py
index 8b54f8b..7baac35 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,3 +1,4 @@
+import time
import logging
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast
@@ -38,16 +39,19 @@
def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
try:
ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
- return True, setup_rpc(ssh_node, ctx.rpc_code, ctx.default_rpc_plugins)
+ return True, setup_rpc(ssh_node,
+ ctx.rpc_code,
+ ctx.default_rpc_plugins,
+ log_level=ctx.config.rpc_log_level)
except Exception as exc:
- logger.error("During connect to {}: {!s}".format(node, exc))
+ logger.exception("During connect to %s: %s", node_info, exc)
return False, node_info
failed_testnodes = [] # type: List[NodeInfo]
failed_nodes = [] # type: List[NodeInfo]
ctx.nodes = []
- for ok, node in pool.map(connect_ext, ctx.nodes_info):
+ for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
if not ok:
node = cast(NodeInfo, node)
if 'testnode' in node.roles:
@@ -59,11 +63,10 @@
if failed_nodes:
msg = "Node(s) {} would be excluded - can't connect"
- logger.warning(msg.format(",".join(map(str, failed_nodes))))
+ logger.warning(msg.format(", ".join(map(str, failed_nodes))))
if failed_testnodes:
- msg = "Can't connect to testnode(s) " + \
- ",".join(map(str, failed_testnodes))
+ msg = "Can't connect to testnode(s) " + ",".join(map(str, failed_testnodes))
logger.error(msg)
raise utils.StopTestError(msg)
@@ -74,8 +77,18 @@
# TODO(koder): what next line was for?
# ssh_utils.close_all_sessions()
- for node in ctx.nodes:
- node.disconnect()
+ if ctx.config.get("download_rpc_logs", False):
+ for node in ctx.nodes:
+ if node.rpc_log_file is not None:
+ nid = node.info.node_id()
+ path = "rpc_logs/" + nid
+ node.conn.server.flush_logs()
+ log = node.get_file_content(node.rpc_log_file)
+ ctx.storage[path] = log.decode("utf8")
+ logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
+
+ with ctx.get_pool() as pool:
+ list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
class CollectInfoStage(Stage):
@@ -88,20 +101,36 @@
if not ctx.config.collect_info:
return
- futures = {} # type: Dict[str, Future]
+ futures = {} # type: Dict[Tuple[str, str], Future]
with ctx.get_pool() as pool:
+            # can't issue the next RPC request until the previous one finishes
for node in ctx.nodes:
- hw_info_path = "hw_info/{}".format(node.info.node_id())
+ nid = node.info.node_id()
+ hw_info_path = "hw_info/{}".format(nid)
if hw_info_path not in ctx.storage:
- futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
+ futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
- sw_info_path = "sw_info/{}".format(node.info.node_id())
+ for (path, nid), future in futures.items():
+ try:
+ ctx.storage[path] = future.result()
+ except Exception:
+ logger.exception("During collecting hardware info from %s", nid)
+ raise utils.StopTestError()
+
+ futures.clear()
+ for node in ctx.nodes:
+ nid = node.info.node_id()
+ sw_info_path = "sw_info/{}".format(nid)
if sw_info_path not in ctx.storage:
- futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
+ futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
- for path, future in futures.items():
- ctx.storage[path] = future.result()
+ for (path, nid), future in futures.items():
+ try:
+ ctx.storage[path] = future.result()
+ except Exception:
+ logger.exception("During collecting software info from %s", nid)
+ raise utils.StopTestError()
class ExplicitNodesStage(Stage):
@@ -111,14 +140,12 @@
config_block = 'nodes'
def run(self, ctx: TestRun) -> None:
- explicit_nodes = []
- for url, roles in ctx.config.get('explicit_nodes', {}).items():
- creds = ssh_utils.parse_ssh_uri(url)
- roles = set(roles.split(","))
- explicit_nodes.append(NodeInfo(creds, roles))
+ if 'all_nodes' in ctx.storage:
+ logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
+ return
- ctx.nodes_info.extend(explicit_nodes)
- ctx.storage['explicit_nodes'] = explicit_nodes # type: ignore
+ for url, roles in ctx.config.get('explicit_nodes', {}).items():
+ ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(roles.split(",")))
class SaveNodesStage(Stage):
@@ -127,7 +154,18 @@
priority = StepOrder.CONNECT
def run(self, ctx: TestRun) -> None:
- ctx.storage['all_nodes'] = ctx.nodes_info # type: ignore
+ ctx.storage['all_nodes'] = list(ctx.nodes_info.values()) # type: ignore
+
+
+class SleepStage(Stage):
+ """Save nodes list to file"""
+
+ priority = StepOrder.TEST
+ config_block = 'sleep'
+
+ def run(self, ctx: TestRun) -> None:
+ logger.debug("Will sleep for %r seconds", ctx.config.sleep)
+ time.sleep(ctx.config.sleep)
class RunTestsStage(Stage):
diff --git a/wally/sensors.py b/wally/sensors.py
index c86aeb4..4f00a3e 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,25 +1,34 @@
-from typing import List, Dict, Tuple, Any
+import array
+import logging
+from typing import List, Dict, Tuple
+from . import utils
from .test_run_class import TestRun
from . import sensors_rpc_plugin
from .stage import Stage, StepOrder
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
-SENSORS_PLUGIN_CODE = open(plugin_fname).read()
+SENSORS_PLUGIN_CODE = open(plugin_fname, "rb").read()
-# TODO(koder): in case if node has more than one role sensor settigns might be incorrect
+logger = logging.getLogger("wally")
+
+# TODO(koder): if a node has more than one role, sensor settings might be incorrect
class StartSensorsStage(Stage):
priority = StepOrder.START_SENSORS
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
- if 'sensors' not in ctx.config:
- return
+ if array.array('L').itemsize != 8:
+ message = "Python array.array('L') items should be 8 bytes in size, not {}." + \
+ " Can't provide sensors on this platform. Disable sensors in config and retry"
+ logger.critical(message.format(array.array('L').itemsize))
+ raise utils.StopTestError()
per_role_config = {} # type: Dict[str, Dict[str, str]]
- for name, val in ctx.config['sensors'].copy():
+
+ for name, val in ctx.config.sensors.roles_mapping.raw().items():
if isinstance(val, str):
val = {vl.strip(): ".*" for vl in val.split(",")}
elif isinstance(val, list):
@@ -39,14 +48,47 @@
per_role_config[name] = new_vals
for node in ctx.nodes:
- node_cfg = {} # type: Dict[str, str]
+ node_cfg = {} # type: Dict[str, Dict[str, str]]
for role in node.info.roles:
node_cfg.update(per_role_config.get(role, {}))
+ nid = node.info.node_id()
if node_cfg:
- node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
- ctx.sensors_run_on.add(node.info.node_id())
- node.conn.sensors.start()
+ # ceph requires additional settings
+ if 'ceph' in node_cfg:
+ node_cfg['ceph'].update(node.info.params['ceph'])
+ node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']]
+
+ logger.debug("Setting up sensort RPC plugin for node %s", nid)
+ node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
+ ctx.sensors_run_on.add(nid)
+ logger.debug("Start monitoring node %s", nid)
+ node.conn.sensors.start(node_cfg)
+ else:
+ logger.debug("Skip monitoring node %s, as no sensors selected", nid)
+
+
+def collect_sensors_data(ctx: TestRun, stop: bool = False):
+ for node in ctx.nodes:
+ node_id = node.info.node_id()
+ if node_id in ctx.sensors_run_on:
+
+ if stop:
+ func = node.conn.sensors.stop
+ else:
+ func = node.conn.sensors.get_updates
+
+ data, collected_at_b = func() # type: Dict[Tuple[bytes, bytes], List[int]], List[float]
+
+ collected_at = array.array('f')
+ collected_at.frombytes(collected_at_b)
+
+ mstore = ctx.storage.sub_storage("metric", node_id)
+ for (source_name, sensor_name), values_b in data.items():
+ values = array.array('Q')
+ values.frombytes(values_b)
+ mstore.append(values, source_name, sensor_name.decode("utf8"))
+ mstore.append(collected_at, "collected_at")
class CollectSensorsStage(Stage):
@@ -54,16 +96,7 @@
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
- for node in ctx.nodes:
- node_id = node.info.node_id()
- if node_id in ctx.sensors_run_on:
-
- data, collected_at = node.conn.sensors.stop() # type: Dict[Tuple[str, str], List[int]], List[float]
-
- mstore = ctx.storage.sub_storage("metric", node_id)
- for (source_name, sensor_name), values in data.items():
- mstore[source_name, sensor_name] = values
- mstore["collected_at"] = collected_at
+ collect_sensors_data(ctx, True)
# def delta(func, only_upd=True):
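Round-trip sketch of the new binary transport: the RPC plugin returns
array('L').tostring() payloads and collect_sensors_data rebuilds them with
array('Q').frombytes - both sides are 8 bytes per item on the platforms the
itemsize check admits (the assert below holds on 64-bit Linux):

```python
import array

assert array.array("L").itemsize == 8   # the same check both sides perform

values = array.array("L", [1, 2, 3])
payload = values.tobytes()              # the plugin calls tostring(), an older alias

restored = array.array("Q")
restored.frombytes(payload)
assert list(restored) == [1, 2, 3]
```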
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index c218bff..71a24ac 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,13 +1,18 @@
import os
+import json
import time
import array
+import logging
import threading
+import traceback
+import subprocess
-mod_name = "sensor"
+mod_name = "sensors"
__version__ = (0, 1)
+logger = logging.getLogger("agent.sensors")
SensorsMap = {}
@@ -46,8 +51,7 @@
for pid in os.listdir('/proc'):
if pid.isdigit() and pid not in disallowed:
name = get_pid_name(pid)
- if pid in allowed_prefixes or \
- any(name.startswith(val) for val in allowed_prefixes):
+ if pid in allowed_prefixes or any(name.startswith(val) for val in allowed_prefixes):
# this is allowed pid?
result.append(pid)
return result
@@ -96,7 +100,7 @@
@provides("block-io")
-def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+def io_stat(config, disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
results = {}
for line in open('/proc/diskstats'):
vals = line.split()
@@ -136,7 +140,7 @@
@provides("net-io")
-def net_stat(disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
+def net_stat(config, disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
results = {}
for line in open('/proc/net/dev').readlines()[2:]:
@@ -171,7 +175,7 @@
@provides("perprocess-cpu")
-def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+def pscpu_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
results = {}
# TODO(koder): fixed list of PID's nust be given
for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
@@ -223,7 +227,7 @@
@provides("perprocess-ram")
-def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+def psram_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
results = {}
# TODO(koder): fixed list of PID's nust be given
for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
@@ -265,7 +269,7 @@
@provides("system-cpu")
-def syscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+def syscpu_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
results = {}
# calculate core count
@@ -292,7 +296,7 @@
# dec on current proc
procs_queue = (float(ready_procs) - 1) / core_count
- results["cpu.procs_queue"] = procs_queue
+ results["cpu.procs_queue_x10"] = int(procs_queue * 10)
return results
@@ -312,7 +316,7 @@
@provides("system-ram")
-def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+def sysram_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
if allowed_prefixes is None:
allowed_prefixes = ram_fields
@@ -338,55 +342,89 @@
return results
+@provides("ceph")
+def ceph_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+
+ def get_val(dct, path):
+ if '/' in path:
+ root, next = path.split('/', 1)
+ return get_val(dct[root], next)
+ return dct[path]
+
+ for osd_id in config['osds']:
+ asok = '/var/run/ceph/{}-osd.{}.asok'.format(config['cluster'], osd_id)
+ out = subprocess.check_output('ceph daemon {} perf dump'.format(asok), shell=True)
+ data = json.loads(out)
+ for key_name in config['counters']:
+ results["osd{}.{}".format(osd_id, key_name.replace("/", "."))] = get_val(data, key_name)
+
+ return results
+
+
class SensorsData(object):
def __init__(self):
self.cond = threading.Condition()
self.collected_at = array.array("f")
self.stop = False
- self.data = {} # map sensor_name to list of results
- self.data_fd = None
+ self.data = {} # {str: array[data]}
+ self.data_fd = None # temporary file to store results
+ self.exception = None
+
+
+def collect(sensors_config):
+ curr = {}
+ for name, config in sensors_config.items():
+ params = {'config': config}
+
+ if "allow" in config:
+ params["allowed_prefixes"] = config["allow"]
+
+ if "disallow" in config:
+ params["disallowed_prefixes"] = config["disallow"]
+
+ curr[name] = SensorsMap[name](**params)
+ return curr
# TODO(koder): a lot code here can be optimized and cached, but nobody cares (c)
def sensors_bg_thread(sensors_config, sdata):
- next_collect_at = time.time()
+ try:
+ next_collect_at = time.time()
- while not sdata.stop:
- dtime = next_collect_at - time.time()
- if dtime > 0:
- sdata.cond.wait(dtime)
+ # TODO: handle exceptions here
+ while not sdata.stop:
+ dtime = next_collect_at - time.time()
+ if dtime > 0:
+ with sdata.cond:
+ sdata.cond.wait(dtime)
- if sdata.stop:
- break
+ next_collect_at += 1.0
- ctm = time.time()
- curr = {}
- for name, config in sensors_config.items():
- params = {}
+ if sdata.stop:
+ break
- if "allow" in config:
- params["allowed_prefixes"] = config["allow"]
+ ctm = time.time()
+ new_data = collect(sensors_config)
+ etm = time.time()
- if "disallow" in config:
- params["disallowed_prefixes"] = config["disallow"]
+ if etm - ctm > 0.1:
+                # TODO(koder): need to signal that something is not really ok with sensor collecting
+ pass
- curr[name] = SensorsMap[name](**params)
-
- etm = time.time()
-
- if etm - ctm > 0.1:
- # TODO(koder): need to signal that something in not really ok with sensor collecting
- pass
-
- with sdata.cond:
- sdata.collected_at.append(ctm)
- for source_name, vals in curr.items():
- for sensor_name, val in vals.items():
- key = (source_name, sensor_name)
- if key not in sdata.data:
- sdata.data[key] = array.array("I", [val])
- else:
- sdata.data[key].append(val)
+ # TODO: compact the data periodically to avoid long operations on the next updates request
+ with sdata.cond:
+ sdata.collected_at.append(ctm)
+ for source_name, vals in new_data.items():
+ for sensor_name, val in vals.items():
+ key = (source_name, sensor_name)
+ if key not in sdata.data:
+ sdata.data[key] = array.array('L', [val])
+ else:
+ sdata.data[key].append(val)
+ except Exception:
+ logger.exception("In sensor BG thread")
+ sdata.exception = traceback.format_exc()
sensors_thread = None
@@ -397,6 +435,11 @@
global sensors_thread
global sdata
+ if array.array('L').itemsize != 8:
+ message = "Python array.array('L') items should be 8 bytes in size, not {}." + \
+ " Can't provide sensors on this platform. Disable sensors in config and retry"
+ raise ValueError(message.format(array.array('L').itemsize))
+
if sensors_thread is not None:
raise ValueError("Thread already running")
@@ -411,12 +454,15 @@
raise ValueError("No sensor thread running")
with sdata.cond:
+ if sdata.exception:
+ raise Exception(sdata.exception)
res = sdata.data
collected_at = sdata.collected_at
- sdata.collected_at = array.array("f")
- sdata.data = {name: array.array("I") for name in sdata.data}
+ sdata.collected_at = array.array(sdata.collected_at.typecode)
+ sdata.data = {name: array.array(val.typecode) for name, val in sdata.data.items()}
- return res, collected_at
+ bres = {key: data.tostring() for key, data in res.items()}
+ return bres, collected_at.tostring()
def rpc_stop():
@@ -431,10 +477,15 @@
sdata.cond.notify_all()
sensors_thread.join()
+
+ if sdata.exception:
+ raise Exception(sdata.exception)
+
res = sdata.data
collected_at = sdata.collected_at
sensors_thread = None
sdata = None
- return res, collected_at
+ bres = {key: data.tostring() for key, data in res.items()}
+ return bres, collected_at.tostring()
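
A minimal, self-contained sketch (not part of the patch) of the two new data paths above, with made-up counter values: how ceph_stat()'s get_val() resolves an 'osd/op_r'-style counter path against the JSON that `ceph daemon ... perf dump` prints, and how a client can decode what rpc_get_updates() returns, since every sensor series now travels as a raw array.array('L') buffer and timestamps as array.array('f'):

```python
import array
import json

# 1) path lookup over the perf-dump JSON (counter values are illustrative)
perf_dump = json.loads('{"osd": {"op_r": 1234, "op_w": 567}}')

def get_val(dct, path):
    if '/' in path:
        root, rest = path.split('/', 1)
        return get_val(dct[root], rest)
    return dct[path]

assert get_val(perf_dump, "osd/op_r") == 1234

# 2) client-side decoding of a packed sensor series;
#    tostring() on the server side is the legacy alias of tobytes()
packed = array.array('L', [10, 20, 30]).tobytes()
series = array.array('L')
series.frombytes(packed)
assert list(series) == [10, 20, 30]
```
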
diff --git a/wally/ssh.py b/wally/ssh.py
index f1786f8..a577ffb 100644
--- a/wally/ssh.py
+++ b/wally/ssh.py
@@ -4,8 +4,8 @@
import logging
import os.path
import selectors
-from io import BytesIO
-from typing import cast, Dict, List, Set
+from io import StringIO
+from typing import cast, Dict, List, Set, Optional
import paramiko
@@ -14,10 +14,16 @@
logger = logging.getLogger("wally")
NODE_KEYS = {} # type: Dict[IPAddr, paramiko.RSAKey]
+SSH_KEY_PASSWD = None # type: Optional[str]
+
+
+def set_ssh_key_passwd(passwd: str) -> None:
+ global SSH_KEY_PASSWD
+ SSH_KEY_PASSWD = passwd
def set_key_for_node(host_port: IPAddr, key: bytes) -> None:
- with BytesIO(key) as sio:
+ with StringIO(key.decode("utf8")) as sio:
NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
@@ -33,6 +39,8 @@
end_time = time.time() + conn_timeout # type: float
+ logger.debug("SSH connecting to %s", creds)
+
while True:
try:
time_left = end_time - time.time()
@@ -55,16 +63,16 @@
ssh.connect(creds.addr.host,
username=creds.user,
timeout=c_tcp_timeout,
- key_filename=cast(str, creds.key_file),
+ pkey=paramiko.RSAKey.from_private_key_file(creds.key_file, password=SSH_KEY_PASSWD),
look_for_keys=False,
port=creds.addr.port,
**banner_timeout_arg)
elif creds.key is not None:
- with BytesIO(creds.key) as sio:
+ with StringIO(creds.key.decode("utf8")) as sio:
ssh.connect(creds.addr.host,
username=creds.user,
timeout=c_tcp_timeout,
- pkey=paramiko.RSAKey.from_private_key(sio),
+ pkey=paramiko.RSAKey.from_private_key(sio, password=SSH_KEY_PASSWD),
look_for_keys=False,
port=creds.addr.port,
**banner_timeout_arg)
@@ -86,9 +94,9 @@
port=creds.addr.port,
**banner_timeout_arg)
return ssh
- except paramiko.PasswordRequiredException:
+ except (socket.gaierror, paramiko.PasswordRequiredException):
raise
- except (socket.error, paramiko.SSHException):
+ except socket.error:
if time.time() > end_time:
raise
time.sleep(1)
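
Note the key-loading change above: creds.key_file is a path, so it goes through from_private_key_file(), while in-memory keys go through from_private_key() with a file-like object; both now receive SSH_KEY_PASSWD. A minimal sketch of the paramiko behavior this relies on (the key path is illustrative):

```python
import paramiko

KEY_PATH = "/home/user/.ssh/test_key.pem"  # illustrative path

# an encrypted private key needs its passphrase at load time;
# without one paramiko raises PasswordRequiredException
try:
    key = paramiko.RSAKey.from_private_key_file(KEY_PATH)
except paramiko.PasswordRequiredException:
    # this is the value set_ssh_key_passwd() stores in SSH_KEY_PASSWD
    key = paramiko.RSAKey.from_private_key_file(KEY_PATH, password="secret")
```
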
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 24fc178..efb67b3 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,6 +1,7 @@
import re
+import yaml
import getpass
-from typing import List
+from typing import List, Dict, Any
from .common_types import IPAddr
@@ -41,8 +42,10 @@
uri_reg_exprs.append(templ.format(**re_dct))
-class ConnCreds:
- def __init__(self, host: str, user: str, passwd: str = None, port: int = 22,
+class ConnCreds(yaml.YAMLObject):
+ yaml_tag = '!ConnCreds'
+
+ def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
key_file: str = None, key: bytes = None) -> None:
self.user = user
self.passwd = passwd
@@ -56,6 +59,22 @@
def __repr__(self) -> str:
return str(self)
+ @classmethod
+ def to_yaml(cls, dumper: Any, data: 'ConnCreds') -> Any:
+ dict_representation = {
+ 'user': data.user,
+ 'host': data.addr.host,
+ 'port': data.addr.port,
+ 'passwd': data.passwd,
+ 'key_file': data.key_file
+ }
+ return dumper.represent_mapping(data.yaml_tag, dict_representation)
+
+ @classmethod
+ def from_yaml(cls, loader: Any, node: Any) -> 'ConnCreds':
+ dct = loader.construct_mapping(node)
+ return cls(**dct)
+
def parse_ssh_uri(uri: str) -> ConnCreds:
"""Parse ssh connection URL from one of following form
@@ -69,9 +88,9 @@
for rr in URIsNamespace.uri_reg_exprs:
rrm = re.match(rr, uri)
if rrm is not None:
- params = {"user": getpass.getuser()}
+ params = {"user": getpass.getuser()} # type: Dict[str, str]
params.update(rrm.groupdict())
- return ConnCreds(**params)
+ return ConnCreds(**params) # type: ignore
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
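
With yaml.YAMLObject as the base class, ConnCreds can round-trip through PyYAML directly (the storage layer's Dumper picks this up as long as it subclasses yaml.Dumper). A minimal usage sketch, with illustrative host and key path:

```python
import yaml

creds = ConnCreds("192.168.1.10", "root", port="22", key_file="/tmp/test_key")
text = yaml.dump(creds)     # to_yaml() emits a '!ConnCreds' tagged mapping
restored = yaml.load(text)  # default Loader of this PyYAML era knows the tag
assert restored.user == "root"
```
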
diff --git a/wally/storage.py b/wally/storage.py
index 540da88..fc9b174 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -118,6 +118,8 @@
else:
create_on_fail = False
+ os.makedirs(os.path.dirname(jpath), exist_ok=True)
+
try:
fd = open(jpath, mode)
except IOError:
@@ -135,10 +137,10 @@
"""Serialize data to yaml"""
def pack(self, value: Any) -> bytes:
if type(value) not in basic_types:
- for name, val in value.__dict__.items():
- if type(val) not in basic_types:
- raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
- " basic types accepted as attributes").format(value, name, val, type(val)))
+ # for name, val in value.__dict__.items():
+ # if type(val) not in basic_types:
+ # raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
+ # " basic types accepted as attributes").format(value, name, val, type(val)))
value = value.__dict__
return yaml.dump(value, Dumper=Dumper, encoding="utf8")
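
With the attribute check commented out, pack() serializes any non-basic object by dumping its __dict__ as-is; a nested non-basic attribute now typically fails later with a PyYAML RepresenterError instead of this early ValueError. A sketch of the new behavior (class name and values are illustrative, and the module's custom Dumper is omitted):

```python
import yaml

class OSCreds:  # illustrative value object with only basic-type attributes
    def __init__(self, auth_url, user):
        self.auth_url = auth_url
        self.user = user

value = OSCreds("http://keystone:5000/v2.0", "admin")
blob = yaml.dump(value.__dict__, encoding="utf8")
# roughly: b'auth_url: http://keystone:5000/v2.0\nuser: admin\n'
```
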
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index dfbb49f..82ba4e8 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,20 +1,13 @@
config: Config - full configuration
all_nodes: List[NodeInfo] - all nodes
cli: List[str] - cli options
+spawned_nodes_ids: List[int] - ids of openstack VMs spawned for the test
-fuel:
- version: List[int] - FUEL master node version
- os_creds: OSCreds - openstack creds, discovered from fuel (or None)
- nodes: List[NodeInfo] - FUEL cluster nodes
+fuel_version: List[int] - FUEL master node version
+fuel_os_creds: OSCreds - openstack creds, discovered from fuel (or None)
openstack_openrc: OSCreds - openrc used for openstack cluster
-openstack_nodes: List[NodeInfo] - list of openstack nodes
-reused_os_nodes: List[NodeInfo] - list of openstack VM, reused in test
-spawned_os_nodes: List[NodeInfo] - list of openstack VM, spawned for test
-ceph_nodes: List[NodeInfo] - list of ceph nodes
-explicit_nodes: List[NodeInfo] - list of explicit nodes
-
info/comment : str - run comment
info/run_uuid : str - run uuid
info/run_time : float - run unix time
@@ -28,4 +21,6 @@
metric/{node_name}/{dev}/{metric_name} : List[float] - node metrics data. E.g.:
metric/node-22/cpu/load
metric/node-22/sda/read_io
- metric/node-22/eth0/data_recv
\ No newline at end of file
+ metric/node-22/eth0/data_recv
+
+rpc_logs/{node_id} - str, rpc server log from node
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index ef69b05..8636596 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -167,7 +167,7 @@
try:
futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
results = [fut.result() for fut in futures]
- except (EnvironmentError, agent.RPCError) as exc:
+ except EnvironmentError as exc:
if self.max_retry - 1 == idx:
raise StopTestError("Fio failed") from exc
logger.exception("During fio run")
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index a731b5a..40ce395 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,4 +1,4 @@
-from typing import List, Callable, Any, Dict, Optional, Set
+from typing import List, Callable, Any, Dict, Optional, Set, Union
from concurrent.futures import ThreadPoolExecutor
@@ -8,13 +8,14 @@
from .storage import Storage
from .config import Config
from .fuel_rest_api import Connection
+from .ssh_utils import ConnCreds
class TestRun:
"""Test run information"""
def __init__(self, config: Config, storage: Storage) -> None:
# NodesInfo list
- self.nodes_info = [] # type: List[NodeInfo]
+ self.nodes_info = {} # type: Dict[str, NodeInfo]
# Nodes list
self.nodes = [] # type: List[IRPCNode]
@@ -40,3 +41,13 @@
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
+ def merge_node(self, creds: ConnCreds, roles: Set[str]) -> NodeInfo:
+ info = NodeInfo(creds, roles)
+ nid = info.node_id()
+
+ if nid in self.nodes_info:
+ self.nodes_info[nid].roles.update(info.roles)
+ return self.nodes_info[nid]
+ else:
+ self.nodes_info[nid] = info
+ return info
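
merge_node() lets several discovery stages report the same node without creating duplicates: the first call creates the NodeInfo, later calls only extend its role set. A minimal sketch, assuming ctx is a TestRun and node_id() is derived from the connection address:

```python
creds = parse_ssh_uri("root@192.168.1.10")  # illustrative node

ceph_node = ctx.merge_node(creds, {"ceph-osd"})
same_node = ctx.merge_node(creds, {"testnode"})  # rediscovered elsewhere

assert ceph_node is same_node
assert same_node.roles == {"ceph-osd", "testnode"}
```
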