typing and refactoring on the way
diff --git a/wally/utils.py b/wally/utils.py
index 32c9056..d2b867e 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,7 +1,8 @@
import re
import os
-import io
import sys
+import time
+import uuid
import socket
import logging
import ipaddress
@@ -10,14 +11,20 @@
import subprocess
import collections
-from .inode import INode
-from typing import Any, Tuple, Union, List, Generator, Dict, Callable, Iterable, Optional
+from .interfaces import IRemoteNode
+from typing import Any, Tuple, Union, List, Iterator, Dict, Callable, Iterable, Optional, IO, Sequence
try:
import psutil
except ImportError:
psutil = None
+try:
+ from petname import Generate as pet_generate
+except ImportError:
+ def pet_generate(x: str, y: str) -> str:
+ return str(uuid.uuid4())
+
logger = logging.getLogger("wally")
@@ -35,16 +42,16 @@
class LogError:
- def __init__(self, message: str, exc_logger=None):
+ def __init__(self, message: str, exc_logger: logging.Logger = None) -> None:
self.message = message
self.exc_logger = exc_logger
- def __enter__(self):
+ def __enter__(self) -> 'LogError':
return self
- def __exit__(self, tp: type, value: Exception, traceback: Any):
+ def __exit__(self, tp: type, value: Exception, traceback: Any) -> bool:
if value is None or isinstance(value, StopTestError):
- return
+ return False
if self.exc_logger is None:
exc_logger = sys._getframe(1).f_globals.get('logger', logger)
@@ -52,10 +59,10 @@
exc_logger = self.exc_logger
exc_logger.exception(self.message, exc_info=(tp, value, traceback))
- raise StopTestError(self.message, value)
+ raise StopTestError(self.message) from value
-def log_block(message: str, exc_logger=None) -> LogError:
+def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
logger.debug("Starts : " + message)
return LogError(message, exc_logger)
@@ -67,7 +74,7 @@
def parse_creds(creds: str) -> Tuple[str, str, str]:
- # parse user:passwd@host
+ """Parse simple credentials format user[:passwd]@host"""
user, passwd_host = creds.split(":", 1)
if '@' not in passwd_host:
@@ -78,12 +85,12 @@
return user, passwd, host
-class TaksFinished(Exception):
+class TaskFinished(Exception):
pass
class Barrier:
- def __init__(self, count: int):
+ def __init__(self, count: int) -> None:
self.count = count
self.curr_count = 0
self.cond = threading.Condition()
@@ -92,7 +99,7 @@
def wait(self, timeout: int=None) -> bool:
with self.cond:
if self.exited:
- raise TaksFinished()
+ raise TaskFinished()
self.curr_count += 1
if self.curr_count == self.count:
@@ -134,6 +141,10 @@
if size < 1024:
return str(size)
+ # make mypy happy
+ scale = 1
+ name = ""
+
for name, scale in RSMAP:
if size < 1024 * scale:
if size % scale == 0:
@@ -154,6 +165,10 @@
if size < 1000:
return str(size)
+ # make mypy happy
+ scale = 1
+ name = ""
+
for name, scale in RSMAP_10:
if size < 1000 * scale:
if size % scale == 0:
@@ -165,16 +180,21 @@
def run_locally(cmd: Union[str, List[str]], input_data: str="", timeout:int =20) -> str:
- shell = isinstance(cmd, str)
+ if isinstance(cmd, str):
+ shell = True
+ cmd_str = cmd
+ else:
+ cmd_str = " ".join(cmd)
+
proc = subprocess.Popen(cmd,
shell=shell,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- res = []
+ res = [] # type: List[Tuple[bytes, bytes]]
def thread_func() -> None:
- rr = proc.communicate(input_data)
+ rr = proc.communicate(input_data.encode("utf8"))
res.extend(rr)
thread = threading.Thread(target=thread_func,
@@ -193,12 +213,16 @@
proc.kill()
thread.join()
- raise RuntimeError("Local process timeout: " + str(cmd))
+ raise RuntimeError("Local process timeout: " + cmd_str)
- out, err = res
+ stdout_data, stderr_data = zip(*res) # type: List[bytes], List[bytes]
+
+ out = b"".join(stdout_data).decode("utf8")
+ err = b"".join(stderr_data).decode("utf8")
+
if 0 != proc.returncode:
raise subprocess.CalledProcessError(proc.returncode,
- cmd, out + err)
+ cmd_str, out + err)
return out
@@ -234,7 +258,7 @@
raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname: str) -> io.IO:
+def open_for_append_or_create(fname: str) -> IO:
if not os.path.exists(fname):
return open(fname, "w")
@@ -263,15 +287,15 @@
return data
-CLEANING = []
+CLEANING = [] # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
-def clean_resource(func: Callable[..., Any], *args, **kwargs) -> None:
- CLEANING.append((func, args, kwargs))
+def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
+ CLEANING.append((func, list(args), kwargs))
-def iter_clean_func() -> Generator[Callable[..., Any], List[Any], Dict[str, Any]]:
- while CLEANING != []:
+def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
+ while CLEANING:
yield CLEANING.pop()
@@ -285,7 +309,7 @@
return res
-def get_creds_openrc(path: str) -> Tuple[str, str, str, str, str]:
+def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
fc = open(path).read()
echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
@@ -309,7 +333,9 @@
os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
-def get_os(node: INode) -> os_release:
+def get_os(node: IRemoteNode) -> os_release:
+ """return os type, release and architecture for node.
+ """
arch = node.run("arch", nolog=True).strip()
try:
@@ -338,7 +364,7 @@
@contextlib.contextmanager
-def empty_ctx(val: Any=None) -> Generator[Any]:
+def empty_ctx(val: Any=None) -> Iterator[Any]:
yield val
@@ -347,9 +373,10 @@
os.makedirs(path)
-def log_nodes_statistic(nodes: Iterable[INode]) -> None:
+def log_nodes_statistic(nodes: Sequence[IRemoteNode]) -> None:
logger.info("Found {0} nodes total".format(len(nodes)))
- per_role = collections.defaultdict(lambda: 0)
+
+ per_role = collections.defaultdict(int) # type: Dict[str, int]
for node in nodes:
for role in node.roles:
per_role[role] += 1
@@ -369,3 +396,31 @@
return exe_file
return None
+
+
+def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
+ for i in range(max_iter):
+ run_uuid = pet_generate(2, "_")
+ results_dir = os.path.join(path, run_uuid)
+ if not os.path.exists(results_dir):
+ break
+ else:
+ run_uuid = str(uuid.uuid4())
+ results_dir = os.path.join(path, run_uuid)
+
+ return results_dir, run_uuid
+
+
+class Timeout:
+ def __init__(self, timeout: int, message: str = None) -> None:
+ self.etime = time.time() + timeout
+ self.message = message
+
+ def tick(self) -> None:
+ if time.time() > self.etime:
+ if self.message:
+ msg = "Timeout: {}".format(self.message)
+ else:
+ msg = "Timeout"
+
+ raise TimeoutError(msg)
\ No newline at end of file