Move common storage, plot and statistics code to cephlib
diff --git a/wally/utils.py b/wally/utils.py
index a0f10e8..dfd0e8c 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,33 +1,19 @@
-import re
import os
import sys
-import math
-import time
import uuid
-import socket
import logging
import datetime
-import ipaddress
-import threading
import contextlib
-import subprocess
-from fractions import Fraction
-from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
-
-try:
- import psutil
-except ImportError:
- psutil = None
+from typing import Any, Tuple, Iterator, Iterable
try:
from petname import Generate as pet_generate
except ImportError:
- def pet_generate(x: str, y: str) -> str:
+ def pet_generate(_1: str, _2: str) -> str:
return str(uuid.uuid4())
-
-from .types import TNumber, Number
+from cephlib.common import run_locally, sec_to_str
logger = logging.getLogger("wally")
@@ -65,63 +51,6 @@
pass
-class Timeout(Iterable[float]):
- def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
- self.end_time = time.time() + timeout
- self.message = message
- self.min_tick = min_tick
- self.prev_tick_at = time.time()
- self.no_exc = no_exc
-
- def tick(self) -> bool:
- current_time = time.time()
-
- if current_time > self.end_time:
- if self.message:
- msg = "Timeout: {}".format(self.message)
- else:
- msg = "Timeout"
-
- if self.no_exc:
- return False
-
- raise TimeoutError(msg)
-
- sleep_time = self.min_tick - (current_time - self.prev_tick_at)
- if sleep_time > 0:
- time.sleep(sleep_time)
- self.prev_tick_at = time.time()
- else:
- self.prev_tick_at = current_time
-
- return True
-
- def __iter__(self) -> Iterator[float]:
- return cast(Iterator[float], self)
-
- def __next__(self) -> float:
- if not self.tick():
- raise StopIteration()
- return self.end_time - time.time()
-
-
-def greater_digit_pos(val: Number) -> int:
- return int(math.floor(math.log10(val))) + 1
-
-
-def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
- pow = 10 ** (greater_digit_pos(val) - num_digits)
- return type(val)(int(val / pow) * pow)
-
-
-def is_ip(data: str) -> bool:
- try:
- ipaddress.ip_address(data)
- return True
- except ValueError:
- return False
-
-
def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
logger.debug("Starts : " + message)
return LogError(message, exc_logger)
@@ -133,212 +62,6 @@
raise StopTestError(message)
-def parse_creds(creds: str) -> Tuple[str, str, str]:
- """Parse simple credentials format user[:passwd]@host"""
- user, passwd_host = creds.split(":", 1)
-
- if '@' not in passwd_host:
- passwd, host = passwd_host, None
- else:
- passwd, host = passwd_host.rsplit('@', 1)
-
- return user, passwd, host
-
-
-SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
-
-
-def ssize2b(ssize: Union[str, int]) -> int:
- try:
- if isinstance(ssize, int):
- return ssize
-
- ssize = ssize.lower()
- if ssize[-1] in SMAP:
- return int(ssize[:-1]) * SMAP[ssize[-1]]
- return int(ssize)
- except (ValueError, TypeError, AttributeError):
- raise ValueError("Unknow size format {!r}".format(ssize))
-
-
-RSMAP = [('K', 1024),
- ('M', 1024 ** 2),
- ('G', 1024 ** 3),
- ('T', 1024 ** 4)]
-
-
-def b2ssize(value: Union[int, float]) -> str:
- if isinstance(value, float) and value < 100:
- return b2ssize_10(value)
-
- value = int(value)
- if value < 1024:
- return str(value) + " "
-
- # make mypy happy
- scale = 1
- name = ""
-
- for name, scale in RSMAP:
- if value < 1024 * scale:
- if value % scale == 0:
- return "{} {}i".format(value // scale, name)
- else:
- return "{:.1f} {}i".format(float(value) / scale, name)
-
- return "{}{}i".format(value // scale, name)
-
-
-RSMAP_10 = [(' f', 0.001 ** 4),
- (' n', 0.001 ** 3),
- (' u', 0.001 ** 2),
- (' m', 0.001),
- (' ', 1),
- (' K', 1000),
- (' M', 1000 ** 2),
- (' G', 1000 ** 3),
- (' T', 1000 ** 4),
- (' P', 1000 ** 5),
- (' E', 1000 ** 6)]
-
-
-def has_next_digit_after_coma(x: float) -> bool:
- return x * 10 - int(x * 10) > 1
-
-
-def has_second_digit_after_coma(x: float) -> bool:
- return (x * 10 - int(x * 10)) * 10 > 1
-
-
-def b2ssize_10(value: Union[int, float]) -> str:
- # make mypy happy
- scale = 1
- name = " "
-
- if value == 0.0:
- return "0 "
-
- if value / RSMAP_10[0][1] < 1.0:
- return "{:.2e} ".format(value)
-
- for name, scale in RSMAP_10:
- cval = value / scale
- if cval < 1000:
- # detect how many digits after dot to show
- if cval > 100:
- return "{}{}".format(int(cval), name)
- if cval > 10:
- if has_next_digit_after_coma(cval):
- return "{:.1f}{}".format(cval, name)
- else:
- return "{}{}".format(int(cval), name)
- if cval >= 1:
- if has_second_digit_after_coma(cval):
- return "{:.2f}{}".format(cval, name)
- elif has_next_digit_after_coma(cval):
- return "{:.1f}{}".format(cval, name)
- return "{}{}".format(int(cval), name)
- raise AssertionError("Can't get here")
-
- return "{}{}".format(int(value // scale), name)
-
-
-def run_locally(cmd: Union[str, List[str]], input_data: str = "", timeout: int = 20) -> str:
- if isinstance(cmd, str):
- shell = True
- cmd_str = cmd
- else:
- shell = False
- cmd_str = " ".join(cmd)
-
- proc = subprocess.Popen(cmd,
- shell=shell,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- res = [] # type: List[Tuple[bytes, bytes]]
-
- def thread_func() -> None:
- rr = proc.communicate(input_data.encode("utf8"))
- res.extend(rr)
-
- thread = threading.Thread(target=thread_func,
- name="Local cmd execution")
- thread.daemon = True
- thread.start()
- thread.join(timeout)
-
- if thread.is_alive():
- if psutil is not None:
- parent = psutil.Process(proc.pid)
- for child in parent.children(recursive=True):
- child.kill()
- parent.kill()
- else:
- proc.kill()
-
- thread.join()
- raise RuntimeError("Local process timeout: " + cmd_str)
-
- stdout_data, stderr_data = zip(*res) # type: List[bytes], List[bytes]
-
- out = b"".join(stdout_data).decode("utf8")
- err = b"".join(stderr_data).decode("utf8")
-
- if 0 != proc.returncode:
- raise subprocess.CalledProcessError(proc.returncode,
- cmd_str, out + err)
-
- return out
-
-
-def get_ip_for_target(target_ip: str) -> str:
- if not is_ip(target_ip):
- target_ip = socket.gethostbyname(target_ip)
-
- first_dig = map(int, target_ip.split("."))
- if first_dig == 127:
- return '127.0.0.1'
-
- data = run_locally('ip route get to'.split(" ") + [target_ip])
-
- rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
- rr1 = rr1.replace(" ", r'\s+')
- rr1 = rr1.format(target_ip.replace('.', r'\.'))
-
- rr2 = r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
- rr2 = rr2.replace(" ", r'\s+')
- rr2 = rr2.format(target_ip.replace('.', r'\.'))
-
- data_line = data.split("\n")[0].strip()
- res1 = re.match(rr1, data_line)
- res2 = re.match(rr2, data_line)
-
- if res1 is not None:
- return res1.group('ip')
-
- if res2 is not None:
- return res2.group('ip')
-
- raise OSError("Can't define interface for {0}".format(target_ip))
-
-
-def open_for_append_or_create(fname: str) -> IO[str]:
- if not os.path.exists(fname):
- return open(fname, "w")
-
- fd = open(fname, 'r+')
- fd.seek(0, os.SEEK_END)
- return fd
-
-
-def sec_to_str(seconds: int) -> str:
- h = seconds // 3600
- m = (seconds % 3600) // 60
- s = seconds % 60
- return "{}:{:02d}:{:02d}".format(h, m, s)
-
-
def yamable(data: Any) -> Any:
if isinstance(data, (tuple, list)):
return map(yamable, data)
@@ -352,16 +75,6 @@
return data
-def flatten(data: Iterable[Any]) -> List[Any]:
- res = []
- for i in data:
- if isinstance(i, (list, tuple, set)):
- res.extend(flatten(i))
- else:
- res.append(i)
- return res
-
-
def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
fc = open(path).read()
@@ -369,7 +82,7 @@
msg = "Failed to get creads from openrc file"
with LogError(msg):
- data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo)
+ data = run_locally(['/bin/bash'], input_data=(fc + "\n" + echo).encode('utf8')).decode("utf8")
msg = "Failed to get creads from openrc file: " + data
with LogError(msg):
@@ -383,19 +96,6 @@
return user, passwd, tenant, auth_url, insecure
-def which(program: str) -> Optional[str]:
- def is_exe(fpath):
- return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
- for path in os.environ["PATH"].split(os.pathsep):
- path = path.strip('"')
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
-
- return None
-
-
@contextlib.contextmanager
def empty_ctx(val: Any = None) -> Iterator[Any]:
yield val
@@ -414,17 +114,6 @@
return results_dir, run_uuid
-def to_ip(host_or_ip: str) -> str:
- # translate hostname to address
- try:
- ipaddress.ip_address(host_or_ip)
- return host_or_ip
- except ValueError:
- ip_addr = socket.gethostbyname(host_or_ip)
- logger.info("Will use ip_addr %r instead of hostname %r", ip_addr, host_or_ip)
- return ip_addr
-
-
def get_time_interval_printable_info(seconds: int) -> Tuple[str, str]:
exec_time_s = sec_to_str(seconds)
now_dt = datetime.datetime.now()
@@ -432,68 +121,6 @@
return exec_time_s, "{:%H:%M:%S}".format(end_dt)
-FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
-FM_FUNC_RES = TypeVar("FM_FUNC_RES")
-
-
-def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
- inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
- for val in inp_iter:
- for res in func(val):
- yield res
-
-
-_coefs = {
- 'n': Fraction(1, 1000**3),
- 'u': Fraction(1, 1000**2),
- 'm': Fraction(1, 1000),
- 'K': 1000,
- 'M': 1000 ** 2,
- 'G': 1000 ** 3,
- 'Ki': 1024,
- 'Mi': 1024 ** 2,
- 'Gi': 1024 ** 3,
-}
-
-
-def split_unit(units: str) -> Tuple[Union[Fraction, int], str]:
- if len(units) > 2 and units[:2] in _coefs:
- return _coefs[units[:2]], units[2:]
- if len(units) > 1 and units[0] in _coefs:
- return _coefs[units[0]], units[1:]
- else:
- return 1, units
-
-
-conversion_cache = {}
-
-
-def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
- key = (from_unit, to_unit)
- if key in conversion_cache:
- return conversion_cache[key]
-
- f1, u1 = split_unit(from_unit)
- f2, u2 = split_unit(to_unit)
-
- assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit)
-
- if isinstance(f1, int) and isinstance(f2, int):
- if f1 % f2 != 0:
- res = Fraction(f1, f2)
- else:
- res = f1 // f2
- else:
- res = f1 / f2
-
- if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1:
- res = cast(Fraction, res).numerator
-
- conversion_cache[key] = res
-
- return res
-
-
def shape2str(shape: Iterable[int]) -> str:
return "*".join(map(str, shape))