2.0 refactoring:
* Add type annotations to most functions
* Remove old fio run code, move to RPC/pluggable
* Remove most of the sensors code; it will be moved to RPC
* Other refactoring
diff --git a/wally/utils.py b/wally/utils.py
index 3fba2b0..32c9056 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,13 +1,18 @@
import re
import os
+import io
import sys
import socket
import logging
+import ipaddress
import threading
import contextlib
import subprocess
import collections
+from .inode import INode
+from typing import Any, Tuple, Union, List, Generator, Dict, Callable, Iterable, Optional
+
try:
import psutil
except ImportError:
@@ -17,48 +22,27 @@
logger = logging.getLogger("wally")
-def is_ip(data):
- if data.count('.') != 3:
- return False
-
+def is_ip(data: str) -> bool:
try:
- for part in map(int, data.split('.')):
- if part > 255 or part < 0:
- raise ValueError()
+ ipaddress.ip_address(data)
+ return True
except ValueError:
return False
- return True
class StopTestError(RuntimeError):
- def __init__(self, reason, orig_exc=None):
- RuntimeError.__init__(self, reason)
- self.orig_exc = orig_exc
+ pass
-@contextlib.contextmanager
-def log_block(message, exc_logger=None):
- logger.debug("Starts : " + message)
- with log_error(message, exc_logger):
- yield
- # try:
- # yield
- # except Exception as exc:
- # if isinstance(exc, types) and not isinstance(exc, StopIteration):
- # templ = "Error during {0} stage: {1!s}"
- # logger.debug(templ.format(action, exc))
- # raise
-
-
-class log_error(object):
- def __init__(self, message, exc_logger=None):
+class LogError:
+ def __init__(self, message: str, exc_logger=None):
self.message = message
self.exc_logger = exc_logger
def __enter__(self):
return self
- def __exit__(self, tp, value, traceback):
+ def __exit__(self, tp: type, value: Exception, traceback: Any):
if value is None or isinstance(value, StopTestError):
return
@@ -71,13 +55,18 @@
raise StopTestError(self.message, value)
-def check_input_param(is_ok, message):
+def log_block(message: str, exc_logger=None) -> LogError:
+ logger.debug("Starts : " + message)
+ return LogError(message, exc_logger)
+
+
+def check_input_param(is_ok: bool, message: str) -> None:
if not is_ok:
logger.error(message)
raise StopTestError(message)
-def parse_creds(creds):
+def parse_creds(creds: str) -> Tuple[str, str, str]:
# parse user:passwd@host
user, passwd_host = creds.split(":", 1)
@@ -93,14 +82,14 @@
pass
-class Barrier(object):
- def __init__(self, count):
+class Barrier:
+ def __init__(self, count: int):
self.count = count
self.curr_count = 0
self.cond = threading.Condition()
self.exited = False
- def wait(self, timeout=None):
+ def wait(self, timeout: int=None) -> bool:
with self.cond:
if self.exited:
raise TaksFinished()
@@ -114,7 +103,7 @@
self.cond.wait(timeout=timeout)
return False
- def exit(self):
+ def exit(self) -> None:
with self.cond:
self.exited = True
@@ -122,9 +111,9 @@
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
-def ssize2b(ssize):
+def ssize2b(ssize: Union[str, int]) -> int:
try:
- if isinstance(ssize, (int, long)):
+ if isinstance(ssize, int):
return ssize
ssize = ssize.lower()
@@ -132,7 +121,7 @@
return int(ssize[:-1]) * SMAP[ssize[-1]]
return int(ssize)
except (ValueError, TypeError, AttributeError):
- raise ValueError("Unknow size format {0!r}".format(ssize))
+ raise ValueError("Unknow size format {!r}".format(ssize))
RSMAP = [('K', 1024),
@@ -141,18 +130,18 @@
('T', 1024 ** 4)]
-def b2ssize(size):
+def b2ssize(size: int) -> str:
if size < 1024:
return str(size)
for name, scale in RSMAP:
if size < 1024 * scale:
if size % scale == 0:
- return "{0} {1}i".format(size // scale, name)
+ return "{} {}i".format(size // scale, name)
else:
- return "{0:.1f} {1}i".format(float(size) / scale, name)
+ return "{:.1f} {}i".format(float(size) / scale, name)
- return "{0}{1}i".format(size // scale, name)
+ return "{}{}i".format(size // scale, name)
RSMAP_10 = [('k', 1000),
@@ -161,22 +150,22 @@
('t', 1000 ** 4)]
-def b2ssize_10(size):
+def b2ssize_10(size: int) -> str:
if size < 1000:
return str(size)
for name, scale in RSMAP_10:
if size < 1000 * scale:
if size % scale == 0:
- return "{0} {1}".format(size // scale, name)
+ return "{} {}".format(size // scale, name)
else:
- return "{0:.1f} {1}".format(float(size) / scale, name)
+ return "{:.1f} {}".format(float(size) / scale, name)
- return "{0}{1}".format(size // scale, name)
+ return "{}{}".format(size // scale, name)
-def run_locally(cmd, input_data="", timeout=20):
- shell = isinstance(cmd, basestring)
+def run_locally(cmd: Union[str, List[str]], input_data: str="", timeout:int =20) -> str:
+ shell = isinstance(cmd, str)
proc = subprocess.Popen(cmd,
shell=shell,
stdin=subprocess.PIPE,
@@ -184,7 +173,7 @@
stderr=subprocess.PIPE)
res = []
- def thread_func():
+ def thread_func() -> None:
rr = proc.communicate(input_data)
res.extend(rr)
@@ -214,7 +203,7 @@
return out
-def get_ip_for_target(target_ip):
+def get_ip_for_target(target_ip: str) -> str:
if not is_ip(target_ip):
target_ip = socket.gethostbyname(target_ip)
@@ -245,7 +234,7 @@
raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname):
+def open_for_append_or_create(fname: str) -> io.IO:
if not os.path.exists(fname):
return open(fname, "w")
@@ -254,20 +243,17 @@
return fd
-def sec_to_str(seconds):
+def sec_to_str(seconds: int) -> str:
h = seconds // 3600
m = (seconds % 3600) // 60
s = seconds % 60
- return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+ return "{}:{:02d}:{:02d}".format(h, m, s)
-def yamable(data):
+def yamable(data: Any) -> Any:
if isinstance(data, (tuple, list)):
return map(yamable, data)
- if isinstance(data, unicode):
- return str(data)
-
if isinstance(data, dict):
res = {}
for k, v in data.items():
@@ -280,16 +266,16 @@
CLEANING = []
-def clean_resource(func, *args, **kwargs):
+def clean_resource(func: Callable[..., Any], *args, **kwargs) -> None:
CLEANING.append((func, args, kwargs))
-def iter_clean_func():
+def iter_clean_func() -> Generator[Callable[..., Any], List[Any], Dict[str, Any]]:
while CLEANING != []:
yield CLEANING.pop()
-def flatten(data):
+def flatten(data: Iterable[Any]) -> List[Any]:
res = []
for i in data:
if isinstance(i, (list, tuple, set)):
@@ -299,17 +285,17 @@
return res
-def get_creds_openrc(path):
+def get_creds_openrc(path: str) -> Tuple[str, str, str, str, str]:
fc = open(path).read()
echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
msg = "Failed to get creads from openrc file"
- with log_error(msg):
+ with LogError(msg):
data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo)
msg = "Failed to get creads from openrc file: " + data
- with log_error(msg):
+ with LogError(msg):
data = data.strip()
insecure_str, user, tenant, passwd_auth_url = data.split(':', 3)
insecure = (insecure_str in ('1', 'True', 'true'))
@@ -323,20 +309,20 @@
os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
-def get_os(run_func):
- arch = run_func("arch", nolog=True).strip()
+def get_os(node: INode) -> os_release:
+ arch = node.run("arch", nolog=True).strip()
try:
- run_func("ls -l /etc/redhat-release", nolog=True)
+ node.run("ls -l /etc/redhat-release", nolog=True)
return os_release('redhat', None, arch)
except:
pass
try:
- run_func("ls -l /etc/debian_version", nolog=True)
+ node.run("ls -l /etc/debian_version", nolog=True)
release = None
- for line in run_func("lsb_release -a", nolog=True).split("\n"):
+ for line in node.run("lsb_release -a", nolog=True).split("\n"):
if ':' not in line:
continue
opt, val = line.split(":", 1)
@@ -352,16 +338,16 @@
@contextlib.contextmanager
-def empty_ctx(val=None):
+def empty_ctx(val: Any=None) -> Generator[Any]:
yield val
-def mkdirs_if_unxists(path):
+def mkdirs_if_unxists(path: str) -> None:
if not os.path.exists(path):
os.makedirs(path)
-def log_nodes_statistic(nodes):
+def log_nodes_statistic(nodes: Iterable[INode]) -> None:
logger.info("Found {0} nodes total".format(len(nodes)))
per_role = collections.defaultdict(lambda: 0)
for node in nodes:
@@ -372,7 +358,7 @@
logger.debug("Found {0} nodes with role {1}".format(count, role))
-def which(program):
+def which(program: str) -> Optional[str]:
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)