blob: dfd0e8c39b261d5d287fce79a6cad550a117e108 [file] [log] [blame]
import os
import sys
import uuid
import logging
import datetime
import contextlib
from typing import Any, Tuple, Iterator, Iterable
# Use ``petname.Generate`` when the optional ``petname`` package is importable;
# otherwise define a stand-in with the same call shape so callers need not care.
try:
    from petname import Generate as pet_generate
except ImportError:
    def pet_generate(_1: int, _2: str) -> str:
        """Fallback name generator: ignore both arguments and return a random UUID4 string.

        The two positional parameters exist only to accept the same call as
        the imported function (this file calls ``pet_generate(2, "_")``).
        """
        return str(uuid.uuid4())
from cephlib.common import run_locally, sec_to_str
# Module-level logger named "wally"; the helpers in this file log through it.
logger = logging.getLogger("wally")
# Set of node role names considered storage roles; only 'ceph-osd' is listed.
# NOTE(review): the consumers of this set are not visible in this file.
STORAGE_ROLES = {'ceph-osd'}
class StopTestError(RuntimeError):
    """Error raised by the helpers in this module after a failure has been logged.

    ``LogError`` lets it propagate untouched instead of logging it again.
    """
class LogError:
    """Context manager that logs an escaping exception and re-raises it as ``StopTestError``.

    On a clean exit, or when the escaping exception already is a
    ``StopTestError``, the manager does nothing and the exception (if any)
    propagates unchanged. Any other exception is logged with ``message`` and
    its traceback, then replaced by ``StopTestError(message)`` chained to the
    original exception.
    """

    def __init__(self, message: str, exc_logger: logging.Logger = None) -> None:
        # exc_logger=None means "pick the logger at exit time" (see __exit__).
        self.exc_logger = exc_logger
        self.message = message

    def __enter__(self) -> 'LogError':
        return self

    def __exit__(self, tp: type, value: Exception, traceback: Any) -> bool:
        nothing_to_report = value is None or isinstance(value, StopTestError)
        if nothing_to_report:
            # Returning False never suppresses an in-flight exception.
            return False

        target_logger = self.exc_logger
        if target_logger is None:
            # Look one frame up (the code that triggered __exit__) for a
            # module-global ``logger``; use this module's logger otherwise.
            target_logger = sys._getframe(1).f_globals.get('logger', logger)

        target_logger.exception(self.message, exc_info=(tp, value, traceback))
        raise StopTestError(self.message) from value
class TaskFinished(Exception):
    """Exception type signalling that a task has finished.

    NOTE(review): nothing in this file raises or catches it; confirm the
    exact usage against the callers.
    """
def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
    """Log the start of a block at debug level and return a ``LogError`` guarding it.

    Intended to be used as ``with log_block("..."):``; the returned context
    manager is constructed with the same ``message`` and ``exc_logger``.
    """
    start_line = "Starts : " + message
    logger.debug(start_line)
    guard = LogError(message, exc_logger)
    return guard
def check_input_param(is_ok: bool, message: str) -> None:
    """Abort with ``StopTestError(message)`` when ``is_ok`` is falsy.

    The message is logged at error level before the exception is raised.
    Returns ``None`` when the check passes.
    """
    if is_ok:
        return
    logger.error(message)
    raise StopTestError(message)
def yamable(data: Any) -> Any:
    """Recursively convert ``data`` into plain lists and dicts.

    Tuples and lists become lists (elements converted recursively), dict
    values are converted recursively, and every other object is returned
    unchanged.

    The sequence branch builds a real list. On Python 3 ``map(...)`` returns
    a lazy one-shot iterator, which is neither a list nor reusable, so
    returning it directly handed callers an opaque ``map`` object.
    """
    if isinstance(data, (tuple, list)):
        return [yamable(item) for item in data]

    if isinstance(data, dict):
        # Keys are kept as they are: a dict key must be hashable, so it can
        # never be a list or dict, and a tuple key has to stay a tuple
        # (converting it to a list would make it unusable as a key).
        return {key: yamable(value) for key, value in data.items()}

    return data
