Some work on statistics code
diff --git a/wally/utils.py b/wally/utils.py
index dd9cdd5..13cd675 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,6 +1,8 @@
import re
import os
+import abc
import sys
+import math
import time
import uuid
import socket
@@ -12,8 +14,8 @@
import collections
from .node_interfaces import IRPCNode
-from typing import (Any, Tuple, Union, List, Iterator, Dict, Callable, Iterable, Optional,
- IO, Sequence, NamedTuple, cast)
+from typing import (Any, Tuple, Union, List, Iterator, Dict, Iterable, Optional,
+ IO, Sequence, NamedTuple, cast, TypeVar)
try:
import psutil
@@ -28,14 +30,8 @@
logger = logging.getLogger("wally")
-
-
-def is_ip(data: str) -> bool:
- try:
- ipaddress.ip_address(data)
- return True
- except ValueError:
- return False
+# Type variable constrained to int or float, for functions whose result has
+# the same numeric type as their argument.
+TNumber = TypeVar('TNumber', int, float)
+# Plain "int or float" alias for places where the result type is not tied to
+# the argument type.
+Number = Union[int, float]
class StopTestError(RuntimeError):
@@ -63,27 +59,20 @@
raise StopTestError(self.message) from value
-def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
- logger.debug("Starts : " + message)
- return LogError(message, exc_logger)
+class IStorable(metaclass=abc.ABCMeta):
+    """Interface for types which can be stored.
+
+    Concrete subclasses must implement both ``raw`` and ``fromraw``;
+    until they do, instantiation fails with TypeError.
+    """
+
+    @abc.abstractmethod
+    def raw(self) -> Dict[str, Any]:
+        """Return this object as a plain dict."""
+        pass
+
+    # abc.abstractclassmethod is deprecated since Python 3.3; stacking
+    # classmethod on top of abstractmethod is the supported spelling.
+    @classmethod
+    @abc.abstractmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+        """Build an instance from a dict in the format produced by ``raw``."""
+        pass
-def check_input_param(is_ok: bool, message: str) -> None:
- if not is_ok:
- logger.error(message)
- raise StopTestError(message)
-
-
-def parse_creds(creds: str) -> Tuple[str, str, str]:
- """Parse simple credentials format user[:passwd]@host"""
- user, passwd_host = creds.split(":", 1)
-
- if '@' not in passwd_host:
- passwd, host = passwd_host, None
- else:
- passwd, host = passwd_host.rsplit('@', 1)
-
- return user, passwd, host
+# Union of the scalar value types: int, str, bytes, bool and None.
+# NOTE(review): float is absent here and in Storable although Number
+# includes it -- confirm this is intentional.
+Basic = Union[int, str, bytes, bool, None]
+# Union of IStorable objects, dict/list containers and the scalar types
+# listed in Basic.
+Storable = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
class TaskFinished(Exception):
@@ -116,6 +105,86 @@
self.exited = True
+class Timeout(Iterable[float]):
+    """Deadline helper that can be polled via ``tick`` or iterated over.
+
+    The deadline is fixed at construction as ``time.time() + timeout``.
+    Successful ticks are spaced at least ``min_tick`` seconds apart.
+    Once the deadline has passed, ``tick`` either raises TimeoutError or,
+    when ``no_exc`` is set, returns False (which ends iteration).
+    """
+
+    def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
+        self.end_time = time.time() + timeout
+        self.prev_tick_at = time.time()
+        self.min_tick = min_tick
+        self.message = message
+        self.no_exc = no_exc
+
+    def tick(self) -> bool:
+        """Wait out the rest of the current tick interval.
+
+        Returns True while the deadline has not passed. After the deadline
+        returns False if ``no_exc`` is set, otherwise raises TimeoutError
+        with "Timeout" or "Timeout: <message>".
+        """
+        now = time.time()
+
+        if now > self.end_time:
+            if self.no_exc:
+                return False
+            # The text is only needed on the raising path.
+            text = "Timeout: {}".format(self.message) if self.message else "Timeout"
+            raise TimeoutError(text)
+
+        # Pad the time elapsed since the previous tick up to min_tick.
+        pause = self.min_tick - (now - self.prev_tick_at)
+        if pause > 0:
+            time.sleep(pause)
+            now = time.time()
+        self.prev_tick_at = now
+        return True
+
+    def __iter__(self) -> Iterator[float]:
+        # The object is its own iterator; the cast only informs the type checker.
+        return cast(Iterator[float], self)
+
+    def __next__(self) -> float:
+        """Tick once and return the seconds left until the deadline.
+
+        The value is measured after the tick's sleep, so it can be negative
+        when that sleep ran past the deadline.
+        """
+        if self.tick():
+            return self.end_time - time.time()
+        raise StopIteration()
+
+
+def greater_digit_pos(val: Number) -> int:
+    """Return the decimal position of the most significant digit of val.
+
+    Counted so that 1..9 -> 1, 10..99 -> 2, 0.1..0.9 -> 0, and so on.
+    The sign is ignored. Zero has no significant digit, so ValueError
+    is raised for it.
+    """
+    if val == 0:
+        raise ValueError("zero has no most significant digit")
+    # abs() keeps negative input inside log10's domain;
+    # math.floor already returns an int.
+    return math.floor(math.log10(abs(val))) + 1
+
+
+def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
+    """Keep only the num_digits most significant digits of val.
+
+    Lower digits are truncated towards zero, not rounded to nearest.
+    The result has the same type as val. Zero is returned unchanged.
+    """
+    if val == 0:
+        # Nothing to truncate, and log10(0) is undefined.
+        return val
+    # abs() so negative values are handled whichever greater_digit_pos is in use.
+    scale = 10 ** (greater_digit_pos(abs(val)) - num_digits)
+    return type(val)(int(val / scale) * scale)
+
+
+def is_ip(data: str) -> bool:
+    """Tell whether data parses as an IP address via ipaddress.ip_address."""
+    try:
+        ipaddress.ip_address(data)
+    except ValueError:
+        return False
+    return True
+
+
+def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
+    """Log "Starts : <message>" at debug level and return a LogError
+    built from the same message and exc_logger."""
+    start_line = "Starts : " + message
+    logger.debug(start_line)
+    block = LogError(message, exc_logger)
+    return block
+
+
+def check_input_param(is_ok: bool, message: str) -> None:
+    """Do nothing when is_ok is truthy; otherwise log message as an error
+    and raise StopTestError carrying the same message."""
+    if is_ok:
+        return
+    logger.error(message)
+    raise StopTestError(message)
+
+
+def parse_creds(creds: str) -> Tuple[str, Optional[str], Optional[str]]:
+    """Parse simple credentials format user[:passwd][@host]
+
+    Returns a (user, passwd, host) tuple. passwd is None when creds has
+    no ':' and host is None when there is no '@host' part.
+    """
+    if ':' not in creds:
+        # The password is optional in the format, but splitting on ':'
+        # used to fail here with ValueError on tuple unpacking.
+        if '@' in creds:
+            user, host = creds.rsplit('@', 1)
+        else:
+            user, host = creds, None
+        return user, None, host
+
+    user, passwd_host = creds.split(":", 1)
+
+    if '@' not in passwd_host:
+        passwd, host = passwd_host, None
+    else:
+        # Split on the last '@' so the password itself may contain '@'.
+        passwd, host = passwd_host.rsplit('@', 1)
+
+    return user, passwd, host
+
+
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
@@ -399,46 +468,6 @@
return results_dir, run_uuid
-class Timeout(Iterable[float]):
- def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
- self.end_time = time.time() + timeout
- self.message = message
- self.min_tick = min_tick
- self.prev_tick_at = time.time()
- self.no_exc = no_exc
-
- def tick(self) -> bool:
- current_time = time.time()
-
- if current_time > self.end_time:
- if self.message:
- msg = "Timeout: {}".format(self.message)
- else:
- msg = "Timeout"
-
- if self.no_exc:
- return False
-
- raise TimeoutError(msg)
-
- sleep_time = self.min_tick - (current_time - self.prev_tick_at)
- if sleep_time > 0:
- time.sleep(sleep_time)
- self.prev_tick_at = time.time()
- else:
- self.prev_tick_at = current_time
-
- return True
-
- def __iter__(self) -> Iterator[float]:
- return cast(Iterator[float], self)
-
- def __next__(self) -> float:
- if not self.tick():
- raise StopIteration()
- return self.end_time - time.time()
-
-
def to_ip(host_or_ip: str) -> str:
# translate hostname to address
try: