|  | import re | 
|  | import os | 
|  | import sys | 
|  | import math | 
|  | import time | 
|  | import uuid | 
|  | import socket | 
|  | import logging | 
|  | import datetime | 
|  | import ipaddress | 
|  | import threading | 
|  | import contextlib | 
|  | import subprocess | 
|  | from fractions import Fraction | 
|  |  | 
|  |  | 
|  | from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable | 
|  |  | 
|  | try: | 
|  | import psutil | 
|  | except ImportError: | 
|  | psutil = None | 
|  |  | 
|  | try: | 
|  | from petname import Generate as pet_generate | 
|  | except ImportError: | 
|  | def pet_generate(x: str, y: str) -> str: | 
|  | return str(uuid.uuid4()) | 
|  |  | 
|  |  | 
|  | logger = logging.getLogger("wally") | 
|  | TNumber = TypeVar('TNumber', int, float) | 
|  | Number = Union[int, float] | 
|  |  | 
|  |  | 
|  | STORAGE_ROLES = {'ceph-osd'} | 
|  |  | 
|  |  | 
|  | class StopTestError(RuntimeError): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class LogError: | 
|  | def __init__(self, message: str, exc_logger: logging.Logger = None) -> None: | 
|  | self.message = message | 
|  | self.exc_logger = exc_logger | 
|  |  | 
|  | def __enter__(self) -> 'LogError': | 
|  | return self | 
|  |  | 
|  | def __exit__(self, tp: type, value: Exception, traceback: Any) -> bool: | 
|  | if value is None or isinstance(value, StopTestError): | 
|  | return False | 
|  |  | 
|  | if self.exc_logger is None: | 
|  | exc_logger = sys._getframe(1).f_globals.get('logger', logger) | 
|  | else: | 
|  | exc_logger = self.exc_logger | 
|  |  | 
|  | exc_logger.exception(self.message, exc_info=(tp, value, traceback)) | 
|  | raise StopTestError(self.message) from value | 
|  |  | 
|  |  | 
|  | class TaskFinished(Exception): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class Timeout(Iterable[float]): | 
|  | def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None: | 
|  | self.end_time = time.time() + timeout | 
|  | self.message = message | 
|  | self.min_tick = min_tick | 
|  | self.prev_tick_at = time.time() | 
|  | self.no_exc = no_exc | 
|  |  | 
|  | def tick(self) -> bool: | 
|  | current_time = time.time() | 
|  |  | 
|  | if current_time > self.end_time: | 
|  | if self.message: | 
|  | msg = "Timeout: {}".format(self.message) | 
|  | else: | 
|  | msg = "Timeout" | 
|  |  | 
|  | if self.no_exc: | 
|  | return False | 
|  |  | 
|  | raise TimeoutError(msg) | 
|  |  | 
|  | sleep_time = self.min_tick - (current_time - self.prev_tick_at) | 
|  | if sleep_time > 0: | 
|  | time.sleep(sleep_time) | 
|  | self.prev_tick_at = time.time() | 
|  | else: | 
|  | self.prev_tick_at = current_time | 
|  |  | 
|  | return True | 
|  |  | 
|  | def __iter__(self) -> Iterator[float]: | 
|  | return cast(Iterator[float], self) | 
|  |  | 
|  | def __next__(self) -> float: | 
|  | if not self.tick(): | 
|  | raise StopIteration() | 
|  | return self.end_time - time.time() | 
|  |  | 
|  |  | 
|  | def greater_digit_pos(val: Number) -> int: | 
|  | return int(math.floor(math.log10(val))) + 1 | 
|  |  | 
|  |  | 
|  | def round_digits(val: TNumber, num_digits: int = 3) -> TNumber: | 
|  | pow = 10 ** (greater_digit_pos(val) - num_digits) | 
|  | return type(val)(int(val / pow) * pow) | 
|  |  | 
|  |  | 
|  | def is_ip(data: str) -> bool: | 
|  | try: | 
|  | ipaddress.ip_address(data) | 
|  | return True | 
|  | except ValueError: | 
|  | return False | 
|  |  | 
|  |  | 
|  | def log_block(message: str, exc_logger:logging.Logger = None) -> LogError: | 
|  | logger.debug("Starts : " + message) | 
|  | return LogError(message, exc_logger) | 
|  |  | 
|  |  | 
|  | def check_input_param(is_ok: bool, message: str) -> None: | 
|  | if not is_ok: | 
|  | logger.error(message) | 
|  | raise StopTestError(message) | 
|  |  | 
|  |  | 
|  | def parse_creds(creds: str) -> Tuple[str, str, str]: | 
|  | """Parse simple credentials format user[:passwd]@host""" | 
|  | user, passwd_host = creds.split(":", 1) | 
|  |  | 
|  | if '@' not in passwd_host: | 
|  | passwd, host = passwd_host, None | 
|  | else: | 
|  | passwd, host = passwd_host.rsplit('@', 1) | 
|  |  | 
|  | return user, passwd, host | 
|  |  | 
|  |  | 
|  | SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4) | 
|  |  | 
|  |  | 
|  | def ssize2b(ssize: Union[str, int]) -> int: | 
|  | try: | 
|  | if isinstance(ssize, int): | 
|  | return ssize | 
|  |  | 
|  | ssize = ssize.lower() | 
|  | if ssize[-1] in SMAP: | 
|  | return int(ssize[:-1]) * SMAP[ssize[-1]] | 
|  | return int(ssize) | 
|  | except (ValueError, TypeError, AttributeError): | 
|  | raise ValueError("Unknow size format {!r}".format(ssize)) | 
|  |  | 
|  |  | 
|  | RSMAP = [('K', 1024), | 
|  | ('M', 1024 ** 2), | 
|  | ('G', 1024 ** 3), | 
|  | ('T', 1024 ** 4)] | 
|  |  | 
|  |  | 
|  | def b2ssize(value: Union[int, float]) -> str: | 
|  | if isinstance(value, float) and value < 100: | 
|  | return b2ssize_10(value) | 
|  |  | 
|  | value = int(value) | 
|  | if value < 1024: | 
|  | return str(value) + " " | 
|  |  | 
|  | # make mypy happy | 
|  | scale = 1 | 
|  | name = "" | 
|  |  | 
|  | for name, scale in RSMAP: | 
|  | if value < 1024 * scale: | 
|  | if value % scale == 0: | 
|  | return "{} {}i".format(value // scale, name) | 
|  | else: | 
|  | return "{:.1f} {}i".format(float(value) / scale, name) | 
|  |  | 
|  | return "{}{}i".format(value // scale, name) | 
|  |  | 
|  |  | 
|  | RSMAP_10 = [(' f', 0.001 ** 4), | 
|  | (' n', 0.001 ** 3), | 
|  | (' u', 0.001 ** 2), | 
|  | (' m', 0.001), | 
|  | (' ', 1), | 
|  | (' K', 1000), | 
|  | (' M', 1000 ** 2), | 
|  | (' G', 1000 ** 3), | 
|  | (' T', 1000 ** 4), | 
|  | (' P', 1000 ** 5), | 
|  | (' E', 1000 ** 6)] | 
|  |  | 
|  |  | 
|  | def has_next_digit_after_coma(x: float) -> bool: | 
|  | return x * 10 - int(x * 10) > 1 | 
|  |  | 
|  |  | 
|  | def has_second_digit_after_coma(x: float) -> bool: | 
|  | return (x * 10 - int(x * 10)) * 10 > 1 | 
|  |  | 
|  |  | 
|  | def b2ssize_10(value: Union[int, float]) -> str: | 
|  | # make mypy happy | 
|  | scale = 1 | 
|  | name = " " | 
|  |  | 
|  | if value == 0.0: | 
|  | return "0 " | 
|  |  | 
|  | if value / RSMAP_10[0][1] < 1.0: | 
|  | return "{:.2e} ".format(value) | 
|  |  | 
|  | for name, scale in RSMAP_10: | 
|  | cval = value / scale | 
|  | if cval < 1000: | 
|  | # detect how many digits after dot to show | 
|  | if cval > 100: | 
|  | return "{}{}".format(int(cval), name) | 
|  | if cval > 10: | 
|  | if has_next_digit_after_coma(cval): | 
|  | return "{:.1f}{}".format(cval, name) | 
|  | else: | 
|  | return "{}{}".format(int(cval), name) | 
|  | if cval >= 1: | 
|  | if has_second_digit_after_coma(cval): | 
|  | return "{:.2f}{}".format(cval, name) | 
|  | elif has_next_digit_after_coma(cval): | 
|  | return "{:.1f}{}".format(cval, name) | 
|  | return "{}{}".format(int(cval), name) | 
|  | raise AssertionError("Can't get here") | 
|  |  | 
|  | return "{}{}".format(int(value // scale), name) | 
|  |  | 
|  |  | 
|  | def run_locally(cmd: Union[str, List[str]], input_data: str = "", timeout: int = 20) -> str: | 
|  | if isinstance(cmd, str): | 
|  | shell = True | 
|  | cmd_str = cmd | 
|  | else: | 
|  | shell = False | 
|  | cmd_str = " ".join(cmd) | 
|  |  | 
|  | proc = subprocess.Popen(cmd, | 
|  | shell=shell, | 
|  | stdin=subprocess.PIPE, | 
|  | stdout=subprocess.PIPE, | 
|  | stderr=subprocess.PIPE) | 
|  | res = []  # type: List[Tuple[bytes, bytes]] | 
|  |  | 
|  | def thread_func() -> None: | 
|  | rr = proc.communicate(input_data.encode("utf8")) | 
|  | res.extend(rr) | 
|  |  | 
|  | thread = threading.Thread(target=thread_func, | 
|  | name="Local cmd execution") | 
|  | thread.daemon = True | 
|  | thread.start() | 
|  | thread.join(timeout) | 
|  |  | 
|  | if thread.is_alive(): | 
|  | if psutil is not None: | 
|  | parent = psutil.Process(proc.pid) | 
|  | for child in parent.children(recursive=True): | 
|  | child.kill() | 
|  | parent.kill() | 
|  | else: | 
|  | proc.kill() | 
|  |  | 
|  | thread.join() | 
|  | raise RuntimeError("Local process timeout: " + cmd_str) | 
|  |  | 
|  | stdout_data, stderr_data = zip(*res)  # type: List[bytes], List[bytes] | 
|  |  | 
|  | out = b"".join(stdout_data).decode("utf8") | 
|  | err = b"".join(stderr_data).decode("utf8") | 
|  |  | 
|  | if 0 != proc.returncode: | 
|  | raise subprocess.CalledProcessError(proc.returncode, | 
|  | cmd_str, out + err) | 
|  |  | 
|  | return out | 
|  |  | 
|  |  | 
|  | def get_ip_for_target(target_ip: str) -> str: | 
|  | if not is_ip(target_ip): | 
|  | target_ip = socket.gethostbyname(target_ip) | 
|  |  | 
|  | first_dig = map(int, target_ip.split(".")) | 
|  | if first_dig == 127: | 
|  | return '127.0.0.1' | 
|  |  | 
|  | data = run_locally('ip route get to'.split(" ") + [target_ip]) | 
|  |  | 
|  | rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$' | 
|  | rr1 = rr1.replace(" ", r'\s+') | 
|  | rr1 = rr1.format(target_ip.replace('.', r'\.')) | 
|  |  | 
|  | rr2 = r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$' | 
|  | rr2 = rr2.replace(" ", r'\s+') | 
|  | rr2 = rr2.format(target_ip.replace('.', r'\.')) | 
|  |  | 
|  | data_line = data.split("\n")[0].strip() | 
|  | res1 = re.match(rr1, data_line) | 
|  | res2 = re.match(rr2, data_line) | 
|  |  | 
|  | if res1 is not None: | 
|  | return res1.group('ip') | 
|  |  | 
|  | if res2 is not None: | 
|  | return res2.group('ip') | 
|  |  | 
|  | raise OSError("Can't define interface for {0}".format(target_ip)) | 
|  |  | 
|  |  | 
|  | def open_for_append_or_create(fname: str) -> IO[str]: | 
|  | if not os.path.exists(fname): | 
|  | return open(fname, "w") | 
|  |  | 
|  | fd = open(fname, 'r+') | 
|  | fd.seek(0, os.SEEK_END) | 
|  | return fd | 
|  |  | 
|  |  | 
|  | def sec_to_str(seconds: int) -> str: | 
|  | h = seconds // 3600 | 
|  | m = (seconds % 3600) // 60 | 
|  | s = seconds % 60 | 
|  | return "{}:{:02d}:{:02d}".format(h, m, s) | 
|  |  | 
|  |  | 
|  | def yamable(data: Any) -> Any: | 
|  | if isinstance(data, (tuple, list)): | 
|  | return map(yamable, data) | 
|  |  | 
|  | if isinstance(data, dict): | 
|  | res = {} | 
|  | for k, v in data.items(): | 
|  | res[yamable(k)] = yamable(v) | 
|  | return res | 
|  |  | 
|  | return data | 
|  |  | 
|  |  | 
|  | def flatten(data: Iterable[Any]) -> List[Any]: | 
|  | res = [] | 
|  | for i in data: | 
|  | if isinstance(i, (list, tuple, set)): | 
|  | res.extend(flatten(i)) | 
|  | else: | 
|  | res.append(i) | 
|  | return res | 
|  |  | 
|  |  | 
|  | def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]: | 
|  | fc = open(path).read() | 
|  |  | 
|  | echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"' | 
|  |  | 
|  | msg = "Failed to get creads from openrc file" | 
|  | with LogError(msg): | 
|  | data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo) | 
|  |  | 
|  | msg = "Failed to get creads from openrc file: " + data | 
|  | with LogError(msg): | 
|  | data = data.strip() | 
|  | insecure_str, user, tenant, passwd_auth_url = data.split(':', 3) | 
|  | insecure = (insecure_str in ('1', 'True', 'true')) | 
|  | passwd, auth_url = passwd_auth_url.rsplit("@", 1) | 
|  | assert (auth_url.startswith("https://") or | 
|  | auth_url.startswith("http://")) | 
|  |  | 
|  | return user, passwd, tenant, auth_url, insecure | 
|  |  | 
|  |  | 
|  | def which(program: str) -> Optional[str]: | 
|  | def is_exe(fpath): | 
|  | return os.path.isfile(fpath) and os.access(fpath, os.X_OK) | 
|  |  | 
|  | for path in os.environ["PATH"].split(os.pathsep): | 
|  | path = path.strip('"') | 
|  | exe_file = os.path.join(path, program) | 
|  | if is_exe(exe_file): | 
|  | return exe_file | 
|  |  | 
|  | return None | 
|  |  | 
|  |  | 
|  | @contextlib.contextmanager | 
|  | def empty_ctx(val: Any = None) -> Iterator[Any]: | 
|  | yield val | 
|  |  | 
|  |  | 
|  | def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]: | 
|  | for i in range(max_iter): | 
|  | run_uuid = pet_generate(2, "_") | 
|  | results_dir = os.path.join(path, run_uuid) | 
|  | if not os.path.exists(results_dir): | 
|  | break | 
|  | else: | 
|  | run_uuid = str(uuid.uuid4()) | 
|  | results_dir = os.path.join(path, run_uuid) | 
|  |  | 
|  | return results_dir, run_uuid | 
|  |  | 
|  |  | 
|  | def to_ip(host_or_ip: str) -> str: | 
|  | # translate hostname to address | 
|  | try: | 
|  | ipaddress.ip_address(host_or_ip) | 
|  | return host_or_ip | 
|  | except ValueError: | 
|  | ip_addr = socket.gethostbyname(host_or_ip) | 
|  | logger.info("Will use ip_addr %r instead of hostname %r", ip_addr, host_or_ip) | 
|  | return ip_addr | 
|  |  | 
|  |  | 
|  | def get_time_interval_printable_info(seconds: int) -> Tuple[str, str]: | 
|  | exec_time_s = sec_to_str(seconds) | 
|  | now_dt = datetime.datetime.now() | 
|  | end_dt = now_dt + datetime.timedelta(0, seconds) | 
|  | return exec_time_s, "{:%H:%M:%S}".format(end_dt) | 
|  |  | 
|  |  | 
|  | FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT") | 
|  | FM_FUNC_RES = TypeVar("FM_FUNC_RES") | 
|  |  | 
|  |  | 
|  | def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]], | 
|  | inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]: | 
|  | for val in inp_iter: | 
|  | for res in func(val): | 
|  | yield res | 
|  |  | 
|  |  | 
|  | _coefs = { | 
|  | 'n': Fraction(1, 1000**3), | 
|  | 'u': Fraction(1, 1000**2), | 
|  | 'm': Fraction(1, 1000), | 
|  | 'K': 1000, | 
|  | 'M': 1000 ** 2, | 
|  | 'G': 1000 ** 3, | 
|  | 'Ki': 1024, | 
|  | 'Mi': 1024 ** 2, | 
|  | 'Gi': 1024 ** 3, | 
|  | } | 
|  |  | 
|  |  | 
|  | def split_unit(units: str) -> Tuple[Union[Fraction, int], str]: | 
|  | if len(units) > 2 and units[:2] in _coefs: | 
|  | return _coefs[units[:2]], units[2:] | 
|  | if len(units) > 1 and units[0] in _coefs: | 
|  | return _coefs[units[0]], units[1:] | 
|  | else: | 
|  | return 1, units | 
|  |  | 
|  |  | 
|  | conversion_cache = {} | 
|  |  | 
|  |  | 
|  | def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]: | 
|  | key = (from_unit, to_unit) | 
|  | if key in conversion_cache: | 
|  | return conversion_cache[key] | 
|  |  | 
|  | f1, u1 = split_unit(from_unit) | 
|  | f2, u2 = split_unit(to_unit) | 
|  |  | 
|  | assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit) | 
|  |  | 
|  | if isinstance(f1, int) and isinstance(f2, int): | 
|  | if f1 % f2 != 0: | 
|  | res = Fraction(f1, f2) | 
|  | else: | 
|  | res = f1 // f2 | 
|  | else: | 
|  | res = f1 / f2 | 
|  |  | 
|  | if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1: | 
|  | res = cast(Fraction, res).numerator | 
|  |  | 
|  | conversion_cache[key] = res | 
|  |  | 
|  | return res | 
|  |  | 
|  |  | 
|  | def shape2str(shape: Iterable[int]) -> str: | 
|  | return "*".join(map(str, shape)) | 
|  |  | 
|  |  | 
|  | def str2shape(shape: str) -> Tuple[int, ...]: | 
|  | return tuple(map(int, shape.split('*'))) |