def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
    """Extract OpenStack credentials from an openrc shell file.

    The file content is fed to ``/bin/bash`` followed by an ``echo`` of the
    ``OS_*`` variables, and the echoed line is parsed.

    NOTE(review): the file is executed as a shell script, so ``path`` must
    point to a trusted file.

    :param path: path to the openrc file.
    :return: ``(user, passwd, tenant, auth_url, insecure)``.
    :raises StopTestError: when running bash or parsing its output fails
        (raised by the ``LogError`` guards).
    :raises OSError: when the file cannot be read (raised outside the guards).
    """
    # Close the file deterministically instead of leaking the handle.
    with open(path) as openrc_fd:
        fc = openrc_fd.read()

    echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'

    msg = "Failed to get creads from openrc file"
    with LogError(msg):
        script = (fc + "\n" + echo).encode('utf8')
        # presumably run_locally returns the command's stdout as bytes
        data = run_locally(['/bin/bash'], input_data=script).decode("utf8")

    # NOTE(review): ``data`` contains the password, so a parse failure writes
    # it to the log through this message. Consider redacting.
    msg = "Failed to get creads from openrc file: " + data
    with LogError(msg):
        data = data.strip()
        # Field order must mirror the echo above: tenant name comes BEFORE
        # user name. Only the first three ':' split, since the password and
        # the URL may contain ':' themselves.
        insecure_str, tenant, user, passwd_auth_url = data.split(':', 3)
        insecure = insecure_str in ('1', 'True', 'true')
        # Split on the last '@' so that a password containing '@' survives.
        passwd, auth_url = passwd_auth_url.rsplit("@", 1)
        # Explicit check instead of ``assert``, which is stripped under -O.
        if not auth_url.startswith(("https://", "http://")):
            raise ValueError("Auth url must start with http:// or https://")

    return user, passwd, tenant, auth_url, insecure
@contextlib.contextmanager
def empty_ctx(val: Any = None) -> Iterator[Any]:
    """No-op context manager: ``with empty_ctx(x) as y`` binds ``y`` to ``x``.

    Nothing is done on entry or exit; exceptions raised inside the ``with``
    body propagate unchanged.
    """
    yield val
def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
    """Pick a run id whose directory under ``path`` does not exist yet.

    Up to ``max_iter`` ids from ``pet_generate(2, "_")`` are tried; the first
    one whose joined path does not exist wins. If every attempt collides (or
    ``max_iter`` is 0) a random UUID4 string is used instead, without an
    existence check.

    :return: ``(results_dir, run_uuid)`` where ``results_dir`` is
        ``os.path.join(path, run_uuid)``. Nothing is created on disk.
    """
    for _ in range(max_iter):
        candidate = pet_generate(2, "_")
        candidate_dir = os.path.join(path, candidate)
        if not os.path.exists(candidate_dir):
            return candidate_dir, candidate

    fallback = str(uuid.uuid4())
    return os.path.join(path, fallback), fallback
def get_time_interval_printable_info(seconds: int) -> Tuple[str, str]:
    """Describe an interval of ``seconds`` starting now.

    :return: ``(duration_text, end_time_text)`` where ``duration_text`` is
        ``sec_to_str(seconds)`` and ``end_time_text`` is the local wall-clock
        time ``seconds`` from now, formatted as ``HH:MM:SS``.
    """
    duration_text = sec_to_str(seconds)
    finish_at = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
    return duration_text, finish_at.strftime("%H:%M:%S")
def shape2str(shape: Iterable[int]) -> str:
    """Serialize a shape as its dimensions joined with ``*`` (e.g. ``"2*3"``).

    An empty shape yields the empty string.
    """
    dims = [str(dim) for dim in shape]
    return "*".join(dims)
def str2shape(shape: str) -> Tuple[int, ...]:
    """Parse a ``*``-separated shape string (e.g. ``"2*3"``) into a tuple of ints.

    The empty string maps to the empty shape ``()``. ``shape2str`` produces
    ``""`` for an empty shape, so this keeps the two functions inverse to
    each other; ``int("")`` would otherwise raise ``ValueError``.

    :raises ValueError: if any non-empty component is not an integer literal.
    """
    if not shape:
        return ()
    return tuple(int(dim) for dim in shape.split('*'))