2.0 refactoring:
* Add type for most of functions
* Remove old fio run code, move to RPC/pluggable
* Remove most of sensors code, will move then to RPC
* Other refactoring
diff --git a/wally/run_test.py b/wally/run_test.py
index f99f445..4390f5e 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,7 +5,7 @@
import functools
import contextlib
import collections
-
+from typing import List, Dict, Optional, Iterable, Any, Generator, Mapping, Callable
from yaml import load as _yaml_load
try:
@@ -14,19 +14,21 @@
except ImportError:
yaml_load = _yaml_load
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
-from wally.hw_info import get_hw_info
-from wally.config import get_test_files
-from wally.discover import discover, Node
-from wally import pretty_yaml, utils, report, ssh_utils, start_vms
-from wally.sensors_utils import with_sensors_util, sensors_info_util
+from .config import Config
+from .config import get_test_files
+from .discover import discover, Node
+from .inode import INode
+from .test_run_class import TestRun
-from wally.suits.mysql import MysqlTest
-from wally.suits.itest import TestConfig
-from wally.suits.io.fio import IOPerfTest
-from wally.suits.postgres import PgBenchTest
-from wally.suits.omgbench import OmgTest
+from . import pretty_yaml, utils, report, ssh_utils, start_vms
+
+from .suits.mysql import MysqlTest
+from .suits.itest import TestConfig
+from .suits.io.fio import IOPerfTest
+from .suits.postgres import PgBenchTest
+from .suits.omgbench import OmgTest
TOOL_TYPE_MAPPER = {
@@ -40,9 +42,8 @@
logger = logging.getLogger("wally")
-def connect_all(nodes, spawned_node=False):
- """
- Connect to all nodes, log errors
+def connect_all(nodes: Iterable[INode], spawned_node: Optional[bool]=False) -> None:
+ """Connect to all nodes, log errors
nodes:[Node] - list of nodes
spawned_node:bool - whenever nodes is newly spawned VM
"""
@@ -51,93 +52,87 @@
conn_timeout = 240 if spawned_node else 30
- def connect_ext(conn_url):
+ def connect_ext(node: INode) -> bool:
try:
- return ssh_utils.connect(conn_url, conn_timeout=conn_timeout)
+ node.connect_ssh(conn_timeout)
+ node.connect_rpc(conn_timeout)
+ return True
except Exception as exc:
- logger.error("During connect to {0}: {1!s}".format(conn_url, exc))
- return None
-
- urls = []
- ssh_pref = "ssh://"
-
- for node in nodes:
- if node.conn_url == 'local':
- urls.append(node.conn_url)
- elif node.conn_url.startswith(ssh_pref):
- urls.append(node.conn_url[len(ssh_pref):])
- else:
- msg = "Unknown url type {0}".format(node.conn_url)
- logger.error(msg)
- raise utils.StopTestError(msg)
+ logger.error("During connect to {}: {!s}".format(node, exc))
+ return False
with ThreadPoolExecutor(32) as pool:
- for node, conn in zip(nodes, pool.map(connect_ext, urls)):
- node.connection = conn
+ list(pool.map(connect_ext, nodes))
failed_testnodes = []
failed_nodes = []
for node in nodes:
- if node.connection is None:
+ if not node.is_connected():
if 'testnode' in node.roles:
- failed_testnodes.append(node.get_conn_id())
+ failed_testnodes.append(node)
else:
- failed_nodes.append(node.get_conn_id())
+ failed_nodes.append(node)
- if failed_nodes != []:
- msg = "Node(s) {0} would be excluded - can't connect"
- logger.warning(msg.format(",".join(failed_nodes)))
+ if failed_nodes:
+ msg = "Node(s) {} would be excluded - can't connect"
+ logger.warning(msg.format(",".join(map(str, failed_nodes))))
- if failed_testnodes != []:
- msg = "Can't connect to testnode(s) " + ",".join(failed_testnodes)
+ if failed_testnodes:
+ msg = "Can't connect to testnode(s) " + \
+ ",".join(map(str, failed_testnodes))
logger.error(msg)
raise utils.StopTestError(msg)
- if len(failed_nodes) == 0:
+ if not failed_nodes:
logger.info("All nodes connected successfully")
-def collect_hw_info_stage(cfg, ctx):
- if os.path.exists(cfg['hwreport_fname']):
+def collect_hw_info_stage(cfg: Config, nodes: Iterable[INode]) -> None:
+ # TODO(koder): rewrite this function, to use other storage type
+ if os.path.exists(cfg.hwreport_fname):
msg = "{0} already exists. Skip hw info"
logger.info(msg.format(cfg['hwreport_fname']))
return
with ThreadPoolExecutor(32) as pool:
- connections = (node.connection for node in ctx.nodes)
- ctx.hw_info.extend(pool.map(get_hw_info, connections))
+ fitures = pool.submit(node.discover_hardware_info
+ for node in nodes)
+ wait(fitures)
- with open(cfg['hwreport_fname'], 'w') as hwfd:
- for node, info in zip(ctx.nodes, ctx.hw_info):
+ with open(cfg.hwreport_fname, 'w') as hwfd:
+ for node in nodes:
hwfd.write("-" * 60 + "\n")
hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
- hwfd.write(str(info) + "\n")
+ hwfd.write(str(node.hwinfo) + "\n")
hwfd.write("-" * 60 + "\n\n")
- if info.hostname is not None:
+ if node.hwinfo.hostname is not None:
fname = os.path.join(
cfg.hwinfo_directory,
- info.hostname + "_lshw.xml")
+ node.hwinfo.hostname + "_lshw.xml")
with open(fname, "w") as fd:
- fd.write(info.raw)
- logger.info("Hardware report stored in " + cfg['hwreport_fname'])
- logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
+ fd.write(node.hwinfo.raw)
+
+ logger.info("Hardware report stored in " + cfg.hwreport_fname)
+ logger.debug("Raw hardware info in " + cfg.hwinfo_directory + " folder")
@contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes):
+def suspend_vm_nodes_ctx(unused_nodes: Iterable[INode]) -> Generator[List[int]]:
+
pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
if node.os_vm_id is not None]
+
non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
if 0 != non_pausable:
- logger.warning("Can't pause {0} nodes".format(
+ logger.warning("Can't pause {} nodes".format(
non_pausable))
if len(pausable_nodes_ids) != 0:
- logger.debug("Try to pause {0} unused nodes".format(
+ logger.debug("Try to pause {} unused nodes".format(
len(pausable_nodes_ids)))
start_vms.pause(pausable_nodes_ids)
@@ -145,20 +140,20 @@
yield pausable_nodes_ids
finally:
if len(pausable_nodes_ids) != 0:
- logger.debug("Unpausing {0} nodes".format(
+ logger.debug("Unpausing {} nodes".format(
len(pausable_nodes_ids)))
start_vms.unpause(pausable_nodes_ids)
-def generate_result_dir_name(results, name, params):
+def generate_result_dir_name(results: str, name: str, params: Dict[str, Any]) -> str:
# make a directory for results
all_tests_dirs = os.listdir(results)
if 'name' in params:
- dir_name = "{0}_{1}".format(name, params['name'])
+ dir_name = "{}_{}".format(name, params['name'])
else:
for idx in range(len(all_tests_dirs) + 1):
- dir_name = "{0}_{1}".format(name, idx)
+ dir_name = "{}_{}".format(name, idx)
if dir_name not in all_tests_dirs:
break
else:
@@ -167,7 +162,13 @@
return os.path.join(results, dir_name)
-def run_tests(cfg, test_block, nodes):
+@contextlib.contextmanager
+def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Generator[None]:
+ # TODO(koder): write this function
+ pass
+
+
+def run_tests(cfg: Config, test_block, nodes: Iterable[INode]) -> None:
"""
Run test from test block
"""
@@ -183,11 +184,11 @@
# iterate over all node counts
limit = params.get('node_limit', len(test_nodes))
- if isinstance(limit, (int, long)):
+ if isinstance(limit, int):
vm_limits = [limit]
else:
list_or_tpl = isinstance(limit, (tuple, list))
- all_ints = list_or_tpl and all(isinstance(climit, (int, long))
+ all_ints = list_or_tpl and all(isinstance(climit, int)
for climit in limit)
if not all_ints:
msg = "'node_limit' parameter ion config should" + \
@@ -221,50 +222,39 @@
if node.os_vm_id is not None]
if len(resumable_nodes_ids) != 0:
- logger.debug("Check and unpause {0} nodes".format(
+ logger.debug("Check and unpause {} nodes".format(
len(resumable_nodes_ids)))
start_vms.unpause(resumable_nodes_ids)
- sens_nodes = curr_test_nodes + not_test_nodes
- with sensors_info_util(cfg, sens_nodes) as sensor_data:
- test_cls = TOOL_TYPE_MAPPER[name]
+ test_cls = TOOL_TYPE_MAPPER[name]
- remote_dir = cfg.default_test_local_folder.format(name=name)
+ remote_dir = cfg.default_test_local_folder.format(name=name)
- test_cfg = TestConfig(test_cls.__name__,
- params=params,
- test_uuid=cfg.run_uuid,
- nodes=test_nodes,
- log_directory=results_path,
- remote_dir=remote_dir)
+ test_cfg = TestConfig(test_cls.__name__,
+ params=params,
+ test_uuid=cfg.run_uuid,
+ nodes=test_nodes,
+ log_directory=results_path,
+ remote_dir=remote_dir)
- t_start = time.time()
- res = test_cls(test_cfg).run()
- t_end = time.time()
-
- # save sensor data
- if sensor_data is not None:
- fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
- fpath = os.path.join(cfg.sensor_storage, fname)
-
- with open(fpath, "w") as fd:
- fd.write("\n\n".join(sensor_data))
+ t_start = time.time()
+ res = test_cls(test_cfg).run()
+ t_end = time.time()
results.append(res)
yield name, results
-def connect_stage(cfg, ctx):
+def connect_stage(cfg: Config, ctx: TestRun) -> None:
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
-def discover_stage(cfg, ctx):
- """
- discover clusters and nodes stage
- """
+def discover_stage(cfg: Config, ctx: TestRun) -> None:
+ """discover clusters and nodes stage"""
+
if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg.discover.strip().split(",")]
@@ -280,7 +270,8 @@
ctx.nodes.append(Node(url, roles.split(",")))
-def save_nodes_stage(cfg, ctx):
+def save_nodes_stage(cfg: Config, ctx: TestRun) -> None:
+ """Save nodes list to file"""
cluster = {}
for node in ctx.nodes:
roles = node.roles[:]
@@ -294,15 +285,15 @@
fd.write(pretty_yaml.dumps(cluster))
-def reuse_vms_stage(cfg, ctx):
+def reuse_vms_stage(cfg: Config, ctx: TestRun) -> None:
vms_patterns = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
private_key_path = get_vm_keypair(cfg)['keypair_file_private']
for creds in vms_patterns:
user_name, vm_name_pattern = creds.split("@", 1)
- msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+ msg = "Vm like {} lookup failed".format(vm_name_pattern)
- with utils.log_error(msg):
+ with utils.LogError(msg):
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
@@ -321,7 +312,7 @@
ctx.nodes.append(node)
-def get_OS_credentials(cfg, ctx):
+def get_OS_credentials(cfg: Config, ctx: TestRun) -> None:
creds = None
os_creds = None
force_insecure = False
@@ -371,7 +362,7 @@
return creds
-def get_vm_keypair(cfg):
+def get_vm_keypair(cfg: Config) -> Dict[str, str]:
res = {}
for field, ext in (('keypair_file_private', 'pem'),
('keypair_file_public', 'pub')):
@@ -388,7 +379,7 @@
@contextlib.contextmanager
-def create_vms_ctx(ctx, cfg, config, already_has_count=0):
+def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Generator[List[INode]]:
if config['count'].startswith('='):
count = int(config['count'][1:])
if count <= already_has_count:
@@ -436,7 +427,7 @@
ctx.nodes = old_nodes
-def run_tests_stage(cfg, ctx):
+def run_tests_stage(cfg: Config, ctx: TestRun) -> None:
ctx.results = collections.defaultdict(lambda: [])
for group in cfg.get('tests', []):
@@ -469,7 +460,7 @@
if cfg.get('sensors') is None:
sensor_ctx = utils.empty_ctx()
else:
- sensor_ctx = with_sensors_util(cfg.get('sensors'), ctx.nodes)
+ sensor_ctx = sensor_monitoring(cfg.get('sensors'), ctx.nodes)
with vm_ctx as new_nodes:
if len(new_nodes) != 0:
@@ -482,7 +473,7 @@
ctx.results[tp].extend(res)
-def shut_down_vms_stage(cfg, ctx):
+def shut_down_vms_stage(cfg: Config, ctx: TestRun) -> None:
vm_ids_fname = cfg.vm_ids_fname
if ctx.openstack_nodes_ids is None:
nodes_ids = open(vm_ids_fname).read().split()
@@ -498,17 +489,17 @@
os.remove(vm_ids_fname)
-def store_nodes_in_log(cfg, nodes_ids):
+def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]):
with open(cfg.vm_ids_fname, 'w') as fd:
fd.write("\n".join(nodes_ids))
-def clear_enviroment(cfg, ctx):
+def clear_enviroment(cfg: Config, ctx: TestRun) -> None:
if os.path.exists(cfg.vm_ids_fname):
shut_down_vms_stage(cfg, ctx)
-def disconnect_stage(cfg, ctx):
+def disconnect_stage(cfg: Config, ctx: TestRun) -> None:
ssh_utils.close_all_sessions()
for node in ctx.nodes:
@@ -516,7 +507,7 @@
node.connection.close()
-def store_raw_results_stage(cfg, ctx):
+def store_raw_results_stage(cfg: Config, ctx: TestRun) -> None:
if os.path.exists(cfg.raw_results):
cont = yaml_load(open(cfg.raw_results).read())
else:
@@ -529,7 +520,7 @@
fd.write(raw_data)
-def console_report_stage(cfg, ctx):
+def console_report_stage(cfg: Config, ctx: TestRun) -> None:
first_report = True
text_rep_fname = cfg.text_report_file
with open(text_rep_fname, "w") as fd:
@@ -558,7 +549,7 @@
print("\n" + rep + "\n")
-def test_load_report_stage(cfg, ctx):
+def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
load_rep_fname = cfg.load_report_file
found = False
for idx, (tp, data) in enumerate(ctx.results.items()):
@@ -572,7 +563,7 @@
report.make_load_report(idx, cfg['results'], load_rep_fname)
-def html_report_stage(cfg, ctx):
+def html_report_stage(cfg: Config, ctx: TestRun) -> None:
html_rep_fname = cfg.html_report_file
found = False
for tp, data in ctx.results.items():
@@ -589,10 +580,10 @@
lab_info=ctx.hw_info)
-def load_data_from_path(test_res_dir):
+def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
files = get_test_files(test_res_dir)
raw_res = yaml_load(open(files['raw_results']).read())
- res = collections.defaultdict(lambda: [])
+ res = collections.defaultdict(list)
for tp, test_lists in raw_res:
for tests in test_lists:
@@ -603,10 +594,10 @@
return res
-def load_data_from_path_stage(var_dir, _, ctx):
+def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
for tp, vals in load_data_from_path(var_dir).items():
ctx.results.setdefault(tp, []).extend(vals)
-def load_data_from(var_dir):
+def load_data_from(var_dir: str) -> Callable:
return functools.partial(load_data_from_path_stage, var_dir)