Add stage base class, refactor discovery, etc
diff --git a/wally/ceph.py b/wally/ceph.py
new file mode 100644
index 0000000..e23343e
--- /dev/null
+++ b/wally/ceph.py
@@ -0,0 +1,112 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+from typing import Set, Dict, cast
+
+
+from .node_interfaces import NodeInfo, IRPCNode
+from .ssh_utils import ConnCreds
+from .common_types import IP
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .ssh_utils import parse_ssh_uri
+from .node import connect, setup_rpc
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+    """Return the set of OSD public IPs reported by 'ceph osd dump', executed on `node`.
+
+    conf and key are the ceph config and keyring paths passed to the ceph CLI.
+    OSDs without a 'public_addr' field are skipped; only the first such OSD is logged.
+    """
+
+    data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
+    jdata = json.loads(data)
+    ips = set()  # type: Set[IP]
+    first_error = True
+    for osd_data in jdata["osds"]:
+        if "public_addr" not in osd_data:
+            if first_error:  # warn only once, all subsequent errors are omitted
+                osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
+                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s " +
+                               "(all subsequent errors omitted)", osd_id)
+                first_error = False
+        else:
+            ip_port = osd_data["public_addr"]
+            if '/' in ip_port:  # keep only the part before the first '/'
+                ip_port = ip_port.split("/", 1)[0]
+            ips.add(IP(ip_port.split(":")[0]))
+    return ips
+
+
+def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+    """Return the set of monitor IPs reported by 'ceph mon_status', executed on `node`.
+
+    conf and key are the ceph config and keyring paths passed to the ceph CLI.
+    Monitors without an 'addr' field are skipped; only the first such monitor is logged.
+    """
+
+    data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
+    jdata = json.loads(data)
+    ips = set()  # type: Set[IP]
+    first_error = True
+    for mon_data in jdata["monmap"]["mons"]:
+        if "addr" not in mon_data:
+            if first_error:  # warn only once, all subsequent errors are omitted
+                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
+                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s " +
+                               "(all subsequent errors omitted)", mon_name)
+                first_error = False
+        else:
+            ip_port = mon_data["addr"]
+            if '/' in ip_port:  # keep only the part before the first '/'
+                ip_port = ip_port.split("/", 1)[0]
+            ips.add(IP(ip_port.split(":")[0]))
+
+    return ips
+
+
+class DiscoverCephStage(Stage):
+    """Discover ceph OSD and MON nodes, using the ceph CLI on the configured root node"""
+
+    config_block = 'ceph'
+    priority = StepOrder.DISCOVER
+
+    def run(self, ctx: TestRun) -> None:
+        """Add ceph nodes to ctx.nodes_info, loading them from ctx.storage if already stored"""
+
+        if 'ceph_nodes' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'ceph_nodes'))
+        else:
+            ceph = ctx.config.ceph
+            root_node_uri = cast(str, ceph.root_node)
+            cluster = ceph.get("cluster", "ceph")
+            conf = ceph.get("conf")
+            key = ceph.get("key")
+            info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+            ceph_nodes = {}  # type: Dict[IP, NodeInfo]
+
+            if conf is None:
+                conf = "/etc/ceph/{}.conf".format(cluster)
+
+            if key is None:
+                key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+
+            with setup_rpc(connect(info), ctx.rpc_code) as node:
+                # ssh key of the root node is used as credentials for all discovered nodes
+                ssh_key = node.get_file_content("~/.ssh/id_rsa")
+
+                discoverers = (("ceph-osd", "OSD", get_osds_ips),
+                               ("ceph-mon", "MON", get_mons_ips))
+
+                for role, title, get_ips in discoverers:
+                    # failure of one discovery type is logged and doesn't stop the other one
+                    try:
+                        for ip in get_ips(node, conf, key):
+                            if ip in ceph_nodes:
+                                ceph_nodes[ip].roles.add(role)
+                            else:
+                                creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
+                                ceph_nodes[ip] = NodeInfo(creds, {role})
+                    except Exception as exc:
+                        logger.error("%s discovery failed: %s", title, exc)
+
+            ctx.nodes_info.extend(ceph_nodes.values())
+            # must be the same key, which is checked and loaded at the top of this method
+            ctx.storage['ceph_nodes'] = list(ceph_nodes.values())
diff --git a/wally/config.py b/wally/config.py
index cac4acf..e96a03e 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -10,13 +10,14 @@
storage_url = None # type: str
comment = None # type: str
keep_vm = None # type: bool
- no_tests = None # type: bool
dont_discover_nodes = None # type: bool
build_id = None # type: str
build_description = None # type: str
build_type = None # type: str
default_test_local_folder = None # type: str
settings_dir = None # type: str
+ connect_timeout = 30 # type: int
+ no_tests = False # type: bool
def __init__(self, dct: ConfigBlock) -> None:
self.__dict__['_dct'] = dct
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wally/discover/__init__.py
+++ /dev/null
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
deleted file mode 100644
index 4a72bfb..0000000
--- a/wally/discover/ceph.py
+++ /dev/null
@@ -1,91 +0,0 @@
-""" Collect data about ceph nodes"""
-import json
-import logging
-from typing import List, Set, Dict
-
-
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..common_types import IP
-
-logger = logging.getLogger("wally.discover")
-
-
-def discover_ceph_nodes(node: IRPCNode,
- cluster: str = "ceph",
- conf: str = None,
- key: str = None) -> List[NodeInfo]:
- """Return list of ceph's nodes NodeInfo"""
-
- if conf is None:
- conf = "/etc/ceph/{}.conf".format(cluster)
-
- if key is None:
- key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
-
- try:
- osd_ips = get_osds_ips(node, conf, key)
- except Exception as exc:
- logger.error("OSD discovery failed: %s", exc)
- osd_ips = set()
-
- try:
- mon_ips = get_mons_ips(node, conf, key)
- except Exception as exc:
- logger.error("MON discovery failed: %s", exc)
- mon_ips = set()
-
- ips = {} # type: Dict[str, List[str]]
- for ip in osd_ips:
- ips.setdefault(ip, []).append("ceph-osd")
-
- for ip in mon_ips:
- ips.setdefault(ip, []).append("ceph-mon")
-
- ssh_key = node.get_file_content("~/.ssh/id_rsa")
- return [NodeInfo(ConnCreds(host=ip, user="root", key=ssh_key), set(roles)) for ip, roles in ips.items()]
-
-
-def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
- """Get set of osd's ip"""
-
- data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
- jdata = json.loads(data)
- ips = set() # type: Set[IP]
- first_error = True
- for osd_data in jdata["osds"]:
- if "public_addr" not in osd_data:
- if first_error:
- osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
- logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
- "(all subsequent errors omitted)", osd_id)
- first_error = False
- else:
- ip_port = osd_data["public_addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
- return ips
-
-
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
- """Return mon ip set"""
-
- data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
- jdata = json.loads(data)
- ips = set() # type: Set[IP]
- first_error = True
- for mon_data in jdata["monmap"]["mons"]:
- if "addr" not in mon_data:
- if first_error:
- mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
- logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
- "(all subsequent errors omitted)", mon_name)
- first_error = False
- else:
- ip_port = mon_data["addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
-
- return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
deleted file mode 100644
index 35ddc62..0000000
--- a/wally/discover/discover.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import os.path
-import logging
-from typing import Dict, NamedTuple, List, Optional, cast
-
-from paramiko.ssh_exception import AuthenticationException
-
-from . import ceph
-from . import fuel
-from . import openstack
-from ..utils import parse_creds, StopTestError
-from ..config import ConfigBlock
-from ..start_vms import OSCreds
-from ..node_interfaces import NodeInfo
-from ..node import connect, setup_rpc
-from ..ssh_utils import parse_ssh_uri
-from ..test_run_class import TestRun
-
-
-logger = logging.getLogger("wally.discover")
-
-
-openrc_templ = """#!/bin/sh
-export LC_ALL=C
-export OS_NO_CACHE='true'
-export OS_TENANT_NAME='{tenant}'
-export OS_USERNAME='{name}'
-export OS_PASSWORD='{passwd}'
-export OS_AUTH_URL='{auth_url}'
-export OS_INSECURE={insecure}
-export OS_AUTH_STRATEGY='keystone'
-export OS_REGION_NAME='RegionOne'
-export CINDER_ENDPOINT_TYPE='publicURL'
-export GLANCE_ENDPOINT_TYPE='publicURL'
-export KEYSTONE_ENDPOINT_TYPE='publicURL'
-export NOVA_ENDPOINT_TYPE='publicURL'
-export NEUTRON_ENDPOINT_TYPE='publicURL'
-"""
-
-
-DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
-
-
-def discover(ctx: TestRun,
- discover_list: List[str],
- clusters_info: ConfigBlock,
- discover_nodes: bool = True) -> DiscoveryResult:
- """Discover nodes in clusters"""
-
- new_nodes = [] # type: List[NodeInfo]
- os_creds = None # type: Optional[OSCreds]
-
- for cluster in discover_list:
- if cluster == "openstack":
- if not discover_nodes:
- logger.warning("Skip openstack cluster discovery")
- continue
-
- cluster_info = clusters_info["openstack"] # type: ConfigBlock
- new_nodes.extend(openstack.discover_openstack_nodes(ctx.os_connection, cluster_info))
-
- elif cluster == "fuel" or cluster == "fuel_openrc_only":
- if cluster == "fuel_openrc_only":
- discover_nodes = False
-
- fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
- try:
- fuel_rpc_conn = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
- except AuthenticationException:
- raise StopTestError("Wrong fuel credentials")
- except Exception:
- logger.exception("While connection to FUEL")
- raise StopTestError("Failed to connect to FUEL")
-
- # TODO(koder): keep FUEL rpc in context? Maybe open this connection on upper stack level?
- with fuel_rpc_conn:
- nodes, fuel_info = fuel.discover_fuel_nodes(
- fuel_rpc_conn, ctx.fuel_conn, clusters_info['fuel'], discover_nodes)
- new_nodes.extend(nodes)
-
- if fuel_info.openrc:
- auth_url = cast(str, fuel_info.openrc['os_auth_url'])
- if fuel_info.version >= [8, 0] and auth_url.startswith("https://"):
- logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
- auth_url = auth_url.replace("https", "http", 1)
-
- os_creds = OSCreds(name=cast(str, fuel_info.openrc['username']),
- passwd=cast(str, fuel_info.openrc['password']),
- tenant=cast(str, fuel_info.openrc['tenant_name']),
- auth_url=cast(str, auth_url),
- insecure=cast(bool, fuel_info.openrc['insecure']))
-
- elif cluster == "ceph":
- if discover_nodes:
- cluster_info = clusters_info["ceph"]
- root_node_uri = cast(str, cluster_info["root_node"])
- cluster = clusters_info["ceph"].get("cluster", "ceph")
- conf = clusters_info["ceph"].get("conf")
- key = clusters_info["ceph"].get("key")
- info = NodeInfo(parse_ssh_uri(root_node_uri), set())
- with setup_rpc(connect(info), ctx.rpc_code) as ceph_root_conn:
- new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
- else:
- logger.warning("Skip ceph cluster discovery")
- else:
- msg_templ = "Unknown cluster type in 'discover' parameter: {!r}"
- raise ValueError(msg_templ.format(cluster))
-
- return DiscoveryResult(os_creds, new_nodes)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
deleted file mode 100644
index 7b4f90f..0000000
--- a/wally/discover/fuel.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import logging
-import socket
-from typing import Dict, Any, Tuple, List, NamedTuple, Union, cast
-from urllib.parse import urlparse
-
-from .. import fuel_rest_api
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..utils import check_input_param
-
-logger = logging.getLogger("wally.discover")
-
-
-FuelNodeInfo = NamedTuple("FuelNodeInfo",
- [("version", List[int]),
- ("fuel_ext_iface", str),
- ("openrc", Dict[str, Union[str, bool]])])
-
-
-def discover_fuel_nodes(fuel_master_node: IRPCNode,
- fuel_conn: fuel_rest_api.Connection,
- fuel_data: Dict[str, Any],
- discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
- """Discover nodes in fuel cluster, get openrc for selected cluster"""
-
- msg = "openstack_env should be provided in fuel config"
- check_input_param('openstack_env' in fuel_data, msg)
-
- # get cluster information from REST API
- cluster_id = fuel_rest_api.get_cluster_id(fuel_conn, fuel_data['openstack_env'])
- cluster = fuel_rest_api.reflect_cluster(fuel_conn, cluster_id)
- version = fuel_rest_api.FuelInfo(fuel_conn).get_version()
-
- if not discover_nodes:
- logger.warning("Skip fuel cluster discovery")
- return [], FuelNodeInfo(version, None, cluster.get_openrc()) # type: ignore
-
- logger.info("Found fuel {0}".format(".".join(map(str, version))))
-
- # get FUEL master key to connect to cluster nodes via ssh
- logger.debug("Downloading fuel master key")
- fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
-
- network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
- fuel_ip = socket.gethostbyname(fuel_conn.host)
- fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
-
- nodes = []
- for fuel_node in list(cluster.get_nodes()):
- ip = str(fuel_node.get_ip(network))
- creds = ConnCreds(ip, "root", key=fuel_key)
- nodes.append(NodeInfo(creds, roles=set(fuel_node.get_roles())))
-
- logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
-
- return nodes, FuelNodeInfo(version, fuel_ext_iface,
- cast(Dict[str, Union[str, bool]], cluster.get_openrc()))
-
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
deleted file mode 100644
index f590359..0000000
--- a/wally/discover/openstack.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import socket
-import logging
-from typing import Dict, Any, List, Optional, cast
-
-from ..node_interfaces import NodeInfo
-from ..config import ConfigBlock
-from ..ssh_utils import ConnCreds
-from ..start_vms import OSConnection, NovaClient
-
-
-logger = logging.getLogger("wally.discover")
-
-
-def get_floating_ip(vm: Any) -> str:
- """Get VM floating IP address"""
-
- for net_name, ifaces in vm.addresses.items():
- for iface in ifaces:
- if iface.get('OS-EXT-IPS:type') == "floating":
- return iface['addr']
-
- raise ValueError("VM {} has no floating ip".format(vm))
-
-
-def discover_vms(client: NovaClient, search_data: str) -> List[NodeInfo]:
- """Discover virtual machines"""
- name, user, key_file = search_data.split(",")
- servers = client.servers.list(search_opts={"name": name})
- logger.debug("Found %s openstack vms" % len(servers))
-
- nodes = [] # type: List[NodeInfo]
- for server in servers:
- ip = get_floating_ip(server)
- creds = ConnCreds(host=ip, user=user, key_file=key_file)
- nodes.append(NodeInfo(creds, roles={"test_vm"}))
-
- return nodes
-
-
-def discover_openstack_nodes(conn: OSConnection, conf: ConfigBlock) -> List[NodeInfo]:
- """Discover openstack services for given cluster"""
- os_nodes_auth = conf['auth'] # type: str
-
- if os_nodes_auth.count(":") == 2:
- user, password, key_file = os_nodes_auth.split(":") # type: str, Optional[str], Optional[str]
- if not password:
- password = None
- else:
- user, password = os_nodes_auth.split(":")
- key_file = None
-
- services = conn.nova.services.list() # type: List[Any]
- host_services_mapping = {} # type: Dict[str, List[str]]
-
- for service in services:
- ip = cast(str, socket.gethostbyname(service.host))
- host_services_mapping.get(ip, []).append(service.binary)
-
- logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
-
- nodes = [] # type: List[NodeInfo]
- for host, services in host_services_mapping.items():
- creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
- nodes.append(NodeInfo(creds, set(services)))
-
- return nodes
diff --git a/wally/fuel.py b/wally/fuel.py
new file mode 100644
index 0000000..040dcf4
--- /dev/null
+++ b/wally/fuel.py
@@ -0,0 +1,105 @@
+import logging
+from typing import Dict, List, NamedTuple, Union, cast
+
+from paramiko.ssh_exception import AuthenticationException
+
+from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
+from .node_interfaces import NodeInfo
+from .ssh_utils import ConnCreds, parse_ssh_uri
+from .utils import check_input_param, StopTestError, parse_creds
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .node import connect, setup_rpc
+from .config import ConfigBlock
+from .openstack_api import OSCreds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+FuelNodeInfo = NamedTuple("FuelNodeInfo",
+ [("version", List[int]),
+ ("fuel_ext_iface", str),
+ ("openrc", Dict[str, Union[str, bool]])])
+
+
+
+class DiscoverFuelStage(Stage):
+    """Fuel nodes discovery, also can get openstack openrc"""
+
+    priority = StepOrder.DISCOVER
+    config_block = 'fuel'
+
+    @classmethod
+    def validate(cls, cfg: ConfigBlock) -> None:
+        # TODO: check that 'openstack_env' is provided in fuel config
+        pass
+
+    def run(self, ctx: TestRun) -> None:
+        """Fill ctx.nodes_info, ctx.fuel_openstack_creds and ctx.fuel_version.
+
+        Data is loaded from ctx.storage if 'fuel' is already stored there, otherwise it is
+        requested from FUEL and saved to ctx.storage. Raises StopTestError if the connection
+        to the FUEL master node fails.
+        """
+        if 'fuel' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'fuel/nodes'))
+            ctx.fuel_openstack_creds = ctx.storage['fuel/os_creds']  # type: ignore
+            ctx.fuel_version = ctx.storage['fuel/version']  # type: ignore
+        else:
+            fuel = ctx.config.fuel
+            discover_nodes = (fuel.discover != "fuel_openrc_only")
+            fuel_node_info = NodeInfo(parse_ssh_uri(fuel.ssh_creds), {'fuel_master'})
+            fuel_nodes = [fuel_node_info]
+
+            creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
+            fuel_conn = KeystoneAuth(fuel.url, creds)
+
+            # get cluster information from REST API
+            cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
+            cluster = reflect_cluster(fuel_conn, cluster_id)
+            ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+            logger.info("Found fuel {0}".format(".".join(map(str, ctx.fuel_version))))
+            openrc = cluster.get_openrc()
+
+            if openrc:
+                auth_url = cast(str, openrc['os_auth_url'])
+                if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+                    logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+                    auth_url = auth_url.replace("https", "http", 1)
+
+                os_creds = OSCreds(name=cast(str, openrc['username']),
+                                   passwd=cast(str, openrc['password']),
+                                   tenant=cast(str, openrc['tenant_name']),
+                                   auth_url=cast(str, auth_url),
+                                   insecure=cast(bool, openrc['insecure']))
+
+                ctx.fuel_openstack_creds = os_creds
+            else:
+                ctx.fuel_openstack_creds = None
+
+            if discover_nodes:
+
+                try:
+                    fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
+                except AuthenticationException:
+                    raise StopTestError("Wrong fuel credentials")
+                except Exception:
+                    logger.exception("While connection to FUEL")
+                    raise StopTestError("Failed to connect to FUEL")
+
+                # rpc connection is needed only to fetch the key - close it right after that
+                with fuel_rpc:
+                    logger.debug("Downloading FUEL node ssh master key")
+                    fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
+
+                network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
+
+                for fuel_node in list(cluster.get_nodes()):
+                    ip = str(fuel_node.get_ip(network))
+                    fuel_nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key),
+                                               roles=set(fuel_node.get_roles())))
+
+                ctx.storage['fuel_nodes'] = fuel_nodes
+                # fuel_nodes already starts with fuel_node_info - don't append master node again
+                ctx.nodes_info.extend(fuel_nodes)
+                logger.debug("Found {} FUEL nodes for env {}".format(len(fuel_nodes) - 1, fuel.openstack_env))
+            else:
+                logger.debug("Skip FUEL nodes discovery, as 'fuel_openrc_only' is set to fuel.discover option")
+
+            ctx.storage["fuel/nodes"] = fuel_nodes
+            ctx.storage["fuel/os_creds"] = ctx.fuel_openstack_creds
+            ctx.storage["fuel/version"] = ctx.fuel_version
diff --git a/wally/main.py b/wally/main.py
index 3e1fcb3..14da140 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,7 +5,8 @@
import logging
import argparse
import functools
-from typing import List, Tuple, Any, Callable, IO, cast, Optional
+import contextlib
+from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
from yaml import load as _yaml_load
@@ -32,13 +33,33 @@
from .storage import make_storage, Storage
from .config import Config
from .logger import setup_loggers
-from .stage import log_stage, StageType
+from .stage import Stage
from .test_run_class import TestRun
+# stages
+from .ceph import DiscoverCephStage
+from .openstack import DiscoverOSStage
+from .fuel import DiscoverFuelStage
+from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage
+from .report import ConsoleReportStage, HtmlReportStage
+from .sensors import StartSensorsStage, CollectSensorsStage
+
+
logger = logging.getLogger("wally")
+@contextlib.contextmanager
+def log_stage(stage: Stage) -> Iterator[None]:
+    """Log the start of `stage`; log any exception raised inside the block and re-raise it.
+
+    The exception has to propagate - the caller detects a failed stage by catching it.
+    """
+    logger.info("Start " + stage.name())
+    try:
+        yield
+    except utils.StopTestError as exc:
+        logger.error("Exception during %s: %r", stage.name(), exc)
+        raise
+    except Exception:
+        logger.exception("During %s", stage.name())
+        raise
+
+
def list_results(path: str) -> List[Tuple[str, str, str, str]]:
results = [] # type: List[Tuple[float, str, str, str, str]]
@@ -97,6 +118,7 @@
test_parser.add_argument('--build-description', type=str, default="Build info")
test_parser.add_argument('--build-id', type=str, default="id")
test_parser.add_argument('--build-type', type=str, default="GA")
+ test_parser.add_argument('--dont-collect', action='store_true', help="Don't collect cluster info")
test_parser.add_argument('-n', '--no-tests', action='store_true', help="Don't run tests")
test_parser.add_argument('--load-report', action='store_true')
test_parser.add_argument("-k", '--keep-vm', action='store_true', help="Don't remove test vm's")
@@ -131,8 +153,7 @@
opts = parse_args(argv)
- stages = [] # type: List[StageType]
- report_stages = [] # type: List[StageType]
+ stages = [] # type: List[Stage]
# stop mypy from telling that config & storage might be undeclared
config = None # type: Config
@@ -141,7 +162,7 @@
if opts.subparser_name == 'test':
if opts.resume:
storage = make_storage(opts.resume, existing=True)
- config = storage.load('config', Config)
+ config = storage.load(Config, 'config')
else:
file_name = os.path.abspath(opts.config_file)
with open(file_name) as fd:
@@ -161,20 +182,18 @@
storage['config'] = config # type: ignore
- stages.extend([
- run_test.clouds_connect_stage,
- run_test.discover_stage,
- run_test.reuse_vms_stage,
- log_nodes_statistic_stage,
- run_test.save_nodes_stage,
- run_test.connect_stage])
- if config.get("collect_info", True):
- stages.append(run_test.collect_info_stage)
+ stages.append(DiscoverCephStage) # type: ignore
+ stages.append(DiscoverOSStage) # type: ignore
+ stages.append(DiscoverFuelStage) # type: ignore
+ stages.append(ExplicitNodesStage) # type: ignore
+ stages.append(SaveNodesStage) # type: ignore
+ stages.append(StartSensorsStage) # type: ignore
+ stages.append(RunTestsStage) # type: ignore
+ stages.append(CollectSensorsStage) # type: ignore
- stages.extend([
- run_test.run_tests_stage,
- ])
+ if not opts.dont_collect:
+ stages.append(CollectInfoStage) # type: ignore
elif opts.subparser_name == 'ls':
tab = texttable.Texttable(max_width=200)
@@ -196,9 +215,10 @@
# [x['io'][0], y['io'][0]]))
return 0
+ report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
- report_stages.append(run_test.console_report_stage)
- report_stages.append(run_test.html_report_stage)
+ report_stages.append(ConsoleReportStage) # type: ignore
+ report_stages.append(HtmlReportStage) # type: ignore
# log level is not a part of config
if opts.log_level is not None:
@@ -206,39 +226,44 @@
else:
str_level = config.get('logging/log_level', 'INFO')
- setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log'))
+ setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log', "w"))
logger.info("All info would be stored into %r", config.storage_url)
ctx = TestRun(config, storage)
+ stages.sort(key=lambda x: x.priority)
+
+ # TODO: run only stages, which have configs
+ failed = False
+ cleanup_stages = []
for stage in stages:
- ok = False
- with log_stage(stage):
- stage(ctx)
- ok = True
- if not ok:
+ try:
+ cleanup_stages.append(stage)
+ with log_stage(stage):
+ stage.run(ctx)
+ except:
+ failed = True
break
- exc, cls, tb = sys.exc_info()
- for stage in ctx.clear_calls_stack[::-1]:
- with log_stage(stage):
- stage(ctx)
+ logger.debug("Start cleanup")
+ cleanup_failed = False
+ for stage in cleanup_stages[::-1]:
+ try:
+ with log_stage(stage):
+ stage.cleanup(ctx)
+ except:
+ cleanup_failed = True
- logger.debug("Start utils.cleanup")
- for clean_func, args, kwargs in utils.iter_clean_func():
- with log_stage(clean_func):
- clean_func(*args, **kwargs)
-
- if exc is None:
+ if not failed:
for report_stage in report_stages:
with log_stage(report_stage):
- report_stage(ctx)
+ report_stage.run(ctx)
logger.info("All info is stored into %r", config.storage_url)
- if exc is None:
- logger.info("Tests finished successfully")
- return 0
- else:
+ if failed or cleanup_failed:
logger.error("Tests are failed. See error details in log above")
return 1
+ else:
+ logger.info("Tests finished successfully")
+ return 0
diff --git a/wally/node.py b/wally/node.py
index 2b58571..fae7879 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -133,22 +133,22 @@
raise NotImplementedError()
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def copy_file(self, local_path: str, remote_path: str = None) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def put_to_file(self, path: str, content: bytes) -> None:
- raise NotImplemented()
+ raise NotImplementedError()
def get_interface(self, ip: str) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def stat_file(self, path: str) -> Any:
- raise NotImplemented()
+ raise NotImplementedError()
def disconnect(self) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
diff --git a/wally/openstack.py b/wally/openstack.py
new file mode 100644
index 0000000..cff6150
--- /dev/null
+++ b/wally/openstack.py
@@ -0,0 +1,256 @@
+import os.path
+import socket
+import logging
+from typing import Dict, Any, List, Tuple, cast, Optional
+
+from .node_interfaces import NodeInfo
+from .config import ConfigBlock, Config
+from .ssh_utils import ConnCreds
+from .openstack_api import (os_connect, find_vms,
+ OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
+from .test_run_class import TestRun
+from .stage import Stage, StepOrder
+from .utils import LogError, StopTestError, get_creds_openrc
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_floating_ip(vm: Any) -> str:
+    """Return the first address of `vm` whose type is "floating"; raise ValueError if none"""
+
+    floating_addrs = (iface['addr']
+                      for ifaces in vm.addresses.values()
+                      for iface in ifaces
+                      if iface.get('OS-EXT-IPS:type') == "floating")
+
+    # generator is lazy, so the search stops on the first matching interface
+    for addr in floating_addrs:
+        return addr
+
+    raise ValueError("VM {} has no floating ip".format(vm))
+
+
+def ensure_connected_to_openstack(ctx: TestRun) -> None:
+    """Open ctx.os_connection (loading ctx.os_creds first, if needed) unless it's already set"""
+    if ctx.os_connection is None:
+        if ctx.os_creds is None:
+            ctx.os_creds = get_OS_credentials(ctx)
+        ctx.os_connection = os_connect(ctx.os_creds)
+
+
+def get_OS_credentials(ctx: TestRun) -> OSCreds:
+    """Find openstack credentials for this run and save them to ctx.storage.
+
+    Sources, in order: already stored "openstack_openrc"; the 'openstack' cloud config
+    (OPENRC file, shell ENV or explicit OS_* fields); credentials obtained from fuel.
+    Raises StopTestError if none of them provides credentials.
+    """
+    if "openstack_openrc" in ctx.storage:
+        return ctx.storage.load(OSCreds, "openstack_openrc")
+
+    cfg = ctx.config
+    explicit_creds = None
+    force_insecure = False
+
+    if 'openstack' in cfg.clouds:
+        os_cfg = cfg.clouds['openstack']
+        if 'OPENRC' in os_cfg:
+            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
+            explicit_creds = OSCreds(*get_creds_openrc(os_cfg['OPENRC']))
+        elif 'ENV' in os_cfg:
+            logger.info("Using OS credentials from shell environment")
+            explicit_creds = get_openstack_credentials()
+        elif 'OS_TENANT_NAME' in os_cfg:
+            logger.info("Using predefined credentials")
+            stripped = [os_cfg[name].strip()
+                        for name in ('OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME', 'OS_AUTH_URL')]
+            explicit_creds = OSCreds(*stripped, os_cfg.get('OS_INSECURE', False))
+        elif 'OS_INSECURE' in os_cfg:
+            # no credentials in the config block - only an override for the insecure flag
+            force_insecure = os_cfg.get('OS_INSECURE', False)
+
+    if explicit_creds is not None:
+        creds = explicit_creds
+    elif 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
+            ctx.fuel_openstack_creds is not None:
+        logger.info("Using fuel creds")
+        creds = ctx.fuel_openstack_creds
+    else:
+        logger.error("Can't found OS credentials")
+        raise StopTestError("Can't found OS credentials", None)
+
+    if force_insecure and not creds.insecure:
+        creds = OSCreds(creds.name, creds.passwd, creds.tenant, creds.auth_url, True)
+
+    creds_descr = "OS_CREDS: user={0.name} tenant={0.tenant} " \
+                  "auth_url={0.auth_url} insecure={0.insecure}"
+    logger.debug(creds_descr.format(creds))
+
+    ctx.storage["openstack_openrc"] = creds  # type: ignore
+    return creds
+
+
+def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
+    """Return (private, public) key file paths in cfg.settings_dir for the configured keypair name"""
+    key_name = cfg.vm_configs['keypair_name']
+    private_path, public_path = (os.path.join(cfg.settings_dir, key_name + suffix)
+                                 for suffix in ("_private.pem", "_public.pub"))
+    return private_path, public_path
+
+
+class DiscoverOSStage(Stage):
+    """Discover openstack nodes and VMS"""
+
+    config_block = 'openstack'
+
+    # discover FUEL cluster first
+    priority = StepOrder.DISCOVER + 1
+
+    @classmethod
+    def validate(cls, conf: ConfigBlock) -> None:
+        pass
+
+    def run(self, ctx: TestRun) -> None:
+        """Add openstack service nodes and already running test VMs to ctx.nodes_info.
+
+        Both node lists are loaded from ctx.storage if they are already stored there,
+        otherwise they are requested from openstack and saved to ctx.storage.
+        """
+        cfg = ctx.config.openstack
+        os_nodes_auth = cfg.auth  # type: str
+
+        # auth is either "user:password:key_file" or "user:password"
+        if os_nodes_auth.count(":") == 2:
+            user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
+            if not password:
+                password = None
+        else:
+            user, password = os_nodes_auth.split(":")
+            key_file = None
+
+        ensure_connected_to_openstack(ctx)
+
+        if 'openstack_nodes' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "openstack_nodes"))
+        else:
+            openstack_nodes = []  # type: List[NodeInfo]
+            services = ctx.os_connection.nova.services.list()  # type: List[Any]
+            host_services_mapping = {}  # type: Dict[str, List[str]]
+
+            for service in services:
+                ip = cast(str, socket.gethostbyname(service.host))
+                # setdefault stores the new list in the mapping, get(ip, []) would drop it
+                host_services_mapping.setdefault(ip, []).append(service.binary)
+
+            logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
+
+            for host, host_services in host_services_mapping.items():
+                creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+                openstack_nodes.append(NodeInfo(creds, set(host_services)))
+
+            ctx.nodes_info.extend(openstack_nodes)
+            ctx.storage['openstack_nodes'] = openstack_nodes  # type: ignore
+
+        if "reused_os_nodes" in ctx.storage:
+            # load from the same key, which is checked above and written below
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "reused_os_nodes"))
+        else:
+            reused_nodes = []  # type: List[NodeInfo]
+            private_key_path = get_vm_keypair_path(ctx.config)[0]
+
+            # each 'vms' item has "user_name@vm_name_pattern" form
+            for vm_creds in cfg.get("vms", []):  # type: str
+                user_name, vm_name_pattern = vm_creds.split("@", 1)
+                msg = "Vm like {} lookup failed".format(vm_name_pattern)
+
+                with LogError(msg):
+                    msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+                    logger.debug(msg)
+
+                    ensure_connected_to_openstack(ctx)
+
+                    for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+                        creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+                        node_info = NodeInfo(creds, {'testnode'})
+                        node_info.os_vm_id = vm_id
+                        reused_nodes.append(node_info)
+
+            ctx.nodes_info.extend(reused_nodes)
+            ctx.storage["reused_os_nodes"] = reused_nodes  # type: ignore
+
+
+class CreateOSVMSStage(Stage):
+    """Spawn new VM's in Openstack cluster"""
+
+    priority = StepOrder.SPAWN  # type: int
+    config_block = 'spawn_os_vms'  # type: str
+
+    def run(self, ctx: TestRun) -> None:
+        """Add test VMs to ctx.nodes_info - stored ones, if any, otherwise freshly spawned"""
+        vm_spawn_config = ctx.config.spawn_os_vms
+        vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
+
+        if 'spawned_os_nodes' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "spawned_os_nodes"))
+        else:
+            ensure_connected_to_openstack(ctx)
+            params = vm_image_config.copy()
+            params.update(vm_spawn_config)
+
+            # get_vm_keypair_path returns a plain (private, public) tuple of paths,
+            # which can't be passed to dict.update - store the paths by explicit keys
+            private_path, public_path = get_vm_keypair_path(ctx.config)
+            params['keypair_file_private'] = private_path
+            # NOTE(review): 'keypair_file_public' key name is assumed - confirm against prepare_os
+            params['keypair_file_public'] = public_path
+            params['group_name'] = ctx.config.run_uuid
+            params['keypair_name'] = ctx.config.vm_configs['keypair_name']
+
+            if not ctx.config.openstack.get("skip_preparation", False):
+                logger.info("Preparing openstack")
+                prepare_os(ctx.os_connection, params)
+
+            new_nodes = []
+            ctx.os_spawned_nodes_ids = []
+            with ctx.get_pool() as pool:
+                for node_info in launch_vms(ctx.os_connection, params, pool):
+                    node_info.roles.add('testnode')
+                    ctx.os_spawned_nodes_ids.append(node_info.os_vm_id)
+                    new_nodes.append(node_info)
+
+            # same as in the branch above - spawned nodes have to get into the common nodes list
+            ctx.nodes_info.extend(new_nodes)
+            ctx.storage['spawned_os_nodes'] = new_nodes  # type: ignore
+
+    def cleanup(self, ctx: TestRun) -> None:
+        """Remove spawned VMs and their storage record, unless config.keep_vm is set"""
+        # keep nodes in case of error for future test restart
+        if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
+            logger.info("Removing nodes")
+
+            clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
+            del ctx.storage['spawned_os_nodes']
+
+            logger.info("Nodes has been removed")
+
+
+
+# @contextlib.contextmanager
+# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
+#
+# pausable_nodes_ids = [cast(int, node.info.os_vm_id)
+# for node in unused_nodes
+# if node.info.os_vm_id is not None]
+#
+# non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
+#
+# if non_pausable:
+# logger.warning("Can't pause {} nodes".format(non_pausable))
+#
+# if pausable_nodes_ids:
+# logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+# with ctx.get_pool() as pool:
+# openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
+#
+# try:
+# yield pausable_nodes_ids
+# finally:
+# if pausable_nodes_ids:
+# logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
+# with ctx.get_pool() as pool:
+# openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+# def clouds_connect_stage(ctx: TestRun) -> None:
+ # TODO(koder): need to use this to connect to openstack in upper code
+ # conn = ctx.config['clouds/openstack']
+ # user, passwd, tenant = parse_creds(conn['creds'])
+ # auth_data = dict(auth_url=conn['auth_url'],
+ # username=user,
+ # api_key=passwd,
+ # project_id=tenant) # type: Dict[str, str]
+ # logger.debug("Discovering openstack nodes with connection details: %r", conn)
+ # connect to openstack, fuel
+
+ # # parse FUEL REST credentials
+ # username, tenant_name, password = parse_creds(fuel_data['creds'])
+ # creds = {"username": username,
+ # "tenant_name": tenant_name,
+ # "password": password}
+ #
+ # # connect to FUEL
+ # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
+ # pass
\ No newline at end of file
diff --git a/wally/start_vms.py b/wally/openstack_api.py
similarity index 97%
rename from wally/start_vms.py
rename to wally/openstack_api.py
index a55fdbf..2e9ab63 100644
--- a/wally/start_vms.py
+++ b/wally/openstack_api.py
@@ -7,7 +7,7 @@
import tempfile
import subprocess
import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
from concurrent.futures import ThreadPoolExecutor
from keystoneauth1 import loading, session
@@ -19,6 +19,7 @@
from .utils import Timeout
from .node_interfaces import NodeInfo
from .storage import IStorable
+from .ssh_utils import ConnCreds
__doc__ = """
@@ -81,7 +82,6 @@
def find_vms(conn: OSConnection, name_prefix: str) -> Iterable[Tuple[str, int]]:
for srv in conn.nova.servers.list():
if srv.name.startswith(name_prefix):
-
# need to exit after found server first external IP
# so have to rollout two cycles to avoid using exceptions
all_ip = [] # type: List[Any]
@@ -451,11 +451,10 @@
# precache all errors before start creating vms
private_key_path = params['keypair_file_private']
- creds = params['image']['creds']
+ user = params['image']['user']
for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
- conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
- info = NodeInfo(conn_uri, set())
+ info = NodeInfo(ConnCreds(ip, user, key_file=private_key_path), set())
info.os_vm_id = os_node.id
yield info
@@ -574,19 +573,18 @@
scheduler_hints=scheduler_hints, security_groups=security_groups)
if not wait_for_server_active(conn, srv):
- msg = "Server {0} fails to start. Kill it and try again"
- logger.debug(msg.format(srv))
+ logger.debug("Server {} fails to start. Kill it and try again".format(srv))
conn.nova.servers.delete(srv)
try:
- for _ in Timeout(delete_timeout, "Server {0} delete timeout".format(srv.id)):
+ for _ in Timeout(delete_timeout, "Server {} delete timeout".format(srv.id)):
srv = conn.nova.servers.get(srv.id)
except NotFound:
pass
else:
break
else:
- raise RuntimeError("Failed to start server".format(srv.id))
+ raise RuntimeError("Failed to start server {}".format(srv.id))
if vol_sz is not None:
vol = create_volume(conn, vol_sz, name)
@@ -598,6 +596,7 @@
if flt_ip is not None:
srv.add_floating_ip(flt_ip)
+ # pylint: disable=E1101
return flt_ip.ip, conn.nova.servers.get(srv.id)
diff --git a/wally/ops_log.py b/wally/ops_log.py
new file mode 100644
index 0000000..48f2b61
--- /dev/null
+++ b/wally/ops_log.py
@@ -0,0 +1,9 @@
+from typing import Any
+
+
+log = []
+
+
+def log_op(name: str, *params: Any) -> None:
+    """Record one operation as ``[name, *params]`` at the end of the module-level ``log`` list."""
+    record = [name]
+    record.extend(params)
+    log.append(record)
+
diff --git a/wally/report.py b/wally/report.py
index 88c97b7..ecf3ba7 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -20,6 +20,8 @@
from .utils import ssize2b
from .statistic import round_3_digit
from .storage import Storage
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
from .result_classes import TestInfo, FullTestResult, SensorInfo
from .suits.io.fio_task_parser import (get_test_sync_mode,
get_test_summary,
@@ -33,29 +35,46 @@
def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
+    """Yield one FullTestResult for every run found in the "result" sub-storage.
+
+    Sensor series stored as "metric/<node>/<device>/<sensor>" are loaded first
+    into the local ``sensors_data`` mapping; then, for each run, its TestInfo and
+    all "<run_id>/measurement/<node>/<name>" entries are loaded.
+    """
     sensors_data = {} # type: Dict[Tuple[str, str, str], SensorInfo]
-    for _, node_id in storage.list("metric"):
-        for _, dev_name in storage.list("metric", node_id):
-            for _, sensor_name in storage.list("metric", node_id, dev_name):
+    mstorage = storage.sub_storage("metric")
+    for _, node_id in mstorage.list():
+        for _, dev_name in mstorage.list(node_id):
+            for _, sensor_name in mstorage.list(node_id, dev_name):
                 key = (node_id, dev_name, sensor_name)
                 si = SensorInfo(*key)
-                si.begin_time, si.end_time, si.data = storage["metric/{}/{}/{}".format(*key)] # type: ignore
+                # the key is relative to the "metric" sub-storage, so it must be read
+                # through mstorage; the root storage has no "<node>/<dev>/<sensor>" entry
+                si.begin_time, si.end_time, si.data = mstorage[node_id, dev_name, sensor_name]  # type: ignore
                 sensors_data[key] = si
-    for _, run_id in storage.list("result"):
-        path = "result/" + run_id
+    rstorage = storage.sub_storage("result")
+    for _, run_id in rstorage.list():
         ftr = FullTestResult()
-        ftr.info = storage.load(TestInfo, path, "info")
+        ftr.test_info = rstorage.load(TestInfo, run_id, "info")
         ftr.performance_data = {}
-        p1 = "result/{}/measurement".format(run_id)
-        for _, node_id in storage.list(p1):
-            for _, measurement_name in storage.list(p1, node_id):
+        p1 = "{}/measurement".format(run_id)
+        for _, node_id in rstorage.list(p1):
+            for _, measurement_name in rstorage.list(p1, node_id):
                 perf_key = (node_id, measurement_name)
-                ftr.performance_data[perf_key] = storage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
+                ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
         yield ftr
+class ConsoleReportStage(Stage):
+ """Console report stage, registered with priority StepOrder.REPORT; run() is still a placeholder."""
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # Unconditionally raises NotImplementedError until loading from storage is written.
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
+class HtmlReportStage(Stage):
+ """HTML report stage, registered with priority StepOrder.REPORT; run() is still a placeholder."""
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # Unconditionally raises NotImplementedError until loading from storage is written.
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
# class StoragePerfInfo:
# def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
# self.direct_iops_r_max = 0 # type: int
diff --git a/wally/run_test.py b/wally/run_test.py
index 9ae2c9e..1a645b6 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,21 +1,18 @@
-import os
import logging
-import contextlib
-from typing import List, Dict, Iterable, Iterator, Tuple, Optional, Union, cast
-from concurrent.futures import ThreadPoolExecutor, Future
+from concurrent.futures import Future
+from typing import List, Dict, Tuple, Optional, Union, cast
-from .node_interfaces import NodeInfo, IRPCNode
-from .test_run_class import TestRun
-from .discover import discover
-from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
+from . import utils, ssh_utils, hw_info
+from .config import ConfigBlock
from .node import setup_rpc, connect
-from .config import ConfigBlock, Config
-
-from .suits.mysql import MysqlTest
-from .suits.itest import TestInputConfig
+from .node_interfaces import NodeInfo, IRPCNode
+from .stage import Stage, StepOrder
from .suits.io.fio import IOPerfTest
-from .suits.postgres import PgBenchTest
+from .suits.itest import TestInputConfig
+from .suits.mysql import MysqlTest
from .suits.omgbench import OmgTest
+from .suits.postgres import PgBenchTest
+from .test_run_class import TestRun
TOOL_TYPE_MAPPER = {
@@ -29,431 +26,149 @@
logger = logging.getLogger("wally")
-def connect_all(nodes_info: List[NodeInfo], pool: ThreadPoolExecutor, conn_timeout: int = 30) -> List[IRPCNode]:
- """Connect to all nodes, log errors"""
+class ConnectStage(Stage):
+ """Connect to nodes stage"""
- logger.info("Connecting to %s nodes", len(nodes_info))
+ priority = StepOrder.CONNECT
- def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
- try:
- ssh_node = connect(node_info, conn_timeout=conn_timeout)
- # TODO(koder): need to pass all required rpc bytes to this call
- return True, setup_rpc(ssh_node, b"")
- except Exception as exc:
- logger.error("During connect to {}: {!s}".format(node, exc))
- return False, node_info
-
- failed_testnodes = [] # type: List[NodeInfo]
- failed_nodes = [] # type: List[NodeInfo]
- ready = [] # type: List[IRPCNode]
-
- for ok, node in pool.map(connect_ext, nodes_info):
- if not ok:
- node = cast(NodeInfo, node)
- if 'testnode' in node.roles:
- failed_testnodes.append(node)
- else:
- failed_nodes.append(node)
- else:
- ready.append(cast(IRPCNode, node))
-
- if failed_nodes:
- msg = "Node(s) {} would be excluded - can't connect"
- logger.warning(msg.format(",".join(map(str, failed_nodes))))
-
- if failed_testnodes:
- msg = "Can't connect to testnode(s) " + \
- ",".join(map(str, failed_testnodes))
- logger.error(msg)
- raise utils.StopTestError(msg)
-
- if not failed_nodes:
- logger.info("All nodes connected successfully")
-
- return ready
-
-
-def collect_info_stage(ctx: TestRun) -> None:
- futures = {} # type: Dict[str, Future]
-
- with ctx.get_pool() as pool:
- for node in ctx.nodes:
- hw_info_path = "hw_info/{}".format(node.info.node_id())
- if hw_info_path not in ctx.storage:
- futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
-
- sw_info_path = "sw_info/{}".format(node.info.node_id())
- if sw_info_path not in ctx.storage:
- futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
-
- for path, future in futures.items():
- ctx.storage[path] = future.result()
-
-
-@contextlib.contextmanager
-def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
-
- pausable_nodes_ids = [cast(int, node.info.os_vm_id)
- for node in unused_nodes
- if node.info.os_vm_id is not None]
-
- non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-
- if non_pausable:
- logger.warning("Can't pause {} nodes".format(non_pausable))
-
- if pausable_nodes_ids:
- logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+ def run(self, ctx: TestRun) -> None:
with ctx.get_pool() as pool:
- start_vms.pause(ctx.os_connection, pausable_nodes_ids, pool)
+ logger.info("Connecting to %s nodes", len(ctx.nodes_info))
- try:
- yield pausable_nodes_ids
- finally:
- if pausable_nodes_ids:
- logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
- with ctx.get_pool() as pool:
- start_vms.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
+                """Connect to one node over ssh and set up RPC on it.
+
+                Returns ``(True, rpc_node)`` on success and ``(False, node_info)`` on any
+                exception; the exception is logged here instead of being raised.
+                """
+                try:
+                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
+                    # TODO(koder): need to pass all required rpc bytes to this call
+                    return True, setup_rpc(ssh_node, b"")
+                except Exception as exc:
+                    # log the node this call was given: 'node' is the loop variable of the
+                    # enclosing function and is unbound or stale while the workers run
+                    logger.error("During connect to {}: {!s}".format(node_info, exc))
+                    return False, node_info
+ failed_testnodes = [] # type: List[NodeInfo]
+ failed_nodes = [] # type: List[NodeInfo]
+ ctx.nodes = []
-def run_tests(ctx: TestRun, test_block: ConfigBlock, nodes: List[IRPCNode]) -> None:
- """Run test from test block"""
+ for ok, node in pool.map(connect_ext, ctx.nodes_info):
+ if not ok:
+ node = cast(NodeInfo, node)
+ if 'testnode' in node.roles:
+ failed_testnodes.append(node)
+ else:
+ failed_nodes.append(node)
+ else:
+ ctx.nodes.append(cast(IRPCNode, node))
- test_nodes = [node for node in nodes if 'testnode' in node.info.roles]
+ if failed_nodes:
+ msg = "Node(s) {} would be excluded - can't connect"
+ logger.warning(msg.format(",".join(map(str, failed_nodes))))
- if not test_nodes:
- logger.error("No test nodes found")
- return
-
- for name, params in test_block.items():
- vm_count = params.get('node_limit', None) # type: Optional[int]
-
- # select test nodes
- if vm_count is None:
- curr_test_nodes = test_nodes
- unused_nodes = [] # type: List[IRPCNode]
- else:
- curr_test_nodes = test_nodes[:vm_count]
- unused_nodes = test_nodes[vm_count:]
-
- if not curr_test_nodes:
- logger.error("No nodes found for test, skipping it.")
- continue
-
- # results_path = generate_result_dir_name(cfg.results_storage, name, params)
- # utils.mkdirs_if_unxists(results_path)
-
- # suspend all unused virtual nodes
- if ctx.config.get('suspend_unused_vms', True):
- suspend_ctx = suspend_vm_nodes_ctx(ctx, unused_nodes)
- else:
- suspend_ctx = utils.empty_ctx()
-
- resumable_nodes_ids = [cast(int, node.info.os_vm_id)
- for node in curr_test_nodes
- if node.info.os_vm_id is not None]
-
- if resumable_nodes_ids:
- logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
-
- with ctx.get_pool() as pool:
- start_vms.unpause(ctx.os_connection, resumable_nodes_ids, pool)
-
- with suspend_ctx:
- test_cls = TOOL_TYPE_MAPPER[name]
- remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
- test_cfg = TestInputConfig(test_cls.__name__,
- params=params,
- run_uuid=ctx.config.run_uuid,
- nodes=test_nodes,
- storage=ctx.storage,
- remote_dir=remote_dir)
-
- test_cls(test_cfg).run()
-
-
-def connect_stage(ctx: TestRun) -> None:
- ctx.clear_calls_stack.append(disconnect_stage)
-
- with ctx.get_pool() as pool:
- ctx.nodes = connect_all(ctx.nodes_info, pool)
-
-
-def discover_stage(ctx: TestRun) -> None:
- """discover clusters and nodes stage"""
-
- # TODO(koder): Properly store discovery info and check if it available to skip phase
-
- discover_info = ctx.config.get('discover')
- if discover_info:
- if "discovered_nodes" in ctx.storage:
- nodes = ctx.storage.load_list("discovered_nodes", NodeInfo)
- ctx.fuel_openstack_creds = ctx.storage.load("fuel_openstack_creds", start_vms.OSCreds)
- else:
- discover_objs = [i.strip() for i in discover_info.strip().split(",")]
-
- ctx.fuel_openstack_creds, nodes = discover.discover(ctx,
- discover_objs,
- ctx.config.clouds,
- not ctx.config.dont_discover_nodes)
-
- ctx.storage["fuel_openstack_creds"] = ctx.fuel_openstack_creds # type: ignore
- ctx.storage["discovered_nodes"] = nodes # type: ignore
- ctx.nodes_info.extend(nodes)
-
- for url, roles in ctx.config.get('explicit_nodes', {}).items():
- creds = ssh_utils.parse_ssh_uri(url)
- roles = set(roles.split(","))
- ctx.nodes_info.append(NodeInfo(creds, roles))
-
-
-def save_nodes_stage(ctx: TestRun) -> None:
- """Save nodes list to file"""
- ctx.storage['nodes'] = ctx.nodes_info # type: ignore
-
-
-def ensure_connected_to_openstack(ctx: TestRun) -> None:
- if not ctx.os_connection is None:
- if ctx.os_creds is None:
- ctx.os_creds = get_OS_credentials(ctx)
- ctx.os_connection = start_vms.os_connect(ctx.os_creds)
-
-
-def reuse_vms_stage(ctx: TestRun) -> None:
- if "reused_nodes" in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list("reused_nodes", NodeInfo))
- else:
- reused_nodes = []
- vms_patterns = ctx.config.get('clouds/openstack/vms', [])
- private_key_path = get_vm_keypair_path(ctx.config)[0]
-
- for creds in vms_patterns:
- user_name, vm_name_pattern = creds.split("@", 1)
- msg = "Vm like {} lookup failed".format(vm_name_pattern)
-
- with utils.LogError(msg):
- msg = "Looking for vm with name like {0}".format(vm_name_pattern)
- logger.debug(msg)
-
- ensure_connected_to_openstack(ctx)
-
- for ip, vm_id in start_vms.find_vms(ctx.os_connection, vm_name_pattern):
- creds = ssh_utils.ConnCreds(host=ip, user=user_name, key_file=private_key_path)
- node_info = NodeInfo(creds, {'testnode'})
- node_info.os_vm_id = vm_id
- reused_nodes.append(node_info)
- ctx.nodes_info.append(node_info)
-
- ctx.storage["reused_nodes"] = reused_nodes # type: ignore
-
-
-def get_OS_credentials(ctx: TestRun) -> start_vms.OSCreds:
-
- if "openstack_openrc" in ctx.storage:
- return ctx.storage.load("openstack_openrc", start_vms.OSCreds)
-
- creds = None
- os_creds = None
- force_insecure = False
- cfg = ctx.config
-
- if 'openstack' in cfg.clouds:
- os_cfg = cfg.clouds['openstack']
- if 'OPENRC' in os_cfg:
- logger.info("Using OS credentials from " + os_cfg['OPENRC'])
- creds_tuple = utils.get_creds_openrc(os_cfg['OPENRC'])
- os_creds = start_vms.OSCreds(*creds_tuple)
- elif 'ENV' in os_cfg:
- logger.info("Using OS credentials from shell environment")
- os_creds = start_vms.get_openstack_credentials()
- elif 'OS_TENANT_NAME' in os_cfg:
- logger.info("Using predefined credentials")
- os_creds = start_vms.OSCreds(os_cfg['OS_USERNAME'].strip(),
- os_cfg['OS_PASSWORD'].strip(),
- os_cfg['OS_TENANT_NAME'].strip(),
- os_cfg['OS_AUTH_URL'].strip(),
- os_cfg.get('OS_INSECURE', False))
-
- elif 'OS_INSECURE' in os_cfg:
- force_insecure = os_cfg.get('OS_INSECURE', False)
-
- if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
- ctx.fuel_openstack_creds is not None:
- logger.info("Using fuel creds")
- creds = ctx.fuel_openstack_creds
- elif os_creds is None:
- logger.error("Can't found OS credentials")
- raise utils.StopTestError("Can't found OS credentials", None)
-
- if creds is None:
- creds = os_creds
-
- if force_insecure and not creds.insecure:
- creds = start_vms.OSCreds(creds.name,
- creds.passwd,
- creds.tenant,
- creds.auth_url,
- True)
-
- logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
- "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
-
- ctx.storage["openstack_openrc"] = creds # type: ignore
- return creds
-
-
-def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
- key_name = cfg.vm_configs['keypair_name']
- private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
- public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
- return (private_path, public_path)
-
-
-@contextlib.contextmanager
-def create_vms_ctx(ctx: TestRun, vm_config: ConfigBlock, already_has_count: int = 0) -> Iterator[List[NodeInfo]]:
-
- if 'spawned_vm_ids' in ctx.storage:
- os_nodes_ids = ctx.storage.get('spawned_vm_ids', []) # type: List[int]
- new_nodes = [] # type: List[NodeInfo]
-
- # TODO(koder): reconnect to old VM's
- raise NotImplementedError("Reconnect to old vms is not implemented")
- else:
- os_nodes_ids = []
- new_nodes = []
- no_spawn = False
- if vm_config['count'].startswith('='):
- count = int(vm_config['count'][1:])
- if count <= already_has_count:
- logger.debug("Not need new vms")
- no_spawn = True
-
- if not no_spawn:
- ensure_connected_to_openstack(ctx)
- params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
- params.update(vm_config)
- params.update(get_vm_keypair_path(ctx.config))
- params['group_name'] = ctx.config.run_uuid
- params['keypair_name'] = ctx.config.vm_configs['keypair_name']
-
- if not vm_config.get('skip_preparation', False):
- logger.info("Preparing openstack")
- start_vms.prepare_os(ctx.os_connection, params)
-
- with ctx.get_pool() as pool:
- for node_info in start_vms.launch_vms(ctx.os_connection, params, pool, already_has_count):
- node_info.roles.add('testnode')
- os_nodes_ids.append(node_info.os_vm_id)
- new_nodes.append(node_info)
-
- ctx.storage['spawned_vm_ids'] = os_nodes_ids # type: ignore
- yield new_nodes
-
- # keep nodes in case of error for future test restart
- if not ctx.config.keep_vm:
- shut_down_vms_stage(ctx, os_nodes_ids)
-
- del ctx.storage['spawned_vm_ids']
-
-
-@contextlib.contextmanager
-def sensor_monitoring(ctx: TestRun, cfg: ConfigBlock, nodes: List[IRPCNode]) -> Iterator[None]:
- yield
-
-
-def run_tests_stage(ctx: TestRun) -> None:
- for group in ctx.config.get('tests', []):
- gitems = list(group.items())
- if len(gitems) != 1:
- msg = "Items in tests section should have len == 1"
- logger.error(msg)
- raise utils.StopTestError(msg)
-
- key, config = gitems[0]
-
- if 'start_test_nodes' == key:
- if 'openstack' not in config:
- msg = "No openstack block in config - can't spawn vm's"
+ if failed_testnodes:
+ msg = "Can't connect to testnode(s) " + \
+ ",".join(map(str, failed_testnodes))
logger.error(msg)
raise utils.StopTestError(msg)
- num_test_nodes = len([node for node in ctx.nodes if 'testnode' in node.info.roles])
- vm_ctx = create_vms_ctx(ctx, config['openstack'], num_test_nodes)
- tests = config.get('tests', [])
- else:
- vm_ctx = utils.empty_ctx([])
- tests = [group]
+ if not failed_nodes:
+ logger.info("All nodes connected successfully")
- # make mypy happy
- new_nodes = [] # type: List[NodeInfo]
+ def cleanup(self, ctx: TestRun) -> None:
+ # TODO(koder): what was the next line for?
+ # ssh_utils.close_all_sessions()
- with vm_ctx as new_nodes:
- if new_nodes:
- with ctx.get_pool() as pool:
- new_rpc_nodes = connect_all(new_nodes, pool)
+ for node in ctx.nodes:
+ node.disconnect()
- test_nodes = ctx.nodes + new_rpc_nodes
- if ctx.config.get('sensors'):
- sensor_ctx = sensor_monitoring(ctx, ctx.config.get('sensors'), test_nodes)
- else:
- sensor_ctx = utils.empty_ctx([])
+class CollectInfoStage(Stage):
+    """Collect hardware and software info from the connected nodes and keep it in storage."""
+
+    priority = StepOrder.START_SENSORS - 1
+    config_block = 'collect_info'
+
+    def run(self, ctx: TestRun) -> None:
+        """Gather "hw_info/<node_id>" and "sw_info/<node_id>" for nodes not stored yet.
+
+        Does nothing when ``ctx.config.collect_info`` is false. The info calls are
+        submitted to the pool from ``ctx.get_pool()`` and their results are written
+        to ``ctx.storage`` under the same paths.
+        """
+        if not ctx.config.collect_info:
+            return
+
+        futures = {}  # type: Dict[str, Future]
+
+        with ctx.get_pool() as pool:
+            for node in ctx.nodes:
+                hw_info_path = "hw_info/{}".format(node.info.node_id())
+                if hw_info_path not in ctx.storage:
+                    # keep the bare Future: a (future, node) tuple has no .result()
+                    futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node)
+
+                sw_info_path = "sw_info/{}".format(node.info.node_id())
+                if sw_info_path not in ctx.storage:
+                    futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
+
+            for path, future in futures.items():
+                ctx.storage[path] = future.result()
+
+
+class ExplicitNodesStage(Stage):
+    """Register the nodes listed in the 'explicit_nodes' config mapping (ssh uri -> roles)."""
+
+    priority = StepOrder.DISCOVER
+    config_block = 'nodes'
+
+    def run(self, ctx: TestRun) -> None:
+        """Build a NodeInfo per configured uri, add them to ctx.nodes_info and store the list."""
+        nodes_cfg = ctx.config.get('explicit_nodes', {})
+        # roles are given as one comma-separated string per uri
+        explicit_nodes = [NodeInfo(ssh_utils.parse_ssh_uri(url), set(role_list.split(",")))
+                          for url, role_list in nodes_cfg.items()]
+
+        ctx.nodes_info.extend(explicit_nodes)
+        ctx.storage['explicit_nodes'] = explicit_nodes  # type: ignore
+
+
+class SaveNodesStage(Stage):
+ """Save nodes list to file"""
+
+ # Same priority value as ConnectStage (StepOrder.CONNECT).
+ priority = StepOrder.CONNECT
+
+ def run(self, ctx: TestRun) -> None:
+ # Stores the whole ctx.nodes_info list under the 'all_nodes' storage key.
+ ctx.storage['all_nodes'] = ctx.nodes_info # type: ignore
+
+
+class RunTestsStage(Stage):
+    """Run every test from the 'tests' config section on the nodes with the 'testnode' role."""
+
+    priority = StepOrder.TEST
+    config_block = 'tests'
+
+    def run(self, ctx: TestRun) -> None:
+        """Walk the test groups and run each named test.
+
+        Every group maps a test name (a TOOL_TYPE_MAPPER key) to its params.
+        An optional 'node_limit' param caps the number of test nodes the test
+        runs on. Nothing is run when ``ctx.config.no_tests`` is set, and the
+        stage stops as soon as no test nodes are connected.
+        """
+        for test_group in ctx.config.get('tests', []):
             if not ctx.config.no_tests:
-                for test_group in tests:
-                    with sensor_ctx:
-                        run_tests(ctx, test_group, test_nodes)
+                test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]
-            for node in new_rpc_nodes:
-                node.disconnect()
+                if not test_nodes:
+                    logger.error("No test nodes found")
+                    return
+                for name, params in test_group.items():
+                    vm_count = params.get('node_limit', None)  # type: Optional[int]
-def clouds_connect_stage(ctx: TestRun) -> None:
-    # TODO(koder): need to use this to connect to openstack in upper code
-    # conn = ctx.config['clouds/openstack']
-    # user, passwd, tenant = parse_creds(conn['creds'])
-    # auth_data = dict(auth_url=conn['auth_url'],
-    # username=user,
-    # api_key=passwd,
-    # project_id=tenant) # type: Dict[str, str]
-    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
-    # connect to openstack, fuel
+                    # select test nodes
+                    if vm_count is None:
+                        curr_test_nodes = test_nodes
+                    else:
+                        curr_test_nodes = test_nodes[:vm_count]
-    # # parse FUEL REST credentials
-    # username, tenant_name, password = parse_creds(fuel_data['creds'])
-    # creds = {"username": username,
-    # "tenant_name": tenant_name,
-    # "password": password}
-    #
-    # # connect to FUEL
-    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
-    pass
+                    if not curr_test_nodes:
+                        logger.error("No nodes found for test, skipping it.")
+                        continue
+                    test_cls = TOOL_TYPE_MAPPER[name]
+                    remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
+                    # pass the node_limit-restricted selection; passing test_nodes
+                    # would silently ignore 'node_limit'
+                    test_cfg = TestInputConfig(test_cls.__name__,
+                                               params=params,
+                                               run_uuid=ctx.config.run_uuid,
+                                               nodes=curr_test_nodes,
+                                               storage=ctx.storage,
+                                               remote_dir=remote_dir)
-def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
-    if nodes_ids:
-        logger.info("Removing nodes")
-        start_vms.clear_nodes(ctx.os_connection, nodes_ids)
-        logger.info("Nodes has been removed")
+                    test_cls(test_cfg).run()
-
-def clear_enviroment(ctx: TestRun) -> None:
-    shut_down_vms_stage(ctx, ctx.storage.get('spawned_vm_ids', []))
-    ctx.storage['spawned_vm_ids'] = [] # type: ignore
-
-
-def disconnect_stage(ctx: TestRun) -> None:
-    # TODO(koder): what next line was for?
-    # ssh_utils.close_all_sessions()
-
-    for node in ctx.nodes:
-        node.disconnect()
-
-
-def console_report_stage(ctx: TestRun) -> None:
-    # TODO(koder): load data from storage
-    raise NotImplementedError("...")
-
-def html_report_stage(ctx: TestRun) -> None:
-    # TODO(koder): load data from storage
-    raise NotImplementedError("...")
+    @classmethod
+    def validate_config(cls, cfg: ConfigBlock) -> None:
+        """No validation is performed for the 'tests' block yet."""
+        pass
diff --git a/wally/sensors.py b/wally/sensors.py
index b579f3f..c86aeb4 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,62 +1,69 @@
-from typing import List, Dict, Tuple
+from typing import List, Dict, Tuple, Any
+
from .test_run_class import TestRun
from . import sensors_rpc_plugin
-
+from .stage import Stage, StepOrder
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
SENSORS_PLUGIN_CODE = open(plugin_fname).read()
# TODO(koder): in case if node has more than one role sensor settigns might be incorrect
-def start_sensors_stage(ctx: TestRun) -> None:
- if 'sensors' not in ctx.config:
- return
- per_role_config = {}
- for name, val in ctx.config['sensors'].copy():
- if isinstance(val, str):
- val = {vl.strip(): ".*" for vl in val.split(",")}
- elif isinstance(val, list):
- val = {vl: ".*" for vl in val}
- per_role_config[name] = val
+class StartSensorsStage(Stage):
+    """Upload the sensors plugin and start sensors on every node that has a sensor config."""
+
+    priority = StepOrder.START_SENSORS
+    config_block = 'sensors'
-    if 'all' in per_role_config:
-        all_vl = per_role_config.pop('all')
-        all_roles = set(per_role_config)
+    def run(self, ctx: TestRun) -> None:
+        """Build a per-role sensor config and start sensors on the matching nodes.
+
+        The 'sensors' config maps a role name to a comma-separated string, a
+        list or a dict of sensors; strings and lists are expanded to
+        ``{sensor: ".*"}``. The special role 'all' supplies defaults that are
+        merged into every role, including roles that only appear on nodes.
+        Nodes that get a non-empty config are added to ``ctx.sensors_run_on``.
+        """
+        if 'sensors' not in ctx.config:
+            return
+
+        per_role_config = {}  # type: Dict[str, Dict[str, str]]
+        # iterate (role, value) pairs; iterating the mapping itself yields only keys
+        for name, val in ctx.config['sensors'].copy().items():
+            if isinstance(val, str):
+                val = {vl.strip(): ".*" for vl in val.split(",")}
+            elif isinstance(val, list):
+                val = {vl: ".*" for vl in val}
+            per_role_config[name] = val
+
+        if 'all' in per_role_config:
+            all_vl = per_role_config.pop('all')
+            all_roles = set(per_role_config)
+
+            for node in ctx.nodes:
+                all_roles.update(node.info.roles)
+
+            # apply the 'all' defaults to every known role, not only to the roles
+            # named in the config; role-specific settings still take precedence
+            for name in all_roles:
+                new_vals = all_vl.copy()
+                new_vals.update(per_role_config.get(name, {}))
+                per_role_config[name] = new_vals
         for node in ctx.nodes:
-            all_roles.update(node.info.roles)
+            node_cfg = {}  # type: Dict[str, str]
+            for role in node.info.roles:
+                node_cfg.update(per_role_config.get(role, {}))
-        for name, vals in list(per_role_config.items()):
-            new_vals = all_roles.copy()
-            new_vals.update(vals)
-            per_role_config[name] = new_vals
-
-    for node in ctx.nodes:
-        node_cfg = {}
-        for role in node.info.roles:
-            node_cfg.update(per_role_config.get(role, {}))
-
-        if node_cfg:
-            node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
-            ctx.sensors_run_on.add(node.info.node_id())
-            node.conn.sensors.start()
+            if node_cfg:
+                node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
+                ctx.sensors_run_on.add(node.info.node_id())
+                # NOTE(review): node_cfg is not passed to start() - confirm the plugin API
+                node.conn.sensors.start()
-def collect_sensors_stage(ctx: TestRun, stop: bool = True) -> None:
- for node in ctx.nodes:
- node_id = node.info.node_id()
- if node_id in ctx.sensors_run_on:
+class CollectSensorsStage(Stage):
+ """Stop sensors on the nodes recorded in ctx.sensors_run_on and store what they collected."""
+ priority = StepOrder.COLLECT_SENSORS
+ config_block = 'sensors'
- if stop:
+ def run(self, ctx: TestRun) -> None:
+ # Nodes whose id is not in ctx.sensors_run_on are skipped.
+ for node in ctx.nodes:
+ node_id = node.info.node_id()
+ if node_id in ctx.sensors_run_on:
+
data, collected_at = node.conn.sensors.stop() # type: Dict[Tuple[str, str], List[int]], List[float]
- else:
- data, collected_at = node.conn.sensors.get_updates()
- for (source_name, sensor_name), values in data.items():
- path = "metric/{}/{}/{}".format(node_id, source_name, sensor_name)
- ctx.storage.append(path, values)
- ctx.storage.append("metric/{}/collected_at".format(node_id), collected_at)
+ # Each series is written (not appended) under metric/<node_id>/<source_name>/<sensor_name>.
+ mstore = ctx.storage.sub_storage("metric", node_id)
+ for (source_name, sensor_name), values in data.items():
+ mstore[source_name, sensor_name] = values
+ mstore["collected_at"] = collected_at
# def delta(func, only_upd=True):
diff --git a/wally/stage.py b/wally/stage.py
index 0451d18..95542f3 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,27 +1,36 @@
-import logging
-import contextlib
-from typing import Callable, Iterator
+import abc
+from typing import Optional
-from .utils import StopTestError
from .test_run_class import TestRun
+from .config import ConfigBlock
-logger = logging.getLogger("wally")
+class StepOrder:
+ """Named priority values for Stage.priority."""
+ # Values are spaced by 10, so a stage can use an in-between value.
+ # NOTE(review): presumably stages are executed in ascending priority order - confirm in the runner.
+ DISCOVER = 0
+ SPAWN = 10
+ CONNECT = 20
+ START_SENSORS = 30
+ TEST = 40
+ COLLECT_SENSORS = 50
+ REPORT = 60
-@contextlib.contextmanager
-def log_stage(stage) -> Iterator[None]:
- msg_templ = "Exception during {0}: {1!s}"
- msg_templ_no_exc = "During {0}"
+class Stage(metaclass=abc.ABCMeta):
+ """Abstract base class of a run stage; subclasses must implement run()."""
+ # Ordering value; subclasses set it to a StepOrder constant.
+ priority = None # type: int
+ # Name of the config section associated with the stage, or None.
+ config_block = None # type: Optional[str]
- logger.info("Start " + stage.name)
+ @classmethod
+ def name(cls) -> str:
+ """Return the stage class name."""
+ return cls.__name__
- try:
- yield
- except StopTestError as exc:
- logger.error(msg_templ.format(stage.__name__, exc))
- except Exception:
- logger.exception(msg_templ_no_exc.format(stage.__name__))
+ @classmethod
+ def validate_config(cls, cfg: ConfigBlock) -> None:
+ """Hook for checking the stage's config block; the default accepts anything."""
+ pass
+ @abc.abstractmethod
+ def run(self, ctx: TestRun) -> None:
+ """Execute the stage against the test run context; must be overridden."""
+ pass
-StageType = Callable[[TestRun], None]
+ def cleanup(self, ctx: TestRun) -> None:
+ """Optional cleanup hook; the default does nothing."""
+ pass
+
diff --git a/wally/storage.py b/wally/storage.py
index 05e4259..540da88 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -4,30 +4,24 @@
import os
import abc
-from typing import Any, Iterable, TypeVar, Type, IO, Tuple, cast, List
+import array
+from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
+
+
+import yaml
+try:
+ from yaml import CLoader as Loader, CDumper as Dumper # type: ignore
+except ImportError:
+ from yaml import Loader, Dumper # type: ignore
class IStorable(metaclass=abc.ABCMeta):
"""Interface for type, which can be stored"""
- @abc.abstractmethod
- def __getstate__(self) -> Any:
- pass
- @abc.abstractmethod
- def __setstate__(self, Any):
- pass
-
-
-# all builtin types can be stored
-IStorable.register(list) # type: ignore
-IStorable.register(dict) # type: ignore
-IStorable.register(tuple) # type: ignore
-IStorable.register(set) # type: ignore
-IStorable.register(None) # type: ignore
-IStorable.register(int) # type: ignore
-IStorable.register(str) # type: ignore
-IStorable.register(bytes) # type: ignore
-IStorable.register(bool) # type: ignore
+basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
+for btype in basic_types:
+ # pylint: disable=E1101
+ IStorable.register(btype) # type: ignore
ObjClass = TypeVar('ObjClass')
@@ -54,11 +48,15 @@
pass
@abc.abstractmethod
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
pass
@abc.abstractmethod
- def get_stream(self, path: str) -> IO:
+ def get_stream(self, path: str, mode: str = "rb+") -> IO:
+ pass
+
+ @abc.abstractmethod
+ def sub_storage(self, path: str) -> 'ISimpleStorage':
pass
@@ -78,14 +76,18 @@
def __init__(self, root_path: str, existing: bool) -> None:
self.root_path = root_path
+ self.existing = existing
if existing:
if not os.path.isdir(self.root_path):
- raise ValueError("No storage found at {!r}".format(root_path))
+ raise IOError("No storage found at {!r}".format(root_path))
+
+ def j(self, path: str) -> str:
+ return os.path.join(self.root_path, path)
def __setitem__(self, path: str, value: bytes) -> None:
- path = os.path.join(self.root_path, path)
- os.makedirs(os.path.dirname(path), exist_ok=True)
- with open(path, "wb") as fd:
+ jpath = self.j(path)
+ os.makedirs(os.path.dirname(jpath), exist_ok=True)
+ with open(jpath, "wb") as fd:
fd.write(value)
def __delitem__(self, path: str) -> None:
@@ -95,32 +97,53 @@
pass
def __getitem__(self, path: str) -> bytes:
- path = os.path.join(self.root_path, path)
- with open(path, "rb") as fd:
+ with open(self.j(path), "rb") as fd:
return fd.read()
def __contains__(self, path: str) -> bool:
- path = os.path.join(self.root_path, path)
- return os.path.exists(path)
+ return os.path.exists(self.j(path))
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
- path = os.path.join(self.root_path, path)
- for entry in os.scandir(path):
+ def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
+ jpath = self.j(path)
+ for entry in os.scandir(jpath):
if not entry.name in ('..', '.'):
yield entry.is_file(), entry.name
-    def get_stream(self, path: str, mode: str = "rb") -> IO:
-        path = os.path.join(self.root_path, path)
-        return open(path, mode)
+    def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+        """Open the file stored at *path* (relative to the storage root) as a binary stream.
+
+        *mode* is handed to open() unchanged, except for the storage-specific
+        value "cb": open the file for read/write ("rb+") and create it when
+        that fails. Without "cb" the IOError from open() propagates.
+        """
+        jpath = self.j(path)
+
+        if "cb" == mode:
+            create_on_fail = True
+            mode = "rb+"
+        else:
+            create_on_fail = False
+
+        try:
+            fd = open(jpath, mode)
+        except IOError:
+            if not create_on_fail:
+                raise
+            # as in __setitem__: the parent directory may not exist yet
+            os.makedirs(os.path.dirname(jpath), exist_ok=True)
+            # "wb+" rather than "wb": the caller asked for a read/write stream
+            fd = open(jpath, "wb+")
+
+        return cast(IO[bytes], fd)
+
+ def sub_storage(self, path: str) -> 'FSStorage':
+ return self.__class__(self.j(path), self.existing)
class YAMLSerializer(ISerializer):
"""Serialize data to yaml"""
- def pack(self, value: IStorable) -> bytes:
- raise NotImplementedError()
+ def pack(self, value: Any) -> bytes:
+ if type(value) not in basic_types:
+ for name, val in value.__dict__.items():
+ if type(val) not in basic_types:
+ raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
+ " basic types accepted as attributes").format(value, name, val, type(val)))
+ value = value.__dict__
+ return yaml.dump(value, Dumper=Dumper, encoding="utf8")
def unpack(self, data: bytes) -> IStorable:
- raise NotImplementedError()
+ return yaml.load(data, Loader=Loader)
class Storage:
@@ -129,32 +152,65 @@
self.storage = storage
self.serializer = serializer
- def __setitem__(self, path: str, value: IStorable) -> None:
- self.storage[path] = self.serializer.pack(value)
+ def sub_storage(self, *path: str) -> 'Storage':
+ return self.__class__(self.storage.sub_storage("/".join(path)), self.serializer)
- def __getitem__(self, path: str) -> IStorable:
+ def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
+ self.storage[path] = self.serializer.pack(cast(IStorable, value))
+
+ def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
return self.serializer.unpack(self.storage[path])
- def __delitem__(self, path: str) -> None:
+ def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
del self.storage[path]
- def __contains__(self, path: str) -> bool:
+ def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
+ if not isinstance(path, str):
+ path = "/".join(path)
return path in self.storage
- def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+ def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
return self.storage.list("/".join(path))
- def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
+ def set_array(self, value: array.array, *path: str) -> None:
+ with self.get_stream("/".join(path), "wb") as fd:
+ value.tofile(fd) # type: ignore
+
+ def get_array(self, typecode: str, *path: str) -> array.array:
+ res = array.array(typecode)
+ path_s = "/".join(path)
+ with self.get_stream(path_s, "rb") as fd:
+ fd.seek(0, os.SEEK_END)
+ size = fd.tell()
+ fd.seek(0, os.SEEK_SET)
+ assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
+ .format(path_s, typecode)
+ res.fromfile(fd, size // res.itemsize) # type: ignore
+ return res
+
+ def append(self, value: array.array, *path: str) -> None:
+ with self.get_stream("/".join(path), "cb") as fd:
+ fd.seek(0, os.SEEK_END)
+ value.tofile(fd) # type: ignore
+
+ def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
+ "Internal function, used to construct user type from raw unpacked value"
if obj_class in (int, str, dict, list, None):
- if not isinstance(raw_val, obj_class):
- raise ValueError("Can't load path {!r} into type {}. Real type is {}"
- .format(path, obj_class, type(raw_val)))
- return cast(ObjClass, raw_val)
+ raise ValueError("Can't load into build-in value - {!r} into type {}")
if not isinstance(raw_val, dict):
raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
- if not all(isinstance(str, key) for key in raw_val.keys):
+ if not all(isinstance(key, str) for key in raw_val.keys()):
raise ValueError("Can't load path {!r} into python type.".format(path) +
"Raw not all keys in raw value is strings")
@@ -170,19 +226,25 @@
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
path_s = "/".join(path)
- return self.construct(path_s, self[path_s], obj_class)
+ return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
- def get_stream(self, *path: str) -> IO:
- return self.storage.get_stream("/".join(path))
+ def get_stream(self, path: str, mode: str = "r") -> IO:
+ return self.storage.get_stream(path, mode)
- def get(self, path: str, default: Any = None) -> Any:
+ def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
try:
return self[path]
- except KeyError:
+ except Exception:
return default
- def append(self, path: str, data: List):
- raise NotImplemented()
+ def __enter__(self) -> 'Storage':
+ return self
+
+ def __exit__(self, x: Any, y: Any, z: Any) -> None:
+ return
def make_storage(url: str, existing: bool = False) -> Storage:
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 583a4a0..5715046 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,10 +1,19 @@
config: Config - full configuration
-nodes: List[NodeInfo] - all nodes
-fuel_openstack_creds: OSCreds - openstack creds, discovered from fuel (or None)
+all_nodes: List[NodeInfo] - all nodes
+
+fuel:
+ version: List[int] - FUEL master node version
+ os_creds: OSCreds - openstack creds, discovered from fuel (or None)
+ nodes: List[NodeInfo] - FUEL cluster nodes
+
openstack_openrc: OSCreds - openrc used for openstack cluster
-discovered_nodes: List[NodeInfo] - list of discovered nodes
-reused_nodes: List[NodeInfo] - list of reused nodes from cluster
-spawned_vm_ids: List[int] - list of openstack VM id's, spawned for test
+
+openstack_nodes: List[NodeInfo] - list of openstack nodes
+reused_os_nodes: List[NodeInfo] - list of openstack VMs reused in the test
+spawned_os_nodes: List[NodeInfo] - list of openstack VMs spawned for the test
+ceph_nodes: List[NodeInfo] - list of ceph nodes
+explicit_nodes: List[NodeInfo] - list of explicit nodes
+
info/comment : str - run comment
info/run_uuid : str - run uuid
info/run_time : float - run unix time
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 0f4ebde..1b5f38e 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -83,7 +83,7 @@
node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
node.conn.mkdir(self.config.remote_dir)
except Exception as exc:
- msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
+ msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)
logger.exception(msg)
raise StopTestError(msg) from exc
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 6d1eeee..ef69b05 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -74,7 +74,7 @@
pass
@abc.abstractmethod
- def format_for_console(cls, data: Any) -> str:
+ def format_for_console(self, data: Any) -> str:
pass
@@ -122,9 +122,9 @@
}
assert info == expected_config, \
- "Test info at path {} is not equal to expected config." + \
- "Maybe configuration was changed before test was restarted. " + \
- "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+ ("Test info at path {} is not equal to expected config." +
+ "Maybe configuration was changed before test was restarted. " +
+ "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)
logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
else:
@@ -181,10 +181,10 @@
start_times = [] # type: List[int]
stop_times = [] # type: List[int]
+ mstorage = storage.sub_storage("result", str(run_id), "measurement")
for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
for metrics_name, data in result.items():
- path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
- storage[path] = data # type: ignore
+ mstorage[node.info.node_id(), metrics_name] = data # type: ignore
start_times.append(t_start)
stop_times.append(t_stop)
@@ -214,7 +214,7 @@
'end_time': max_stop_time
}
- storage["result/{}/info".format(run_id)] = test_config # type: ignore
+ storage["result", str(run_id), "info"] = test_config # type: ignore
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index db41a46..30c46e7 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -4,7 +4,7 @@
from .timeseries import SensorDatastore
from .node_interfaces import NodeInfo, IRPCNode
-from .start_vms import OSCreds, OSConnection
+from .openstack_api import OSCreds, OSConnection
from .storage import Storage
from .config import Config
from .fuel_rest_api import Connection
@@ -24,6 +24,7 @@
# openstack credentials
self.fuel_openstack_creds = None # type: Optional[OSCreds]
+ self.fuel_version = None # type: Optional[List[int]]
self.os_creds = None # type: Optional[OSCreds]
self.os_connection = None # type: Optional[OSConnection]
self.fuel_conn = None # type: Optional[Connection]
@@ -33,6 +34,7 @@
self.config = config
self.sensors_data = SensorDatastore()
self.sensors_run_on = set() # type: Set[str]
+ self.os_spawned_nodes_ids = None # type: List[int]
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
diff --git a/wally/utils.py b/wally/utils.py
index b27b1ba..45b67b4 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -260,7 +260,7 @@
raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname: str) -> IO:
+def open_for_append_or_create(fname: str) -> IO[str]:
if not os.path.exists(fname):
return open(fname, "w")
@@ -289,18 +289,6 @@
return data
-CLEANING = [] # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
-
-
-def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
- CLEANING.append((func, list(args), kwargs))
-
-
-def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
- while CLEANING:
- yield CLEANING.pop()
-
-
def flatten(data: Iterable[Any]) -> List[Any]:
res = []
for i in data: