refactoring and typing in progress
diff --git a/wally/run_test.py b/wally/run_test.py
index b96ecf8..e62cfb1 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -4,13 +4,14 @@
import functools
import contextlib
import collections
-from typing import List, Dict, Iterable, Any, Iterator, Mapping, Callable, Tuple, Optional
+from typing import List, Dict, Iterable, Any, Iterator, Mapping, Callable, Tuple, Optional, Union, cast
from concurrent.futures import ThreadPoolExecutor, Future
-from .inode import INode
-from .discover import discover
+from .node_interfaces import NodeInfo, IRPCNode
from .test_run_class import TestRun
+from .discover import discover
from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
+from .config import ConfigBlock, Config
from .suits.mysql import MysqlTest
from .suits.itest import TestConfig
@@ -30,37 +31,35 @@
logger = logging.getLogger("wally")
-
-def connect_all(nodes: Iterable[INode],
+def connect_all(nodes_info: List[NodeInfo],
pool: ThreadPoolExecutor,
conn_timeout: int = 30,
- rpc_conn_callback: ssh_utils.RPCBeforeConnCallback = None) -> None:
- """Connect to all nodes, log errors
- nodes - list of nodes
- """
+ rpc_conn_callback: ssh_utils.RPCBeforeConnCallback = None) -> List[IRPCNode]:
+ """Connect to all nodes, log errors"""
- logger.info("Connecting to %s nodes", len(nodes))
+ logger.info("Connecting to %s nodes", len(nodes_info))
- def connect_ext(node: INode) -> bool:
+ def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
try:
- node.connect_ssh(conn_timeout)
- node.rpc, node.rpc_params = ssh_utils.setup_rpc(node, rpc_conn_callback=rpc_conn_callback)
- return True
+ ssh_node = ssh_utils.connect(node_info.ssh_conn_url, conn_timeout=conn_timeout)
+ return True, ssh_utils.setup_rpc(ssh_node, rpc_conn_callback=rpc_conn_callback)
except Exception as exc:
logger.error("During connect to {}: {!s}".format(node, exc))
- return False
+ return False, node_info
- list(pool.map(connect_ext, nodes))
+ failed_testnodes = [] # type: List[NodeInfo]
+ failed_nodes = [] # type: List[NodeInfo]
+ ready = [] # type: List[IRPCNode]
- failed_testnodes = []
- failed_nodes = []
-
- for node in nodes:
- if not node.is_connected():
+ for ok, node in pool.map(connect_ext, nodes_info):
+ if not ok:
+ node = cast(NodeInfo, node)
if 'testnode' in node.roles:
failed_testnodes.append(node)
else:
failed_nodes.append(node)
+ else:
+ ready.append(cast(IRPCNode, node))
if failed_nodes:
msg = "Node(s) {} would be excluded - can't connect"
@@ -75,15 +74,17 @@
if not failed_nodes:
logger.info("All nodes connected successfully")
+ return ready
-def collect_info_stage(ctx: TestRun, nodes: Iterable[INode]) -> None:
- futures = {} # type: Dict[str, Future]
+
+def collect_info_stage(ctx: TestRun, nodes: Iterable[IRPCNode]) -> None:
+ futures = {} # type: Dict[str, Tuple[Future, IRPCNode]]
with ctx.get_pool() as pool:
for node in nodes:
hw_info_path = "hw_info/{}".format(node.node_id())
if hw_info_path not in ctx.storage:
- futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node)
+ futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
sw_info_path = "sw_info/{}".format(node.node_id())
if sw_info_path not in ctx.storage:
@@ -94,174 +95,118 @@
@contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes: List[INode]) -> Iterator[List[int]]:
+def suspend_vm_nodes_ctx(unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
- pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
- if node.os_vm_id is not None]
+ pausable_nodes_ids = [cast(int, node.info.os_vm_id)
+ for node in unused_nodes
+ if node.info.os_vm_id is not None]
non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
- if 0 != non_pausable:
- logger.warning("Can't pause {} nodes".format(
- non_pausable))
+ if non_pausable:
+ logger.warning("Can't pause {} nodes".format(non_pausable))
- if len(pausable_nodes_ids) != 0:
- logger.debug("Try to pause {} unused nodes".format(
- len(pausable_nodes_ids)))
+ if pausable_nodes_ids:
+ logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
start_vms.pause(pausable_nodes_ids)
try:
yield pausable_nodes_ids
finally:
- if len(pausable_nodes_ids) != 0:
- logger.debug("Unpausing {} nodes".format(
- len(pausable_nodes_ids)))
+ if pausable_nodes_ids:
+ logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
start_vms.unpause(pausable_nodes_ids)
-def generate_result_dir_name(results: str, name: str, params: Dict[str, Any]) -> str:
- # make a directory for results
- all_tests_dirs = os.listdir(results)
-
- if 'name' in params:
- dir_name = "{}_{}".format(name, params['name'])
- else:
- for idx in range(len(all_tests_dirs) + 1):
- dir_name = "{}_{}".format(name, idx)
- if dir_name not in all_tests_dirs:
- break
- else:
- raise utils.StopTestError("Can't select directory for test results")
-
- return os.path.join(results, dir_name)
-
-
-@contextlib.contextmanager
-def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Iterator[None]:
- # TODO(koder): write this function
- pass
-
-
-def run_tests(cfg: Config,
- test_block: Dict[str, Dict[str, Any]],
- nodes: Iterable[INode]) -> Iterator[Tuple[str, List[Any]]]:
+def run_tests(ctx: TestRun, test_block: ConfigBlock, nodes: List[IRPCNode]) -> None:
"""Run test from test block"""
- test_nodes = [node for node in nodes if 'testnode' in node.roles]
+ test_nodes = [node for node in nodes if 'testnode' in node.info.roles]
- if len(test_nodes) == 0:
+ if not test_nodes:
logger.error("No test nodes found")
return
for name, params in test_block.items():
- results = []
+ vm_count = params.get('node_limit', None) # type: Optional[int]
- # iterate over all node counts
- limit = params.get('node_limit', len(test_nodes))
- if isinstance(limit, int):
- vm_limits = [limit] # type: List[int]
+ # select test nodes
+ if vm_count is None:
+ curr_test_nodes = test_nodes
+ unused_nodes = []
else:
- list_or_tpl = isinstance(limit, (tuple, list))
- all_ints = list_or_tpl and all(isinstance(climit, int)
- for climit in limit)
- if not all_ints:
- msg = "'node_limit' parameter ion config should" + \
- "be either int or list if integers, not {0!r}".format(limit)
- raise ValueError(msg)
- vm_limits = limit # type: List[int]
+ curr_test_nodes = test_nodes[:vm_count]
+ unused_nodes = test_nodes[vm_count:]
- for vm_count in vm_limits:
- # select test nodes
- if vm_count == 'all':
- curr_test_nodes = test_nodes
- unused_nodes = []
- else:
- curr_test_nodes = test_nodes[:vm_count]
- unused_nodes = test_nodes[vm_count:]
+ if not curr_test_nodes:
+ logger.error("No nodes found for test, skipping it.")
+ continue
- if 0 == len(curr_test_nodes):
- continue
+ # results_path = generate_result_dir_name(cfg.results_storage, name, params)
+ # utils.mkdirs_if_unxists(results_path)
- results_path = generate_result_dir_name(cfg.results_storage, name, params)
- utils.mkdirs_if_unxists(results_path)
+ # suspend all unused virtual nodes
+ if ctx.config.get('suspend_unused_vms', True):
+ suspend_ctx = suspend_vm_nodes_ctx(unused_nodes)
+ else:
+ suspend_ctx = utils.empty_ctx()
- # suspend all unused virtual nodes
- if cfg.settings.get('suspend_unused_vms', True):
- suspend_ctx = suspend_vm_nodes_ctx(unused_nodes)
- else:
- suspend_ctx = utils.empty_ctx()
+ with suspend_ctx:
+ resumable_nodes_ids = [cast(int, node.info.os_vm_id)
+ for node in curr_test_nodes
+ if node.info.os_vm_id is not None]
- with suspend_ctx:
- resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
- if node.os_vm_id is not None]
+ if resumable_nodes_ids:
+ logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
+ start_vms.unpause(resumable_nodes_ids)
- if len(resumable_nodes_ids) != 0:
- logger.debug("Check and unpause {} nodes".format(
- len(resumable_nodes_ids)))
- start_vms.unpause(resumable_nodes_ids)
+ test_cls = TOOL_TYPE_MAPPER[name]
- test_cls = TOOL_TYPE_MAPPER[name]
+ remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
- remote_dir = cfg.default_test_local_folder.format(name=name)
+ test_cfg = TestConfig(test_cls.__name__,
+ params=params,
+ run_uuid=ctx.config.run_uuid,
+ nodes=test_nodes,
+ storage=ctx.storage,
+ remote_dir=remote_dir)
- test_cfg = TestConfig(test_cls.__name__,
- params=params,
- test_uuid=cfg.run_uuid,
- nodes=test_nodes,
- log_directory=results_path,
- remote_dir=remote_dir)
-
- t_start = time.time()
- res = test_cls(test_cfg).run()
- t_end = time.time()
-
- results.append(res)
-
- yield name, results
+ test_cls(test_cfg).run()
-def connect_stage(cfg: Config, ctx: TestRun) -> None:
+def connect_stage(ctx: TestRun) -> None:
ctx.clear_calls_stack.append(disconnect_stage)
- connect_all(ctx.nodes)
- ctx.nodes = [node for node in ctx.nodes if node.is_connected()]
+
+ with ctx.get_pool() as pool:
+ ctx.nodes = connect_all(ctx.nodes_info, pool, rpc_conn_callback=ctx.before_conn_callback)
-def discover_stage(cfg: Config, ctx: TestRun) -> None:
+def discover_stage(ctx: TestRun) -> None:
"""discover clusters and nodes stage"""
- if cfg.get('discover') is not None:
- discover_objs = [i.strip() for i in cfg.discover.strip().split(",")]
+ discover_info = ctx.config.get('discover')
+ if discover_info:
+ discover_objs = [i.strip() for i in discover_info.strip().split(",")]
- nodes = discover(ctx,
- discover_objs,
- cfg.clouds,
- cfg.results_storage,
- not cfg.dont_discover_nodes)
+ nodes_info = discover.discover(ctx, discover_objs,
+ ctx.config.clouds,
+ ctx.storage,
+ not ctx.config.dont_discover_nodes)
- ctx.nodes.extend(nodes)
+ ctx.nodes_info.extend(nodes_info)
- for url, roles in cfg.get('explicit_nodes', {}).items():
- ctx.nodes.append(Node(url, roles.split(",")))
+ for url, roles in ctx.config.get('explicit_nodes', {}).items():
+ ctx.nodes_info.append(NodeInfo(url, set(roles.split(","))))
-def save_nodes_stage(cfg: Config, ctx: TestRun) -> None:
+def save_nodes_stage(ctx: TestRun) -> None:
"""Save nodes list to file"""
- cluster = {}
- for node in ctx.nodes:
- roles = node.roles[:]
- if 'testnode' in roles:
- roles.remove('testnode')
-
- if len(roles) != 0:
- cluster[node.ssh_conn_url] = roles
-
- with open(cfg.nodes_report_file, "w") as fd:
- fd.write(pretty_yaml.dumps(cluster))
+ ctx.storage['nodes'] = ctx.nodes_info
-def reuse_vms_stage(cfg: Config, ctx: TestRun) -> None:
- vms_patterns = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
- private_key_path = get_vm_keypair(cfg)['keypair_file_private']
+def reuse_vms_stage(ctx: TestRun) -> None:
+ vms_patterns = ctx.config.get('clouds/openstack/vms', [])
+ private_key_path = get_vm_keypair(ctx.config)['keypair_file_private']
for creds in vms_patterns:
user_name, vm_name_pattern = creds.split("@", 1)
@@ -272,7 +217,7 @@
logger.debug(msg)
if not start_vms.is_connected():
- os_creds = get_OS_credentials(cfg, ctx)
+ os_creds = get_OS_credentials(ctx)
else:
os_creds = None
@@ -281,15 +226,16 @@
conn_url = "ssh://{user}@{ip}::{key}".format(user=user_name,
ip=ip,
key=private_key_path)
- node = Node(conn_url, ['testnode'])
- node.os_vm_id = vm_id
- ctx.nodes.append(node)
+ node_info = NodeInfo(conn_url, ['testnode'])
+ node_info.os_vm_id = vm_id
+ ctx.nodes_info.append(node_info)
-def get_OS_credentials(cfg: Config, ctx: TestRun) -> None:
+def get_OS_credentials(ctx: TestRun) -> None:
creds = None
os_creds = None
force_insecure = False
+ cfg = ctx.config
if 'openstack' in cfg.clouds:
os_cfg = cfg.clouds['openstack']
@@ -336,75 +282,65 @@
return creds
-def get_vm_keypair(cfg: Config) -> Dict[str, str]:
- res = {} # type: Dict[str, str]
- for field, ext in (('keypair_file_private', 'pem'),
- ('keypair_file_public', 'pub')):
- fpath = cfg.vm_configs.get(field)
-
- if fpath is None:
- fpath = cfg.vm_configs['keypair_name'] + "." + ext
-
- if os.path.isabs(fpath):
- res[field] = fpath
- else:
- res[field] = os.path.join(cfg.config_folder, fpath)
- return res
+def get_vm_keypair(cfg: Config) -> Tuple[str, str]:
+ key_name = cfg.vm_configs['keypair_name']
+ private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
+ public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
+ return (private_path, public_path)
@contextlib.contextmanager
-def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Iterator[List[INode]]:
- if config['count'].startswith('='):
- count = int(config['count'][1:])
+def create_vms_ctx(ctx: TestRun, vm_config: ConfigBlock, already_has_count: int = 0) -> Iterator[List[NodeInfo]]:
+ if vm_config['count'].startswith('='):
+ count = int(vm_config['count'][1:])
if count <= already_has_count:
logger.debug("Not need new vms")
yield []
return
- params = cfg.vm_configs[config['cfg_name']].copy()
- os_nodes_ids = []
-
if not start_vms.is_connected():
- os_creds = get_OS_credentials(cfg, ctx)
+ os_creds = get_OS_credentials(ctx)
else:
os_creds = None
nova = start_vms.nova_connect(os_creds)
- params.update(config)
- params.update(get_vm_keypair(cfg))
+ os_nodes_ids = ctx.storage.get('spawned_vm_ids', []) # type: List[int]
+ new_nodes = [] # type: List[NodeInfo]
- params['group_name'] = cfg.run_uuid
- params['keypair_name'] = cfg.vm_configs['keypair_name']
+ if not os_nodes_ids:
+ params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
+ params.update(vm_config)
+ params.update(get_vm_keypair(ctx.config))
+ params['group_name'] = ctx.config.run_uuid
+ params['keypair_name'] = ctx.config.vm_configs['keypair_name']
- if not config.get('skip_preparation', False):
- logger.info("Preparing openstack")
- start_vms.prepare_os(nova, params, os_creds)
+ if not vm_config.get('skip_preparation', False):
+ logger.info("Preparing openstack")
+ start_vms.prepare_os(nova, params, os_creds)
+ else:
+ # TODO(koder): reconnect to old VMs
+ raise NotImplementedError("Reconnect to old vms is not implemented")
- new_nodes = []
+ already_has_count += len(os_nodes_ids)
old_nodes = ctx.nodes[:]
- try:
- for new_node, node_id in start_vms.launch_vms(nova, params, already_has_count):
- new_node.roles.append('testnode')
- ctx.nodes.append(new_node)
- os_nodes_ids.append(node_id)
- new_nodes.append(new_node)
- store_nodes_in_log(cfg, os_nodes_ids)
- ctx.openstack_nodes_ids = os_nodes_ids
+ for node_info, node_id in start_vms.launch_vms(nova, params, already_has_count):
+ node_info.roles.append('testnode')
+ os_nodes_ids.append(node_id)
+ new_nodes.append(node_info)
+ ctx.storage['spawned_vm_ids'] = os_nodes_ids
- yield new_nodes
+ yield new_nodes
- finally:
- if not cfg.keep_vm:
- shut_down_vms_stage(cfg, ctx)
- ctx.nodes = old_nodes
+ # No try/finally here on purpose: if an error occurs the VMs are kept so the test can be restarted later
+ if not ctx.config.keep_vm:
+ shut_down_vms_stage(ctx, os_nodes_ids)
+ ctx.storage['spawned_vm_ids'] = []
-def run_tests_stage(cfg: Config, ctx: TestRun) -> None:
- ctx.results = collections.defaultdict(lambda: [])
-
- for group in cfg.get('tests', []):
+def run_tests_stage(ctx: TestRun) -> None:
+ for group in ctx.config.get('tests', []):
gitems = list(group.items())
if len(gitems) != 1:
msg = "Items in tests section should have len == 1"
@@ -419,159 +355,138 @@
logger.error(msg)
raise utils.StopTestError(msg)
- num_test_nodes = 0
- for node in ctx.nodes:
- if 'testnode' in node.roles:
- num_test_nodes += 1
-
- vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
- num_test_nodes)
+ num_test_nodes = len([node for node in ctx.nodes if 'testnode' in node.info.roles])
+ vm_ctx = create_vms_ctx(ctx, config['openstack'], num_test_nodes)
tests = config.get('tests', [])
else:
vm_ctx = utils.empty_ctx([])
tests = [group]
- if cfg.get('sensors') is None:
- sensor_ctx = utils.empty_ctx()
- else:
- sensor_ctx = sensor_monitoring(cfg.get('sensors'), ctx.nodes)
+ with vm_ctx as new_nodes: # type: List[NodeInfo]
+ if new_nodes:
+ with ctx.get_pool() as pool:
+ new_rpc_nodes = connect_all(new_nodes, pool, rpc_conn_callback=ctx.before_conn_callback)
- with vm_ctx as new_nodes:
- if len(new_nodes) != 0:
- connect_all(new_nodes, True)
+ test_nodes = ctx.nodes + new_rpc_nodes
- if not cfg.no_tests:
+ if ctx.config.get('sensors'):
+ sensor_ctx = sensor_monitoring(ctx.config.get('sensors'), test_nodes)
+ else:
+ sensor_ctx = utils.empty_ctx([])
+
+ if not ctx.config.no_tests:
for test_group in tests:
with sensor_ctx:
- it = run_tests(cfg, test_group, ctx.nodes)
- for tp, res in it:
- ctx.results[tp].extend(res)
+ run_tests(ctx, test_group, test_nodes)
+
+ for node in new_rpc_nodes:
+ node.disconnect()
-def shut_down_vms_stage(cfg: Config, ctx: TestRun) -> None:
- vm_ids_fname = cfg.vm_ids_fname
- if ctx.openstack_nodes_ids is None:
- nodes_ids = open(vm_ids_fname).read().split()
- else:
- nodes_ids = ctx.openstack_nodes_ids
-
- if len(nodes_ids) != 0:
+def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
+ if nodes_ids:
logger.info("Removing nodes")
start_vms.clear_nodes(nodes_ids)
logger.info("Nodes has been removed")
- if os.path.exists(vm_ids_fname):
- os.remove(vm_ids_fname)
+
+def clear_enviroment(ctx: TestRun) -> None:
+ shut_down_vms_stage(ctx, ctx.storage.get('spawned_vm_ids', []))
+ ctx.storage['spawned_vm_ids'] = []
-def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]) -> None:
- with open(cfg.vm_ids_fname, 'w') as fd:
- fd.write("\n".join(nodes_ids))
-
-
-def clear_enviroment(cfg: Config, ctx: TestRun) -> None:
- if os.path.exists(cfg.vm_ids_fname):
- shut_down_vms_stage(cfg, ctx)
-
-
-def disconnect_stage(cfg: Config, ctx: TestRun) -> None:
- ssh_utils.close_all_sessions()
+def disconnect_stage(ctx: TestRun) -> None:
+ # TODO(koder): what was the next line for?
+ # ssh_utils.close_all_sessions()
for node in ctx.nodes:
node.disconnect()
-def store_raw_results_stage(cfg: Config, ctx: TestRun) -> None:
- if os.path.exists(cfg.raw_results):
- cont = yaml_load(open(cfg.raw_results).read())
- else:
- cont = []
-
- cont.extend(utils.yamable(ctx.results).items())
- raw_data = pretty_yaml.dumps(cont)
-
- with open(cfg.raw_results, "w") as fd:
- fd.write(raw_data)
+def console_report_stage(ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+ # first_report = True
+ # text_rep_fname = ctx.config.text_report_file
+ #
+ # with open(text_rep_fname, "w") as fd:
+ # for tp, data in ctx.results.items():
+ # if 'io' == tp and data is not None:
+ # rep_lst = []
+ # for result in data:
+ # rep_lst.append(
+ # IOPerfTest.format_for_console(list(result)))
+ # rep = "\n\n".join(rep_lst)
+ # elif tp in ['mysql', 'pgbench'] and data is not None:
+ # rep = MysqlTest.format_for_console(data)
+ # elif tp == 'omg':
+ # rep = OmgTest.format_for_console(data)
+ # else:
+ # logger.warning("Can't generate text report for " + tp)
+ # continue
+ #
+ # fd.write(rep)
+ # fd.write("\n")
+ #
+ # if first_report:
+ # logger.info("Text report were stored in " + text_rep_fname)
+ # first_report = False
+ #
+ # print("\n" + rep + "\n")
-def console_report_stage(cfg: Config, ctx: TestRun) -> None:
- first_report = True
- text_rep_fname = cfg.text_report_file
- with open(text_rep_fname, "w") as fd:
- for tp, data in ctx.results.items():
- if 'io' == tp and data is not None:
- rep_lst = []
- for result in data:
- rep_lst.append(
- IOPerfTest.format_for_console(list(result)))
- rep = "\n\n".join(rep_lst)
- elif tp in ['mysql', 'pgbench'] and data is not None:
- rep = MysqlTest.format_for_console(data)
- elif tp == 'omg':
- rep = OmgTest.format_for_console(data)
- else:
- logger.warning("Can't generate text report for " + tp)
- continue
+# def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
+# load_rep_fname = cfg.load_report_file
+# found = False
+# for idx, (tp, data) in enumerate(ctx.results.items()):
+# if 'io' == tp and data is not None:
+# if found:
+# logger.error("Making reports for more than one " +
+# "io block isn't supported! All " +
+# "report, except first are skipped")
+# continue
+# found = True
+# report.make_load_report(idx, cfg['results'], load_rep_fname)
+#
+#
- fd.write(rep)
- fd.write("\n")
+def html_report_stage(ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+ # html_rep_fname = cfg.html_report_file
+ # found = False
+ # for tp, data in ctx.results.items():
+ # if 'io' == tp and data is not None:
+ # if found or len(data) > 1:
+ # logger.error("Making reports for more than one " +
+ # "io block isn't supported! All " +
+ # "report, except first are skipped")
+ # continue
+ # found = True
+ # report.make_io_report(list(data[0]),
+ # cfg.get('comment', ''),
+ # html_rep_fname,
+ # lab_info=ctx.nodes)
- if first_report:
- logger.info("Text report were stored in " + text_rep_fname)
- first_report = False
-
- print("\n" + rep + "\n")
-
-
-def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
- load_rep_fname = cfg.load_report_file
- found = False
- for idx, (tp, data) in enumerate(ctx.results.items()):
- if 'io' == tp and data is not None:
- if found:
- logger.error("Making reports for more than one " +
- "io block isn't supported! All " +
- "report, except first are skipped")
- continue
- found = True
- report.make_load_report(idx, cfg['results'], load_rep_fname)
-
-
-def html_report_stage(cfg: Config, ctx: TestRun) -> None:
- html_rep_fname = cfg.html_report_file
- found = False
- for tp, data in ctx.results.items():
- if 'io' == tp and data is not None:
- if found or len(data) > 1:
- logger.error("Making reports for more than one " +
- "io block isn't supported! All " +
- "report, except first are skipped")
- continue
- found = True
- report.make_io_report(list(data[0]),
- cfg.get('comment', ''),
- html_rep_fname,
- lab_info=ctx.nodes)
-
-
-def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
- files = get_test_files(test_res_dir)
- raw_res = yaml_load(open(files['raw_results']).read())
- res = collections.defaultdict(list)
-
- for tp, test_lists in raw_res:
- for tests in test_lists:
- for suite_name, suite_data in tests.items():
- result_folder = suite_data[0]
- res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
-
- return res
-
-
-def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
- for tp, vals in load_data_from_path(var_dir).items():
- ctx.results.setdefault(tp, []).extend(vals)
-
-
-def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
- return functools.partial(load_data_from_path_stage, var_dir)
+#
+# def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
+# files = get_test_files(test_res_dir)
+# raw_res = yaml_load(open(files['raw_results']).read())
+# res = collections.defaultdict(list)
+#
+# for tp, test_lists in raw_res:
+# for tests in test_lists:
+# for suite_name, suite_data in tests.items():
+# result_folder = suite_data[0]
+# res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
+#
+# return res
+#
+#
+# def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
+# for tp, vals in load_data_from_path(var_dir).items():
+# ctx.results.setdefault(tp, []).extend(vals)
+#
+#
+# def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
+# return functools.partial(load_data_from_path_stage, var_dir)