working on reporting; this commit represents a broken code state
diff --git a/wally/common_types.py b/wally/common_types.py
index ce497ad..a4805c3 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,11 +1,40 @@
-from typing import NamedTuple, Dict, Any
+import abc
+from typing import Any, Union, List, Dict, NamedTuple
-from .istorable import IStorable
IP = str
IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
+class IStorable(metaclass=abc.ABCMeta):
+ """Interface for type, which can be stored"""
+
+ @abc.abstractmethod
+ def raw(self) -> Dict[str, Any]:
+ pass
+
+    @classmethod
+    @abc.abstractmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+ pass
+
+
+Basic = Union[int, str, bytes, bool, None]
+StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
+
+
+class Storable(IStorable):
+ """Default implementation"""
+
+ def raw(self) -> Dict[str, Any]:
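+        # serialize only public attributes; names starting with "_" are skipped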
+ return {name: val for name, val in self.__dict__.items() if not name.startswith("_")}
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+ obj = cls.__new__(cls)
+ obj.__dict__.update(data)
+ return obj
+
+
class ConnCreds(IStorable):
def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
key_file: str = None, key: bytes = None) -> None:
diff --git a/wally/config.py b/wally/config.py
index 7554fb8..2178d0c 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
-from .result_classes import IStorable
+
+from .common_types import IStorable
ConfigBlock = Dict[str, Any]
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
new file mode 100644
index 0000000..c71159f
--- /dev/null
+++ b/wally/hlstorage.py
@@ -0,0 +1,205 @@
+import re
+import os
+import array
+import struct
+import logging
+from typing import cast, Iterator, Tuple, Type, Dict, Set, List, Optional
+
+import numpy
+
+from .result_classes import (TestSuiteConfig, TestJobConfig, TimeSeries, DataSource,
+ StatProps, IResultStorage)
+from .storage import Storage
+from .utils import StopTestError
+from .suits.all_suits import all_suits
+
+
+logger = logging.getLogger('wally')
+
+
+class DB_re:
+    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
+ job_id = r'[-a-zA-Z0-9]+_\d+'
+ sensor = r'[a-z_]+'
+ dev = r'[-a-zA-Z0-9_]+'
+ suite_id = r'[a-z]+_\d+'
+ tag = r'[a-z_.]+'
+
+
+class DB_paths:
+ suite_cfg_r = r'results/{suite_id}_info\.yml'
+ suite_cfg = suite_cfg_r.replace("\\.", '.')
+
+ job_cfg_r = r'results/{suite_id}\.{job_id}/info\.yml'
+ job_cfg = job_cfg_r.replace("\\.", '.')
+
+ job_extra_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ job_extra = job_extra_r.replace("\\.", '.')
+
+ ts_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ ts = ts_r.replace("\\.", '.')
+
+ stat_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ stat = stat_r.replace("\\.", '.')
+
+ plot_r = r'report/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ plot = plot_r.replace("\\.", '.')
+
+ report = r'report/'
+
+
+DB_rr = {name: r"(?P<{}>{})".format(name, rr) for name, rr in DB_re.__dict__.items() if not name.startswith("__")}
+
+
+class ResultStorage(IResultStorage):
+ # TODO: check that all path components match required patterns
+
+ ts_header_format = "!IIIcc"
+ ts_arr_tag = 'bin'
+ ts_raw_tag = 'txt'
+
+ def __init__(self, storage: Storage) -> None:
+ self.storage = storage
+
+ def sync(self) -> None:
+ self.storage.sync()
+
+ def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+ path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
+ if path in self.storage:
+ db_cfg = self.storage.get(path)
+ if db_cfg != suite:
+ logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
+ raise StopTestError()
+
+ self.storage.put(suite, path)
+
+ def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+ path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
+ self.storage.put(job, path)
+
+ def put_ts(self, ts: TimeSeries) -> None:
+ data = cast(List[int], ts.data)
+ times = cast(List[int], ts.times)
+
+ if len(data) % ts.second_axis_size != 0:
+ logger.error("Time series data size(%s) is not propotional to second_axis_size(%s).",
+ len(data), ts.second_axis_size)
+ raise StopTestError()
+
+ if len(data) // ts.second_axis_size != len(times):
+ logger.error("Unbalanced data and time srray sizes. %s", ts)
+ raise StopTestError()
+
+ bin_path = DB_paths.ts.format(**ts.source(tag=self.ts_arr_tag).__dict__)
+
+ with self.storage.get_fd(bin_path, "cb") as fd:
+ header = struct.pack(self.ts_header_format,
+ ts.second_axis_size,
+ len(data),
+ len(times),
+ ts.data.typecode.encode("ascii"),
+ ts.times.typecode.encode("ascii"))
+ fd.write(header)
+ ts.data.tofile(fd) # type: ignore
+ ts.times.tofile(fd) # type: ignore
+
+ if ts.raw:
+ raw_path = DB_paths.ts.format(**ts.source(tag=self.ts_raw_tag).__dict__)
+ self.storage.put_raw(ts.raw, raw_path)
+
+ def put_extra(self, data: bytes, source: DataSource) -> None:
+        path = DB_paths.job_extra.format(**source.__dict__)
+ self.storage.put_raw(data, path)
+
+ def put_stat(self, data: StatProps, source: DataSource) -> None:
+ path = DB_paths.stat.format(**source.__dict__)
+ self.storage.put(data, path)
+
+ def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+ path = DB_paths.stat.format(**source.__dict__)
+ return self.storage.load(stat_cls, path)
+
+    def iter_paths(self, path_glob: str) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
+ path = path_glob.format(**DB_rr).split("/")
+ yield from self.storage._iter_paths("", path, {})
+
+ def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+ for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
+ assert is_file
+ suite = cast(TestSuiteConfig, self.storage.load(TestSuiteConfig, suite_info_path))
+ assert suite.storage_id == groups['suite_id']
+ if not suite_type or suite.test_type == suite_type:
+ yield suite
+
+ def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+ job_glob = DB_paths.job_cfg_r.replace('{suite_id}', suite.storage_id)
+ job_config_cls = all_suits[suite.test_type].job_config_cls
+
+ for is_file, path, groups in self.iter_paths(job_glob):
+ assert is_file
+ job = cast(TestJobConfig, self.storage.load(job_config_cls, path))
+ assert job.storage_id == groups['job_id']
+ yield job
+
+ def iter_datasource(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[Tuple[DataSource, Dict[str, str]]]:
+ ts_glob = DB_paths.ts_r.replace('{suite_id}', suite.storage_id).replace('{job_id}', job.storage_id)
+ ts_found = {} # type: Dict[Tuple[str, str, str], Dict[str, str]]
+
+ for is_file, path, groups in self.iter_paths(ts_glob):
+ assert is_file
+ key = (groups['node_id'], groups['dev'], groups['sensor'])
+ ts_found.setdefault(key, {})[groups['tag']] = path
+
+ for (node_id, dev, sensor), tag2path in ts_found.items():
+ if self.ts_arr_tag in tag2path:
+ yield DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id=node_id,
+ dev=dev, sensor=sensor, tag=None), tag2path
+
+ def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
+ with self.storage.get_fd(path, "rb") as fd:
+ header = fd.read(struct.calcsize(self.ts_header_format))
+ second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
+ struct.unpack(self.ts_header_format, header)
+
+ data = array.array(data_typecode.decode("ascii"))
+ times = array.array(time_typecode.decode("ascii"))
+
+ data.fromfile(fd, data_sz) # type: ignore
+ times.fromfile(fd, time_sz) # type: ignore
+
+ return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
+ raw=None,
+ data=numpy.array(data, dtype=numpy.dtype('float32')),
+ times=numpy.array(times),
+ second_axis_size=second_axis_size,
+ source=ds)
+
+ def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig, **filters) -> Iterator[TimeSeries]:
+ for ds, tag2path in self.iter_datasource(suite, job):
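+            # keep only data sources whose attributes match all **filters;
+            # the for/else below yields the TS only when no filter mismatched (no break)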
+ for name, val in filters.items():
+ if val != getattr(ds, name):
+ break
+ else:
+ ts = self.load_ts(ds, tag2path[self.ts_arr_tag])
+ if self.ts_raw_tag in tag2path:
+ ts.raw = self.storage.get_raw(tag2path[self.ts_raw_tag])
+
+ yield ts
+
+ # return path to file to be inserted into report
+ def put_plot_file(self, data: bytes, source: DataSource) -> str:
+ path = DB_paths.plot.format(**source.__dict__)
+ return cast(str, self.storage.put_raw(data, path))
+
+ def check_plot_file(self, source: DataSource) -> Optional[str]:
+ path = DB_paths.plot.format(**source.__dict__)
+ fpath = self.storage.resolve_raw(path)
+ if os.path.exists(fpath):
+ return fpath
+ return None
+
+ def put_report(self, report: str, name: str) -> str:
+ return self.storage.put_raw(report.encode("utf8"), DB_paths.report + name)
diff --git a/wally/html.py b/wally/html.py
new file mode 100644
index 0000000..e92e7d1
--- /dev/null
+++ b/wally/html.py
@@ -0,0 +1,2 @@
+def img(link):
+ return '<img src="{}">'.format(link)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index aa53f8e..9da8cb7 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -5,6 +5,7 @@
from typing import List, Tuple, cast, Optional
from . import utils
+from .node_utils import get_os
from .node_interfaces import IRPCNode
@@ -130,7 +131,7 @@
def get_sw_info(node: IRPCNode) -> SWInfo:
res = SWInfo()
- res.OS_version = utils.get_os(node)
+ res.OS_version = get_os(node)
res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
@@ -159,7 +160,7 @@
try:
lshw_out = node.run('sudo lshw -xml 2>/dev/null')
except Exception as exc:
- logger.warning("lshw failed on node %s: %s", node.info.node_id(), exc)
+ logger.warning("lshw failed on node %s: %s", node.node_id, exc)
return None
res = HWInfo()
diff --git a/wally/istorable.py b/wally/istorable.py
deleted file mode 100644
index 467f901..0000000
--- a/wally/istorable.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import abc
-from typing import Any, Union, List, Dict
-
-
-class IStorable(metaclass=abc.ABCMeta):
- """Interface for type, which can be stored"""
-
- @abc.abstractmethod
- def raw(self) -> Dict[str, Any]:
- pass
-
- @abc.abstractclassmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- pass
-
-
-class Storable(IStorable):
- """Default implementation"""
-
- def raw(self) -> Dict[str, Any]:
- return self.__dict__
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- obj = cls.__new__(cls)
- obj.__dict__.update(data)
- return obj
-
-
-Basic = Union[int, str, bytes, bool, None]
-StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
diff --git a/wally/main.py b/wally/main.py
index 0553b4e..fd9b5a0 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -4,8 +4,10 @@
import pprint
import getpass
import logging
+import tempfile
import argparse
import functools
+import subprocess
import contextlib
from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
from yaml import load as _yaml_load
@@ -30,6 +32,7 @@
faulthandler = None
from . import utils, node
+from .node_utils import log_nodes_statistic
from .storage import make_storage, Storage
from .config import Config
from .logger import setup_loggers
@@ -91,7 +94,7 @@
def log_nodes_statistic_stage(ctx: TestRun) -> None:
- utils.log_nodes_statistic(ctx.nodes)
+ log_nodes_statistic(ctx.nodes)
def parse_args(argv):
@@ -121,6 +124,15 @@
report_parser.add_argument("data_dir", help="folder with rest results")
# ---------------------------------------------------------------------
+ ipython_help = 'run ipython in prepared environment'
+ ipython_parser = subparsers.add_parser('ipython', help=ipython_help)
+ ipython_parser.add_argument("storage_dir", help="Storage path")
+ # ---------------------------------------------------------------------
+    jupyter_help = 'run jupyter notebook in prepared environment'
+ jupyter_parser = subparsers.add_parser('jupyter', help=jupyter_help)
+ jupyter_parser.add_argument("storage_dir", help="Storage path")
+
+ # ---------------------------------------------------------------------
test_parser = subparsers.add_parser('test', help='run tests')
test_parser.add_argument('--build-description', type=str, default="Build info")
test_parser.add_argument('--build-id', type=str, default="id")
@@ -141,7 +153,7 @@
test_parser.add_argument("storage_dir", help="Path to test directory")
# ---------------------------------------------------------------------
- test_parser = subparsers.add_parser('db', help='resume tests')
+ test_parser = subparsers.add_parser('db', help='Exec command on DB')
test_parser.add_argument("cmd", choices=("show",), help="Command to execute")
test_parser.add_argument("params", nargs='*', help="Command params")
test_parser.add_argument("storage_dir", help="Storage path")
@@ -206,6 +218,48 @@
PrepareNodes()]
+notebook_kern = """
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {
+ "collapsed": true
+ },
+ "outputs": [],
+ "source": [
+ "from wally.storage import make_storage\n",
+ "from wally.hlstorage import ResultStorage\n"
+ "storage = make_storage(\"$STORAGE\", existing=True)\n",
+ "rstorage = ResultStorage(storage=storage)\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.5.2"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}"""
+
+
def main(argv: List[str]) -> int:
if faulthandler is not None:
faulthandler.register(signal.SIGUSR1, all_threads=True)
@@ -250,7 +304,8 @@
config = storage.load(Config, 'config')
stages.extend(get_run_stages())
stages.append(LoadStoredNodesStage())
- prev_opts = storage.get('cli')
+ prev_opts = storage.get('cli') # type: List[str]
+
if '--ssh-key-passwd' in prev_opts and opts.ssh_key_passwd:
prev_opts[prev_opts.index("--ssh-key-passwd") + 1] = opts.ssh_key_passwd
@@ -293,11 +348,26 @@
print("Unknown/not_implemented command {!r}".format(opts.cmd))
return 1
return 0
+ elif opts.subparser_name == 'ipython':
+ storage = make_storage(opts.storage_dir, existing=True)
+ from .hlstorage import ResultStorage
+ rstorage = ResultStorage(storage=storage)
+
+ import IPython
+ IPython.embed()
+
+ return 0
+ elif opts.subparser_name == 'jupyter':
+        with tempfile.NamedTemporaryFile(suffix=".ipynb") as fd:
+            fd.write(notebook_kern.replace("$STORAGE", opts.storage_dir).encode("utf8"))
+            fd.flush()
+            subprocess.call("jupyter notebook " + fd.name, shell=True)
+        return 0
+
report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
- report_stages.append(CalcStatisticStage())
- report_stages.append(ConsoleReportStage())
+ # report_stages.append(CalcStatisticStage())
+ # report_stages.append(ConsoleReportStage())
report_stages.append(HtmlReportStage())
# log level is not a part of config
diff --git a/wally/node.py b/wally/node.py
index d4da52a..38342c6 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -1,4 +1,5 @@
import os
+import zlib
import time
import json
import socket
@@ -25,7 +26,11 @@
self.info = info
def __str__(self) -> str:
- return self.info.node_id()
+ return self.node_id
+
+ @property
+ def node_id(self) -> str:
+ return self.info.node_id
def put_to_file(self, path: Optional[str], content: bytes) -> str:
if path is None:
@@ -163,17 +168,19 @@
def __repr__(self) -> str:
return str(self)
- def get_file_content(self, path: str, expanduser: bool = False) -> bytes:
+ def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
logger.debug("GET %s from %s", path, self.info)
if expanduser:
path = self.conn.fs.expanduser(path)
- res = self.conn.fs.get_file(path)
+ res = self.conn.fs.get_file(path, compress)
logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
+ if compress:
+ res = zlib.decompress(res)
return res
def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
if not nolog:
- logger.debug("Node %s - run %s", self.info.node_id(), cmd)
+ logger.debug("Node %s - run %s", self.node_id, cmd)
cmd_b = cmd.encode("utf8")
proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
@@ -188,21 +195,26 @@
if code != 0:
templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
- raise OSError(templ.format(self.info.node_id(), cmd, code, out))
+ raise OSError(templ.format(self.node_id, cmd, code, out))
return out
- def copy_file(self, local_path: str, remote_path: str = None, expanduser: bool = False) -> str:
+ def copy_file(self, local_path: str, remote_path: str = None,
+ expanduser: bool = False,
+ compress: bool = False) -> str:
+
if expanduser:
remote_path = self.conn.fs.expanduser(remote_path)
data = open(local_path, 'rb').read() # type: bytes
- return self.put_to_file(remote_path, data)
+ return self.put_to_file(remote_path, data, compress=compress)
- def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
+ def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
if expanduser:
path = self.conn.fs.expanduser(path)
- return self.conn.fs.store_file(path, content)
+ if compress:
+ content = zlib.compress(content)
+ return self.conn.fs.store_file(path, content, compress)
def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
if expanduser:
@@ -267,7 +279,7 @@
cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
- node.info.node_id(), log_file, log_level))
+ node.node_id, log_file, log_level))
else:
cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
cmd = cmd.format(python_cmd, code_file, ip, port)
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index 71efc54..935ca41 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,11 +1,13 @@
import abc
-from typing import Any, Set, Dict, NamedTuple, Optional
+import logging
+from typing import Any, Set, Dict, Optional, NamedTuple
+
from .ssh_utils import ConnCreds
-from .common_types import IPAddr
-from .istorable import IStorable
+from .common_types import IPAddr, IStorable
RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
+logger = logging.getLogger("wally")
class NodeInfo(IStorable):
@@ -23,11 +25,12 @@
if params is not None:
self.params = params
+ @property
def node_id(self) -> str:
return "{0.host}:{0.port}".format(self.ssh_creds.addr)
def __str__(self) -> str:
- return self.node_id()
+ return self.node_id
def __repr__(self) -> str:
return str(self)
@@ -89,6 +92,10 @@
conn = None # type: Any
rpc_log_file = None # type: str
+ @property
+ def node_id(self) -> str:
+ return self.info.node_id
+
@abc.abstractmethod
def __str__(self) -> str:
pass
diff --git a/wally/node_utils.py b/wally/node_utils.py
new file mode 100644
index 0000000..e66ba44
--- /dev/null
+++ b/wally/node_utils.py
@@ -0,0 +1,56 @@
+import logging
+import collections
+from typing import Dict, Sequence, NamedTuple
+
+from .node_interfaces import IRPCNode
+
+logger = logging.getLogger("wally")
+
+
+def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
+ logger.info("Found {0} nodes total".format(len(nodes)))
+
+ per_role = collections.defaultdict(int) # type: Dict[str, int]
+ for node in nodes:
+ for role in node.info.roles:
+ per_role[role] += 1
+
+ for role, count in sorted(per_role.items()):
+ logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+
+OSRelease = NamedTuple("OSRelease",
+ [("distro", str),
+ ("release", str),
+ ("arch", str)])
+
+
+def get_os(node: IRPCNode) -> OSRelease:
+ """return os type, release and architecture for node.
+ """
+ arch = node.run("arch", nolog=True).strip()
+
+ try:
+ node.run("ls -l /etc/redhat-release", nolog=True)
+ return OSRelease('redhat', None, arch)
+    except Exception:
+ pass
+
+ try:
+ node.run("ls -l /etc/debian_version", nolog=True)
+
+ release = None
+ for line in node.run("lsb_release -a", nolog=True).split("\n"):
+ if ':' not in line:
+ continue
+ opt, val = line.split(":", 1)
+
+ if opt == 'Codename':
+ release = val.strip()
+
+ return OSRelease('ubuntu', release, arch)
+    except Exception:
+ pass
+
+ raise RuntimeError("Unknown os")
diff --git a/wally/openstack.py b/wally/openstack.py
index 8aa9913..b7fbe31 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -155,7 +155,7 @@
creds = ConnCreds(host=to_ip(ip), user=user_name, key_file=private_key_path)
info = NodeInfo(creds, {'testnode'})
info.os_vm_id = vm_id
- nid = info.node_id()
+ nid = info.node_id
if nid in ctx.nodes_info:
logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
raise StopTestError()
@@ -199,7 +199,7 @@
with ctx.get_pool() as pool:
for info in launch_vms(ctx.os_connection, params, pool):
info.roles.add('testnode')
- nid = info.node_id()
+ nid = info.node_id
if nid in ctx.nodes_info:
logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
raise StopTestError()
diff --git a/wally/process_results.py b/wally/process_results.py
index 5ca53af..112826e 100644
--- a/wally/process_results.py
+++ b/wally/process_results.py
@@ -1,58 +1,53 @@
# put all result preprocessing here
# selection, aggregation
+from io import BytesIO
import logging
-
+from typing import Any
from .stage import Stage, StepOrder
from .test_run_class import TestRun
from .statistic import calc_norm_stat_props, calc_histo_stat_props
-from .result_classes import TestJobConfig
-from .suits.itest import ResultStorage
+from .result_classes import StatProps, DataSource, TimeSeries
+from .hlstorage import ResultStorage
from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest
from .utils import StopTestError
-logger = logging.getLogger("wally")
-
import matplotlib
-
-# have to be before pyplot import to avoid tkinter(default graph frontend) import error
matplotlib.use('svg')
-
import matplotlib.pyplot as plt
+logger = logging.getLogger("wally")
+
+
class CalcStatisticStage(Stage):
priority = StepOrder.TEST + 1
def run(self, ctx: TestRun) -> None:
- rstorage = ResultStorage(ctx.storage, TestJobConfig)
+ rstorage = ResultStorage(ctx.storage)
- for suite_cfg, path in rstorage.list_suites():
- if suite_cfg.test_type != 'fio':
- continue
-
- for job_cfg, path, _ in rstorage.list_jobs_in_suite(path):
+ for suite in rstorage.iter_suite(FioTest.name):
+ for job in rstorage.iter_job(suite):
results = {}
- for node_id, dev, sensor_name in rstorage.list_ts_in_job(path):
- ts = rstorage.load_ts(path, node_id, dev, sensor_name)
- if dev == 'fio' and sensor_name == 'lat':
+ for ts in rstorage.iter_ts(suite, job):
+ if ts.source.sensor == 'lat':
if ts.second_axis_size != expected_lat_bins:
logger.error("Sensor %s.%s on node %s has" +
"second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
- dev, sensor_name, node_id, ts.second_axis_size, expected_lat_bins)
+ ts.source.dev, ts.source.sensor, ts.source.node_id,
+ ts.second_axis_size, expected_lat_bins)
continue
ts.bins_edges = get_lat_vals(ts.second_axis_size)
- stat_prop = calc_histo_stat_props(ts)
+ stat_prop = calc_histo_stat_props(ts) # type: StatProps
elif ts.second_axis_size != 1:
logger.warning("Sensor %s.%s on node %s provide 2D data with " +
"ts.second_axis_size=%s. Can't process it.",
- dev, sensor_name, node_id, ts.second_axis_size)
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
continue
else:
stat_prop = calc_norm_stat_props(ts)
- results[(node_id, dev, sensor_name)] = stat_prop
-
raise StopTestError()
diff --git a/wally/report.py b/wally/report.py
index cf1289b..f8d8c5a 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,44 +1,90 @@
+import os
+import re
import abc
+import bisect
import logging
-from typing import Dict, Any, Iterator, Tuple, cast, List
+from io import BytesIO
+from functools import wraps
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable
+from collections import defaultdict
import numpy
-import scipy
import matplotlib
-
# have to be before pyplot import to avoid tkinter(default graph frontend) import error
matplotlib.use('svg')
-
import matplotlib.pyplot as plt
+import scipy.stats
+import wally
-
-
-from .utils import ssize2b
+from . import html
+from .utils import b2ssize
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .result_classes import NormStatProps
+from .hlstorage import ResultStorage
+from .node_interfaces import NodeInfo
+from .storage import Storage
+from .statistic import calc_norm_stat_props, calc_histo_stat_props
+from .result_classes import (StatProps, DataSource, TimeSeries, TestSuiteConfig,
+ NormStatProps, HistoStatProps, TestJobConfig)
+from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest, FioJobConfig
+from .suits.io.fio_task_parser import FioTestSumm
+from .statistic import approximate_curve, average, dev
logger = logging.getLogger("wally")
-class ConsoleReportStage(Stage):
-
- priority = StepOrder.REPORT
-
- def run(self, ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+# ---------------- CONSTS ---------------------------------------------------------------------------------------------
-class HtmlReportStage(Stage):
+DEBUG = False
+LARGE_BLOCKS = 256
+MiB2KiB = 1024
+MS2S = 1000
- priority = StepOrder.REPORT
- def run(self, ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+# ---------------- PROFILES ------------------------------------------------------------------------------------------
+
+
+class ColorProfile:
+ primary_color = 'b'
+ suppl_color1 = 'teal'
+ suppl_color2 = 'magenta'
+ box_color = 'y'
+
+ noise_alpha = 0.3
+ subinfo_alpha = 0.7
+
+
+class StyleProfile:
+ grid = True
+ tide_layout = True
+ hist_boxes = 10
+ min_points_for_dev = 5
+
+ dev_range_x = 2.0
+ dev_perc = 95
+
+ avg_range = 20
+
+ curve_approx_level = 5
+ curve_approx_points = 100
+ assert avg_range >= min_points_for_dev
+
+ extra_io_spine = True
+
+ legend_for_eng = True
+
+ units = {
+ 'bw': ("MiBps", MiB2KiB, "bandwith"),
+ 'iops': ("IOPS", 1, "iops"),
+ 'lat': ("ms", 1, "latency")
+ }
+
+
+# ---------------- STRUCTS -------------------------------------------------------------------------------------------
# TODO: need to be revised, have to use StatProps fields instead
@@ -63,15 +109,497 @@
self.lat_95 = None # type: float
+class IOSummary:
+ def __init__(self,
+ qd: int,
+ block_size: int,
+                 nodes_count: int,
+ bw: NormStatProps,
+ lat: HistoStatProps) -> None:
+
+ self.qd = qd
+ self.nodes_count = nodes_count
+ self.block_size = block_size
+
+ self.bw = bw
+ self.lat = lat
+
+
+# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
+rexpr = {
+ 'sensor': r'(?P<sensor>[-a-z]+)',
+ 'dev': r'(?P<dev>[^.]+)',
+ 'metric': r'(?P<metric>[a-z_]+)',
+ 'node': r'(?P<node>\d+\.\d+\.\d+\.\d+:\d+)',
+}
+
+def iter_sensors(storage: Storage, node: str = None, sensor: str = None, dev: str = None, metric: str = None):
+ if node is None:
+ node = rexpr['node']
+ if sensor is None:
+ sensor = rexpr['sensor']
+ if dev is None:
+ dev = rexpr['dev']
+ if metric is None:
+ metric = rexpr['metric']
+
+ rr = r"{}_{}\.{}\.{}$".format(node, sensor, dev, metric)
+ sensor_name_re = re.compile(rr)
+
+ for is_file, sensor_data_name in storage.list("sensors"):
+ if is_file:
+ rr = sensor_name_re.match(sensor_data_name)
+ if rr:
+ yield 'sensors/' + sensor_data_name, rr.groupdict()
+
+
+def make_iosum(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig) -> IOSummary:
+ lat = get_aggregated(rstorage, suite, job, "lat")
+ bins_edges = numpy.array(get_lat_vals(lat.second_axis_size), dtype='float32') / 1000
+ io = get_aggregated(rstorage, suite, job, "bw")
+
+ return IOSummary(job.qd,
+ nodes_count=len(suite.nodes_ids),
+ block_size=job.bsize,
+ lat=calc_histo_stat_props(lat, bins_edges, StyleProfile.hist_boxes),
+ bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+
+#
+# def iter_io_results(rstorage: ResultStorage,
+# qds: List[int] = None,
+# op_types: List[str] = None,
+# sync_types: List[str] = None,
+# block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
+#
+# for suite in rstorage.iter_suite(FioTest.name):
+# for job in rstorage.iter_job(suite):
+# fjob = cast(FioJobConfig, job)
+# assert int(fjob.vals['numjobs']) == 1
+#
+# if sync_types is not None and fjob.sync_mode in sync_types:
+# continue
+#
+# if block_sizes is not None and fjob.bsize not in block_sizes:
+# continue
+#
+# if op_types is not None and fjob.op_type not in op_types:
+# continue
+#
+# if qds is not None and fjob.qd not in qds:
+# continue
+#
+# yield suite, fjob
+
+
+def get_aggregated(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig, sensor: str) -> TimeSeries:
+ tss = list(rstorage.iter_ts(suite, job, sensor=sensor))
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id="__all__",
+ dev='fio',
+ sensor=sensor,
+ tag=None)
+
+ agg_ts = TimeSeries(sensor,
+ raw=None,
+ source=ds,
+ data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
+ times=tss[0].times.copy(),
+ second_axis_size=tss[0].second_axis_size)
+
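+    # element-wise sum of the per-node series into agg_ts; all nodes are assumed to share the same
+    # time grid (see the "match times" TODO below)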
+ for ts in tss:
+ if sensor == 'lat' and ts.second_axis_size != expected_lat_bins:
+ logger.error("Sensor %s.%s on node %s has" +
+ "second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id,
+ ts.second_axis_size, expected_lat_bins)
+ continue
+
+ if sensor != 'lat' and ts.second_axis_size != 1:
+ logger.error("Sensor %s.%s on node %s has" +
+ "second_axis_size=%s. Can only process sensors with second_axis_size=1.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
+ continue
+
+ # TODO: match times on different ts
+ agg_ts.data += ts.data
+
+ return agg_ts
+
+
+# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
+
+def get_emb_data_svg(plt: Any) -> bytes:
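+    # render the current figure to SVG and strip everything before the matplotlib marker comment,
+    # so only the bare <svg> element is returned for inline embedding into the HTML report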
+ bio = BytesIO()
+ plt.savefig(bio, format='svg')
+ img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+ return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
+
+
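+# Decorator for plot functions: the wrapped function draws onto the current pyplot figure; the
+# closure re-renders and stores the SVG only if it is not already present in the result storage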
+def provide_plot(func: Callable[..., None]) -> Callable[..., str]:
+ @wraps(func)
+ def closure1(storage: ResultStorage, path: DataSource, *args, **kwargs) -> str:
+ fpath = storage.check_plot_file(path)
+ if not fpath:
+ func(*args, **kwargs)
+ fpath = storage.put_plot_file(get_emb_data_svg(plt), path)
+ plt.clf()
+ logger.debug("Save plot for %s to %r", path, fpath)
+ return fpath
+ return closure1
+
+
+def apply_style(style: StyleProfile, eng: bool = True, no_legend: bool = False) -> None:
+ if style.grid:
+ plt.grid(True)
+
+ if (style.legend_for_eng or not eng) and not no_legend:
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.03, 0.81)
+ plt.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+
+# -------------- PLOT FUNCTIONS --------------------------------------------------------------------------------------
+
+
+@provide_plot
+def plot_hist(title: str, units: str,
+ prop: StatProps,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+
+    # TODO: units should come from ts
+ total = sum(prop.bins_populations)
+ mids = prop.bins_mids
+ normed_bins = [population / total for population in prop.bins_populations]
+ bar_width = mids[1] - mids[0]
+ plt.bar(mids - bar_width / 2, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
+
+ plt.xlabel(units)
+ plt.ylabel("Value probability")
+ plt.title(title)
+
+ dist_plotted = False
+ if isinstance(prop, NormStatProps):
+ nprop = cast(NormStatProps, prop)
+ stats = scipy.stats.norm(nprop.average, nprop.deviation)
+
+ # xpoints = numpy.linspace(mids[0], mids[-1], style.curve_approx_points)
+ # ypoints = stats.pdf(xpoints) / style.curve_approx_points
+
+ edges, step = numpy.linspace(mids[0], mids[-1], len(mids) * 10, retstep=True)
+
+ ypoints = stats.cdf(edges) * 11
+ ypoints = [next - prev for (next, prev) in zip(ypoints[1:], ypoints[:-1])]
+ xpoints = (edges[1:] + edges[:-1]) / 2
+
+ plt.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal distribution")
+ dist_plotted = True
+
+ apply_style(style, eng=True, no_legend=not dist_plotted)
+
+
+@provide_plot
+def plot_v_over_time(title: str, units: str,
+ ts: TimeSeries,
+ plot_avg_dev: bool = True,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+ min_time = min(ts.times)
+
+ # /1000 is us to ms conversion
+ time_points = [(val_time - min_time) / 1000 for val_time in ts.times]
+
+ alpha = colors.noise_alpha if plot_avg_dev else 1.0
+ plt.plot(time_points, ts.data, "o", color=colors.primary_color, alpha=alpha, label="Data")
+
+ if plot_avg_dev:
+ avg_vals = []
+ low_vals_dev = []
+ hight_vals_dev = []
+ avg_times = []
+ dev_times = []
+
+ start = (len(ts.data) % style.avg_range) // 2
+ points = list(range(start, len(ts.data) + 1, style.avg_range))
+
+ for begin, end in zip(points[:-1], points[1:]):
+ vals = ts.data[begin: end]
+
+ cavg = average(vals)
+ cdev = dev(vals)
+ tavg = average(time_points[begin: end])
+
+ avg_vals.append(cavg)
+ avg_times.append(tavg)
+
+ low_vals_dev.append(cavg - style.dev_range_x * cdev)
+ hight_vals_dev.append(cavg + style.dev_range_x * cdev)
+ dev_times.append(tavg)
+
+ avg_timepoints = cast(List[float], numpy.linspace(avg_times[0], avg_times[-1], style.curve_approx_points))
+
+ low_vals_dev = approximate_curve(dev_times, low_vals_dev, avg_timepoints, style.curve_approx_level)
+ hight_vals_dev = approximate_curve(dev_times, hight_vals_dev, avg_timepoints, style.curve_approx_level)
+ new_vals_avg = approximate_curve(avg_times, avg_vals, avg_timepoints, style.curve_approx_level)
+
+ plt.plot(avg_timepoints, new_vals_avg, c=colors.suppl_color1,
+ label="Average\nover {}s".format(style.avg_range))
+ plt.plot(avg_timepoints, low_vals_dev, c=colors.suppl_color2,
+ label="Avg \xB1 {} * stdev\nover {}s".format(style.dev_range_x, style.avg_range))
+ plt.plot(avg_timepoints, hight_vals_dev, c=colors.suppl_color2)
+
+ plt.xlim(-5, max(time_points) + 5)
+
+ plt.xlabel("Time, seconds from test begin")
+ plt.ylabel("{}. Average and \xB1stddev over {} points".format(units, style.avg_range))
+ plt.title(title)
+ apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+ min_time = min(ts.times)
+ times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
+ ts_len = len(times)
+ step = ts_len / samples
+ points = [times[int(i * step + 0.5)] for i in range(samples)]
+ points.append(times[-1])
+ bounds = list(zip(points[:-1], points[1:]))
+ data = numpy.array(ts.data, dtype='int32')
+ data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size] # type: ignore
+ agg_data = []
+ positions = []
+ labels = []
+
+ min_idxs = []
+ max_idxs = []
+
+ for begin, end in bounds:
+ agg_hist = numpy.sum(data[begin:end], axis=0)
+
+ vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+ cidx = 0
+ non_zero = agg_hist.nonzero()[0]
+ min_idxs.append(non_zero[0])
+ max_idxs.append(non_zero[-1])
+
+ for pos in non_zero:
+ vals[cidx:cidx + agg_hist[pos]] = bins_vals[pos]
+ cidx += agg_hist[pos]
+
+ agg_data.append(vals)
+ positions.append((end + begin) / 2)
+ labels.append(str((end + begin) // 2))
+
+ min_y = bins_vals[min(min_idxs)]
+ max_y = bins_vals[max(max_idxs)]
+
+ min_y -= (max_y - min_y) * 0.05
+ max_y += (max_y - min_y) * 0.05
+
+ # plot box size adjust (only plot, not spines and legend)
+ plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
+ plt.xlim(min(times), max(times))
+ plt.ylim(min_y, max_y)
+ plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
+ plt.ylabel("Latency, ms")
+ plt.title(title)
+ apply_style(style, eng=True, no_legend=True)
+
+
+@provide_plot
+def plot_heatmap(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ hist_bins_count = 20
+ bin_top = [100 * 2 ** i for i in range(20)]
+ bin_ranges = [[0, 0]]
+ cborder_it = iter(bin_top)
+ cborder = next(cborder_it)
+ for bin_val in bins_vals:
+ if bin_val < cborder:
+ bin_ranges
+
+ # bins: [100us, 200us, ...., 104s]
+    # map original bin ranges to heatmap bins
+
+@provide_plot
+def io_chart(title: str,
+ legend: str,
+ iosums: List[IOSummary],
+ iops_log_spine: bool = False,
+ lat_log_spine: bool = False,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+
+ # -------------- MAGIC VALUES ---------------------
+ # IOPS bar width
+ width = 0.35
+
+ # offset from center of bar to deviation/confidence range indicator
+ err_x_offset = 0.05
+
+ # figure size in inches
+ figsize = (12, 6)
+
+ # extra space on top and bottom, comparing to maximal tight layout
+ extra_y_space = 0.05
+
+ # additional spine for BW/IOPS on left side of plot
+ extra_io_spine_x_offset = -0.1
+
+ # extra space on left and right sides
+ extra_x_space = 0.5
+
+ # legend location settings
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.1, 0.81)
+
+ # plot box size adjust (only plot, not spines and legend)
+ plot_box_adjust = {'right': 0.66}
+ # -------------- END OF MAGIC VALUES ---------------------
+
+ block_size = iosums[0].block_size
+ lc = len(iosums)
+ xt = list(range(1, lc + 1))
+
+ # x coordinate of middle of the bars
+ xpos = [i - width / 2 for i in xt]
+
+ # import matplotlib.gridspec as gridspec
+ # gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
+ # p1 = plt.subplot(gs[1])
+
+ fig, p1 = plt.subplots(figsize=figsize)
+
+ # plot IOPS/BW bars
+ if block_size >= LARGE_BLOCKS:
+ iops_primary = False
+ coef = MiB2KiB
+ p1.set_ylabel("BW (MiBps)")
+ else:
+ iops_primary = True
+ coef = block_size
+ p1.set_ylabel("IOPS")
+
+ p1.bar(xpos, [iosum.bw.average / coef for iosum in iosums], width=width, color=colors.box_color, label=legend)
+
+ # set correct x limits for primary IO spine
+ min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+ max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+ border = (max_io - min_io) * extra_y_space
+ io_lims = (min_io - border, max_io + border)
+
+ p1.set_ylim(io_lims[0] / coef, io_lims[-1] / coef)
+
+ # plot deviation and confidence error ranges
+ err1_legend = err2_legend = None
+ for pos, iosum in zip(xpos, iosums):
+ err1_legend = p1.errorbar(pos + width / 2 - err_x_offset,
+ iosum.bw.average / coef,
+ iosum.bw.deviation * style.dev_range_x / coef,
+ alpha=colors.subinfo_alpha,
+ color=colors.suppl_color1) # 'magenta'
+ err2_legend = p1.errorbar(pos + width / 2 + err_x_offset,
+ iosum.bw.average / coef,
+ iosum.bw.confidence / coef,
+ alpha=colors.subinfo_alpha,
+ color=colors.suppl_color2) # 'teal'
+
+ if style.grid:
+ p1.grid(True)
+
+ handles1, labels1 = p1.get_legend_handles_labels()
+
+ handles1 += [err1_legend, err2_legend]
+ labels1 += ["{}% dev".format(style.dev_perc),
+ "{}% conf".format(int(100 * iosums[0].bw.confidence_level))]
+
+ # extra y spine for latency on right side
+ p2 = p1.twinx()
+
+ # plot median and 95 perc latency
+ p2.plot(xt, [iosum.lat.perc_50 for iosum in iosums], label="lat med")
+ p2.plot(xt, [iosum.lat.perc_95 for iosum in iosums], label="lat 95%")
+
+ # limit and label x spine
+ plt.xlim(extra_x_space, lc + extra_x_space)
+ plt.xticks(xt, ["{0} * {1}".format(iosum.qd, iosum.nodes_count) for iosum in iosums])
+ p1.set_xlabel("QD * Test node count")
+
+ # apply log scales for X spines, if set
+ if iops_log_spine:
+ p1.set_yscale('log')
+
+ if lat_log_spine:
+ p2.set_yscale('log')
+
+ # extra y spine for BW/IOPS on left side
+ if style.extra_io_spine:
+ p3 = p1.twinx()
+ if iops_log_spine:
+ p3.set_yscale('log')
+
+ if iops_primary:
+ p3.set_ylabel("BW (MiBps)")
+ p3.set_ylim(io_lims[0] / MiB2KiB, io_lims[1] / MiB2KiB)
+ else:
+ p3.set_ylabel("IOPS")
+ p3.set_ylim(io_lims[0] / block_size, io_lims[1] / block_size)
+
+ p3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+ p3.spines["left"].set_visible(True)
+ p3.yaxis.set_label_position('left')
+ p3.yaxis.set_ticks_position('left')
+
+ p2.set_ylabel("Latency (ms)")
+
+ plt.title(title)
+
+ # legend box
+ handles2, labels2 = p2.get_legend_handles_labels()
+ plt.legend(handles1 + handles2, labels1 + labels2, loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+ # adjust central box size to fit legend
+ plt.subplots_adjust(**plot_box_adjust)
+ apply_style(style, eng=False, no_legend=True)
+
+
+# -------------------- REPORT HELPERS --------------------------------------------------------------------------------
+
+
class HTMLBlock:
data = None # type: str
js_links = [] # type: List[str]
css_links = [] # type: List[str]
+class Menu1st:
+ engineering = "Engineering"
+ summary = "Summary"
+
+
+class Menu2ndEng:
+ iops_time = "IOPS(time)"
+ hist = "IOPS/lat overall histogram"
+ lat_time = "Lat(time)"
+
+
+class Menu2ndSumm:
+ io_lat_qd = "IO & Lat vs QD"
+
+
+menu_1st_order = [Menu1st.summary, Menu1st.engineering]
+
+
+# -------------------- REPORTS --------------------------------------------------------------------------------------
+
+
class Reporter(metaclass=abc.ABCMeta):
@abc.abstractmethod
- def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: TestSuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
pass
@@ -81,8 +609,38 @@
# Main performance report
-class IOPS_QD(Reporter):
+class IO_QD(Reporter):
"""Creates graph, which show how IOPS and Latency depend on QD"""
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ ts_map = {} # type: Dict[FioTestSumm, List[IOSummary]]
+        str_summary = {} # type: Dict[FioTestSumm, Tuple[str, str]]
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ tpl_no_qd = fjob.characterized_tuple_no_qd()
+ io_summ = make_iosum(rstorage, suite, job)
+
+ if tpl_no_qd not in ts_map:
+ ts_map[tpl_no_qd] = [io_summ]
+ str_summary[tpl_no_qd] = (fjob.summary_no_qd(), fjob.long_summary_no_qd())
+ else:
+ ts_map[tpl_no_qd].append(io_summ)
+
+ for tpl, iosums in ts_map.items():
+ iosums.sort(key=lambda x: x.qd)
+            summary, summary_long = str_summary[tpl]
+
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id="io_over_qd_".format(summary),
+ node_id="__all__",
+ dev='fio',
+ sensor="io_over_qd",
+ tag="svg")
+
+ title = "IOPS, BW, Lat vs. QD.\n" + summary_long
+ fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums)
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ if DEBUG:
+ return
# Linearization report
@@ -91,21 +649,231 @@
# IOPS/latency distribution
-class IOPSHist(Reporter):
+class IOHist(Reporter):
"""IOPS.latency distribution histogram"""
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000 # convert us to ms
+ lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_boxes)
+
+ title = "Latency distribution. " + fjob.long_summary
+ units = "ms"
+
+ fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW distribution. " + fjob.long_summary
+ units = "MiBps"
+ agg_io.data /= MiB2KiB
+ else:
+ title = "IOPS distribution. " + fjob.long_summary
+ agg_io.data /= fjob.bsize
+ units = "IOPS"
+
+ io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+ fpath = plot_hist(rstorage, agg_io.source(tag='hist.svg'), title, units, io_stat_prop)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ return
+ else:
+ yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
-# IOPS/latency over test time
-class IOPSTime(Reporter):
+# IOPS/latency over test time for each job
+class IOTime(Reporter):
"""IOPS/latency during test"""
- def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- pass
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000
+ title = "Latency during test. " + fjob.long_summary
+
+ fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+ fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.svg'), title, agg_lat, bins_edges)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW during test. " + fjob.long_summary
+ units = "MiBps"
+ agg_io.data /= MiB2KiB
+ else:
+ title = "IOPS during test. " + fjob.long_summary
+ agg_io.data /= fjob.bsize
+ units = "IOPS"
+
+ fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io)
+
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ return
+ else:
+ yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+
+def is_sensor_numarray(sensor: str, metric: str) -> bool:
+ """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
+ return True
+
+
+LEVEL_SENSORS = {("block-io", "io_queue"),
+ ("system-cpu", "procs_blocked"),
+ ("system-cpu", "procs_queue")}
+
+
+def is_level_sensor(sensor: str, metric: str) -> bool:
+ """Returns True if sensor measure level of any kind, E.g. queue depth."""
+ return (sensor, metric) in LEVEL_SENSORS
+
+
+def is_delta_sensor(sensor: str, metric: str) -> bool:
+ """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
+ return not is_level_sensor(sensor, metric)
+
+
+
+def get_sensor(storage: Storage, node: str, sensor: str, dev: str, metric: str,
+ time_range: Tuple[int, int]) -> numpy.array:
+ """Return sensor values for given node for given period. Return per second estimated values array
+
+ Raise an error if required range is not full covered by data in storage.
+ First it finds range of results from sensor, which fully covers requested range.
+ ...."""
+
+ collected_at = numpy.array(storage.get_array("sensors/{}_collected_at".format(node)), dtype="int")
+ data = numpy.array(storage.get_array("sensors/{}_{}.{}.{}".format(node, sensor, dev, metric)))
+
+ # collected_at is array of pairs (collection_started_at, collection_finished_at)
+ collection_start_at = collected_at[::2]
+
+ MICRO = 1000000
+
+    # convert seconds to us
+ begin = time_range[0] * MICRO
+ end = time_range[1] * MICRO
+
+ if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
+ raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+ "sensor = {}_{}.{}.{}").format(time_range,
+ collected_at[0] // MICRO,
+ collected_at[-1] // MICRO,
+ node, sensor, dev, metric))
+
+ pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
+ assert pos1 >= 1
+
+ time_bounds = collection_start_at[pos1 - 1: pos2]
+ edge_it = iter(time_bounds)
+ val_it = iter(data[pos1 - 1: pos2])
+
+ result = []
+ curr_summ = 0
+
+ results_cell_ends = begin + MICRO
+ curr_end = next(edge_it)
+
+ while results_cell_ends <= end:
+ curr_start = curr_end
+ curr_end = next(edge_it)
+ curr_val = next(val_it)
+ while curr_end >= results_cell_ends and results_cell_ends <= end:
+ current_part = (results_cell_ends - curr_start) / (curr_end - curr_start) * curr_val
+ result.append(curr_summ + current_part)
+ curr_summ = 0
+ curr_val -= current_part
+ curr_start = results_cell_ends
+ results_cell_ends += MICRO
+ curr_summ += curr_val
+
+ assert len(result) == (end - begin) // MICRO
+ return result
+
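+# Illustrative call (node/dev values are made up; mirrors the usage in ClusterLoad.get_divs):
+#   per_second = get_sensor(storage, "192.168.1.10:22", "block-io", "sda", "sectors_written", time_range)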
+
+class ResourceUsage:
+ def __init__(self, io_r_ops: int, io_w_ops: int, io_r_kb: int, io_w_kb: int) -> None:
+ self.io_w_ops = io_w_ops
+ self.io_r_ops = io_r_ops
+ self.io_w_kb = io_w_kb
+ self.io_r_kb = io_r_kb
+
+ self.cpu_used_user = None # type: int
+ self.cpu_used_sys = None # type: int
+ self.cpu_wait_io = None # type: int
+
+ self.net_send_packets = None # type: int
+ self.net_recv_packets = None # type: int
+ self.net_send_kb = None # type: int
+ self.net_recv_kb = None # type: int
# Cluster load over test time
class ClusterLoad(Reporter):
"""IOPS/latency during test"""
+ storage_sensors = [
+ ('block-io', 'reads_completed', "Read ops"),
+ ('block-io', 'writes_completed', "Write ops"),
+ ('block-io', 'sectors_read', "Read kb"),
+ ('block-io', 'sectors_written', "Write kb"),
+ ]
+
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ # split nodes on test and other
+ storage = rstorage.storage
+ nodes = storage.load_list(NodeInfo, "all_nodes") # type: List[NodeInfo]
+
+ test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
+ cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
+
+ for job in rstorage.iter_job(suite):
+ # convert ms to s
+ time_range = (job.reliable_info_starts_at // MS2S, job.reliable_info_stops_at // MS2S)
+            duration = time_range[1] - time_range[0]
+
+            for sensor, metric, sensor_title in self.storage_sensors:
+                sum_testnode = numpy.zeros((duration,))
+                sum_other = numpy.zeros((duration,))
+
+ for path, groups in iter_sensors(rstorage.storage, sensor=sensor, metric=metric):
+ data = get_sensor(rstorage.storage, groups['node'], sensor, groups['dev'], metric, time_range)
+ if groups['node'] in test_nodes:
+ sum_testnode += data
+ else:
+ sum_other += data
+
+ ds = DataSource(suite_id=suite.storage_id, job_id=job.summary, node_id="cluster",
+ dev=sensor, sensor=metric, tag="ts.svg")
+
+ # s to ms
+ ts = TimeSeries(name="", times=numpy.arange(*time_range) * MS2S, data=sum_testnode, raw=None)
+ fpath = plot_v_over_time(rstorage, ds, "{}.{}".format(sensor, metric), sensor_title, ts=ts)
+ yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+ if DEBUG:
+ return
+
+
+# Ceph cluster summary
+class ResourceConsumption(Reporter):
+ """Resources consumption report, only text"""
+
# Node load over test time
class NodeLoad(Reporter):
@@ -117,12 +885,72 @@
"""IOPS/latency during test"""
-# TODO: Resource consumption report
# TODO: Ceph operation breakout report
# TODO: Resource consumption for different type of test
-#
+# ------------------------------------------ REPORT STAGES -----------------------------------------------------------
+
+
+class HtmlReportStage(Stage):
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ rstorage = ResultStorage(ctx.storage)
+ reporters = [ClusterLoad()] # IO_QD(), IOTime(), IOHist()] # type: List[Reporter]
+
+ root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+ doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
+ report_template = open(doc_templ_path, "rt").read()
+ css_file_src = os.path.join(root_dir, "report_templates/main.css")
+ css_file = open(css_file_src, "rt").read()
+
+ menu_block = []
+ content_block = []
+ link_idx = 0
+
+ matplotlib.rcParams.update({'font.size': 10})
+
+ items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, list]]
+ for suite in rstorage.iter_suite(FioTest.name):
+ for reporter in reporters:
+                for block, item, html_block in reporter.get_divs(suite, rstorage):
+                    items[block][item].append(html_block)
+
+ for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
+ menu_block.append(
+ '<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
+ .format(idx_1st, menu_1st)
+ )
+ menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
+ for menu_2nd in sorted(items[menu_1st]):
+ menu_block.append(' <a href="#content{}" class="nav-group-item">{}</a>'
+ .format(link_idx, menu_2nd))
+ content_block.append('<div id="content{}">'.format(link_idx))
+ content_block.extend(" " + x for x in items[menu_1st][menu_2nd])
+ content_block.append('</div>')
+ link_idx += 1
+ menu_block.append('</div>')
+
+ report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
+ report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
+ report_path = rstorage.put_report(report, "index.html")
+ rstorage.put_report(css_file, "main.css")
+ logger.info("Report is stored into %r", report_path)
+
+
+class ConsoleReportStage(Stage):
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
+
+# --------------------------- LEGACY --------------------------------------------------------------------------------
+
+
# # disk_info = None
# # base = None
# # linearity = None
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 3616f47..1a148ed 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,7 @@
+import abc
import array
-from typing import Dict, List, Any, Optional, Tuple, cast
+from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator
+from collections import OrderedDict
import numpy
@@ -7,13 +9,31 @@
from .node_interfaces import IRPCNode
-from .istorable import IStorable, Storable
+from .common_types import IStorable, Storable
from .utils import round_digits, Number
-class TestJobConfig(Storable):
- def __init__(self) -> None:
- self.summary = None # type: str
+class TestJobConfig(Storable, metaclass=abc.ABCMeta):
+ def __init__(self, idx: int) -> None:
+ self.idx = idx
+ self.reliable_info_time_range = None # type: Tuple[int, int]
+ self.vals = OrderedDict() # type: Dict[str, Any]
+
+ @property
+ def storage_id(self) -> str:
+ return "{}_{}".format(self.summary, self.idx)
+
+ @abc.abstractproperty
+ def characterized_tuple(self) -> Tuple:
+ pass
+
+ @abc.abstractproperty
+ def summary(self, *excluded_fields) -> str:
+ pass
+
+ @abc.abstractproperty
+ def long_summary(self, *excluded_fields) -> str:
+ pass
class TestSuiteConfig(IStorable):
@@ -31,15 +51,22 @@
params: Dict[str, Any],
run_uuid: str,
nodes: List[IRPCNode],
- remote_dir: str) -> None:
+ remote_dir: str,
+ idx: int) -> None:
self.test_type = test_type
self.params = params
self.run_uuid = run_uuid
self.nodes = nodes
- self.nodes_ids = [node.info.node_id() for node in nodes]
+ self.nodes_ids = [node.node_id for node in nodes]
self.remote_dir = remote_dir
+ self.storage_id = "{}_{}".format(self.test_type, idx)
- def __eq__(self, other: 'TestSuiteConfig') -> bool:
+ def __eq__(self, o: object) -> bool:
+ if type(o) is not self.__class__:
+ return False
+
+ other = cast(TestSuiteConfig, o)
+
return (self.test_type == other.test_type and
self.params == other.params and
set(self.nodes_ids) == set(other.nodes_ids))
@@ -62,23 +89,50 @@
return obj
+class DataSource:
+ def __init__(self,
+ suite_id: str = None,
+ job_id: str = None,
+ node_id: str = None,
+ dev: str = None,
+ sensor: str = None,
+ tag: str = None) -> None:
+ self.suite_id = suite_id
+ self.job_id = job_id
+ self.node_id = node_id
+ self.dev = dev
+ self.sensor = sensor
+ self.tag = tag
+
+ def __call__(self, **kwargs) -> 'DataSource':
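+        """Return a copy of this data source with the given fields replaced, e.g. ts.source(tag='svg')."""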
+ dct = self.__dict__.copy()
+ dct.update(kwargs)
+ return self.__class__(**dct)
+
+ def __str__(self) -> str:
+ return "{0.suite_id}.{0.job_id}/{0.node_id}/{0.dev}.{0.sensor}.{0.tag}".format(self)
+
+ def __repr__(self) -> str:
+ return str(self)
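+
+ # Illustrative usage (field values are hypothetical):
+ #   DataSource(suite_id="fio_0")(node_id="1.2.3.4:22", sensor="bw")
+ # returns a copy with the extra fields filled in; the original object is unchanged.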
+
+
class TimeSeries:
"""Data series from sensor - either system sensor or from load generator tool (e.g. fio)"""
def __init__(self,
name: str,
raw: Optional[bytes],
- data: array.array,
- times: array.array,
+ data: numpy.array,
+ times: numpy.array,
second_axis_size: int = 1,
- bins_edges: List[float] = None) -> None:
+ source: DataSource = None) -> None:
# Sensor name. Typically DEV_NAME.METRIC
self.name = name
# Time series times and values. Time in ms from Unix epoch.
- self.times = times # type: List[int]
- self.data = data # type: List[int]
+ self.times = times
+ self.data = data
# Not equal to 1 in case of 2d sensors, like latency, when each measurement is a histogram.
self.second_axis_size = second_axis_size
@@ -86,8 +140,18 @@
# Raw sensor data (is provided). Like log file for fio iops/bw/lat.
self.raw = raw
- # bin edges for historgam timeseries
- self.bins_edges = bins_edges
+ self.source = source
+
+ def __str__(self) -> str:
+ res = "TS({}):\n".format(self.name)
+ res += " source={}\n".format(self.source)
+ res += " times_size={}\n".format(len(self.times))
+ res += " data_size={}\n".format(len(self.data))
+ res += " data_shape={}x{}\n".format(len(self.data) // self.second_axis_size, self.second_axis_size)
+ return res
+
+ def __repr__(self) -> str:
+ return str(self)
# (node_name, source_dev, metric_name) => metric_results
@@ -96,7 +160,7 @@
class StatProps(IStorable):
"Statistic properties for timeseries with unknown data distribution"
- def __init__(self, data: List[Number]) -> None:
+ def __init__(self, data: numpy.array) -> None:
self.perc_99 = None # type: float
self.perc_95 = None # type: float
self.perc_90 = None # type: float
@@ -106,8 +170,8 @@
self.max = None # type: Number
# bin_center: bin_count
- self.bins_populations = None # type: List[int]
- self.bins_edges = None # type: List[float]
+ self.bins_populations = None # type: numpy.array
+ self.bins_mids = None # type: numpy.array
self.data = data
def __str__(self) -> str:
@@ -122,14 +186,16 @@
def raw(self) -> Dict[str, Any]:
data = self.__dict__.copy()
- data['bins_edges'] = list(self.bins_edges)
+ del data['data']
+ data['bins_mids'] = list(self.bins_mids)
data['bins_populations'] = list(self.bins_populations)
return data
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
- data['bins_edges'] = numpy.array(data['bins_edges'])
+ data['bins_mids'] = numpy.array(data['bins_mids'])
data['bins_populations'] = numpy.array(data['bins_populations'])
+ data['data'] = None
res = cls.__new__(cls)
res.__dict__.update(data)
return res
@@ -138,14 +204,14 @@
class HistoStatProps(StatProps):
"""Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
Used for latency"""
- def __init__(self, data: List[Number], second_axis_size: int) -> None:
+ def __init__(self, data: numpy.array, second_axis_size: int) -> None:
self.second_axis_size = second_axis_size
StatProps.__init__(self, data)
class NormStatProps(StatProps):
"Statistic properties for timeseries with normal data distribution. Used for iops/bw"
- def __init__(self, data: List[Number]) -> None:
+ def __init__(self, data: numpy.array) -> None:
StatProps.__init__(self, data)
self.average = None # type: float
@@ -153,6 +219,8 @@
self.confidence = None # type: float
self.confidence_level = None # type: float
self.normtest = None # type: NormaltestResult
+ self.skew = None # type: float
+ self.kurt = None # type: float
def __str__(self) -> str:
res = ["NormStatProps(size = {}):".format(len(self.data)),
@@ -163,13 +231,16 @@
" perc_95 = {}".format(round_digits(self.perc_95)),
" perc_99 = {}".format(round_digits(self.perc_99)),
" range {} {}".format(round_digits(self.min), round_digits(self.max)),
- " normtest = {0.normtest}".format(self)]
+ " normtest = {0.normtest}".format(self),
+ " skew ~ kurt = {0.skew} ~ {0.kurt}".format(self)]
return "\n".join(res)
def raw(self) -> Dict[str, Any]:
data = self.__dict__.copy()
data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
- data['bins_edges'] = list(self.bins_edges)
+ del data['data']
+ data['bins_mids'] = list(self.bins_mids)
+ data['bins_populations'] = list(self.bins_populations)
return data
@classmethod
@@ -195,3 +266,51 @@
self.run_interval = (begin_time, end_time)
self.raw = raw # type: JobMetrics
self.processed = None # type: JobStatMetrics
+
+
+class IResultStorage(metaclass=abc.ABCMeta):
+
+ @abc.abstractmethod
+ def sync(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_ts(self, ts: TimeSeries) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_extra(self, data: bytes, source: DataSource) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_stat(self, data: StatProps, source: DataSource) -> None:
+ pass
+
+ @abc.abstractmethod
+ def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+ pass
+
+ @abc.abstractmethod
+ def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+ pass
+
+ @abc.abstractmethod
+ def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+ pass
+
+ @abc.abstractmethod
+ def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[TimeSeries]:
+ pass
+
+ # return path to file to be inserted into report
+ @abc.abstractmethod
+ def put_plot_file(self, data: bytes, source: DataSource) -> str:
+ pass
diff --git a/wally/run_test.py b/wally/run_test.py
index a11b43d..d3c68b6 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -10,21 +10,11 @@
from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
-from .suits.io.fio import IOPerfTest
-from .suits.itest import TestSuiteConfig
-from .suits.mysql import MysqlTest
-from .suits.omgbench import OmgTest
-from .suits.postgres import PgBenchTest
+from .suits.all_suits import all_suits
from .test_run_class import TestRun
from .utils import StopTestError
-
-
-TOOL_TYPE_MAPPER = {
- "io": IOPerfTest,
- "pgbench": PgBenchTest,
- "mysql": MysqlTest,
- "omg": OmgTest,
-}
+from .result_classes import TestSuiteConfig
+from .hlstorage import ResultStorage
logger = logging.getLogger("wally")
@@ -81,7 +71,7 @@
if ctx.config.get("download_rpc_logs", False):
for node in ctx.nodes:
if node.rpc_log_file is not None:
- nid = node.info.node_id()
+ nid = node.node_id
path = "rpc_logs/" + nid
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
@@ -110,7 +100,7 @@
with ctx.get_pool() as pool:
# can't make next RPC request until finish with previous
for node in ctx.nodes:
- nid = node.info.node_id()
+ nid = node.node_id
hw_info_path = "hw_info/{}".format(nid)
if hw_info_path not in ctx.storage:
futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
@@ -124,7 +114,7 @@
futures.clear()
for node in ctx.nodes:
- nid = node.info.node_id()
+ nid = node.node_id
sw_info_path = "sw_info/{}".format(nid)
if sw_info_path not in ctx.storage:
futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
@@ -253,15 +243,17 @@
logger.error("No nodes found for test, skipping it.")
continue
- test_cls = TOOL_TYPE_MAPPER[name]
+ test_cls = all_suits[name]
remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
- test_cfg = TestSuiteConfig(test_cls.name,
- params=params,
- run_uuid=ctx.config.run_uuid,
- nodes=test_nodes,
- remote_dir=remote_dir)
+ suite = TestSuiteConfig(test_cls.name,
+ params=params,
+ run_uuid=ctx.config.run_uuid,
+ nodes=test_nodes,
+ remote_dir=remote_dir,
+ idx=suite_idx)
- test_cls(storage=ctx.storage, config=test_cfg, idx=suite_idx,
+ test_cls(storage=ResultStorage(ctx.storage),
+ suite=suite,
on_idle=lambda: collect_sensors_data(ctx, False)).run()
@classmethod
@@ -278,6 +270,6 @@
logger.error("Internal error: Some nodes already stored in " +
"nodes_info before LoadStoredNodesStage stage")
raise StopTestError()
- ctx.nodes_info = {node.node_id(): node
+ ctx.nodes_info = {node.node_id: node
for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors.py b/wally/sensors.py
index 0a1bbe9..54ae1ad 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -63,7 +63,7 @@
for role in node.info.roles:
node_cfg.update(per_role_config.get(role, {})) # type: ignore
- nid = node.info.node_id()
+ nid = node.node_id
if node_cfg:
# ceph requires additional settings
if 'ceph' in node_cfg:
@@ -81,7 +81,7 @@
def collect_sensors_data(ctx: TestRun, stop: bool = False):
for node in ctx.nodes:
- node_id = node.info.node_id()
+ node_id = node.node_id
if node_id in ctx.sensors_run_on:
if stop:
@@ -91,7 +91,7 @@
# TODO: data is unpacked/repacked here with no reason
for path, value in sensors_rpc_plugin.unpack_rpc_updates(func()):
- ctx.storage.append(value, "metric", node_id, path)
+ ctx.storage.append(value, "sensors/{}_{}".format(node_id, path))
class CollectSensorsStage(Stage):
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index be5e5db..7400ab5 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import zlib
import array
import pprint
import logging
@@ -11,6 +12,9 @@
import collections
+import Pool # type: ignore
+
+
mod_name = "sensors"
__version__ = (0, 1)
@@ -36,7 +40,7 @@
pass
@classmethod
- def unpack_results(cls, device, metrics, data):
+ def unpack_results(cls, device, metrics, data, typecode):
pass
def init(self):
@@ -52,26 +56,30 @@
def __init__(self, params, allowed=None, disallowed=None):
Sensor.__init__(self, params, allowed, disallowed)
self.data = collections.defaultdict(lambda: array.array(self.typecode))
+ self.prev_vals = {}
def add_data(self, device, name, value):
self.data[(device, name)].append(value)
+ def add_relative(self, device, name, value):
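+ # sensors read cumulative counters (e.g. from /proc); store only the per-interval
+ # delta relative to the previously seen value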
+ key = (device, name)
+ pval = self.prev_vals.get(key)
+ if pval is not None:
+ self.data[key].append(value - pval)
+ self.prev_vals[key] = value
+
def get_updates(self):
res = self.data
self.data = collections.defaultdict(lambda: array.array(self.typecode))
- packed = {
- name: arr.typecode + arr.tostring()
- for name, arr in res.items()
- }
- return packed
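+ # each entry maps (device, metric) -> (array typecode, raw bytes); the typecode is
+ # passed separately so the receiver can rebuild the array without a header byte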
+ return {key: (arr.typecode, arr.tostring()) for key, arr in res.items()}
@classmethod
- def unpack_results(cls, device, metrics, packed):
- arr = array.array(chr(packed[0]))
+ def unpack_results(cls, device, metrics, packed, typecode):
+ arr = array.array(typecode)
if sys.version_info >= (3, 0, 0):
- arr.frombytes(packed[1:])
+ arr.frombytes(packed)
else:
- arr.fromstring(packed[1:])
+ arr.fromstring(packed)
return arr
def is_dev_accepted(self, name):
@@ -86,6 +94,9 @@
return dev_ok
+time_array_typechar = ArraysSensor.typecode
+
+
def provides(name):
def closure(cls):
SensorsMap[name] = cls
@@ -157,24 +168,32 @@
def __init__(self, *args, **kwargs):
ArraysSensor.__init__(self, *args, **kwargs)
+
if self.disallowed is None:
self.disallowed = ('ram', 'loop')
- self.allowed_devs = set()
for line in open('/proc/diskstats'):
- dev_name = line.split()[2]
- if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
- self.allowed_devs.add(dev_name)
-
- def collect(self):
- for line in open('/proc/diskstats'):
vals = line.split()
dev_name = vals[2]
- if dev_name not in self.allowed_devs:
+ if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
+ self.allowed_names.add(dev_name)
+
+ self.collect(init_rel=True)
+
+ def collect(self, init_rel=False):
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+
+ if dev_name not in self.allowed_names:
continue
- for pos, name, _ in self.io_values_pos:
- self.add_data(dev_name, name, int(vals[pos]))
+ for pos, name, aggregated in self.io_values_pos:
+ vl = int(vals[pos])
+ if aggregated:
+ self.add_relative(dev_name, name, vl)
+ elif not init_rel:
+ self.add_data(dev_name, name, int(vals[pos]))
@provides("net-io")
@@ -210,21 +229,26 @@
if self.allowed is None:
self.allowed = ('eth',)
- self.allowed_devs = set()
- for line in open('/proc/net/dev').readlines()[2:]:
- dev_name = line.split(":", 1)[0].strip()
- dev_ok = self.is_dev_accepted(dev_name)
- if dev_ok and ('.' not in dev_name or not dev_name.split('.')[-1].isdigit()):
- self.allowed_devs.add(dev_name)
+ for _, _, aggregated in self.net_values_pos:
+ assert aggregated, "Non-aggregated values is not supported in net sensor"
- def collect(self):
for line in open('/proc/net/dev').readlines()[2:]:
dev_name, stats = line.split(":", 1)
dev_name = dev_name.strip()
- if dev_name in self.allowed_devs:
+ if self.is_dev_accepted(dev_name):
+ self.allowed_names.add(dev_name)
+
+ self.collect(init_rel=True)
+
+ def collect(self, init_rel=False):
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ if dev_name in self.allowed_names:
vals = stats.split()
for pos, name, _ in self.net_values_pos:
- self.add_data(dev_name, name, int(vals[pos]))
+ vl = int(vals[pos])
+ self.add_relative(dev_name, name, vl)
def pid_stat(pid):
@@ -448,27 +472,28 @@
res = super().get_updates()
for osd_id, data in self.historic.items():
- res[("osd{}".format(osd_id), "historic")] = data
+ res[("osd{}".format(osd_id), "historic")] = (None, data)
self.historic = {}
for osd_id, data in self.in_flight.items():
- res[("osd{}".format(osd_id), "in_flight")] = data
+ res[("osd{}".format(osd_id), "in_flight")] = (None, data)
self.in_flight = {}
return res
@classmethod
- def unpack_results(cls, device, metrics, packed):
+ def unpack_results(cls, device, metrics, packed, typecode):
if metrics in ('historic', 'in_flight'):
+ assert typecode is None
return packed
- arr = array.array(chr(packed[0]))
+ arr = array.array(typecode)
if sys.version_info >= (3, 0, 0):
- arr.frombytes(packed[1:])
+ arr.frombytes(packed)
else:
- arr.fromstring(packed[1:])
+ arr.fromstring(packed)
return arr
@@ -476,7 +501,7 @@
class SensorsData(object):
def __init__(self):
self.cond = threading.Condition()
- self.collected_at = array.array("f")
+ self.collected_at = array.array(time_array_typechar)
self.stop = False
self.sensors = {}
self.data_fd = None # temporary file to store results
@@ -501,12 +526,23 @@
def sensors_bg_thread(sensors_config, sdata):
try:
next_collect_at = time.time()
+ if "pool_sz" in sensors_config:
+ sensors_config = sensors_config.copy()
+ pool_sz = sensors_config.pop("pool_sz")
+ else:
+ pool_sz = 32
+
+ if pool_sz != 0:
+ pool = Pool(pool_sz)
+ else:
+ pool = None
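+
+ # NOTE: Pool is assumed to provide a map() that yields (ok, result) pairs,
+ # as consumed in the collection loop below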
# prepare sensor classes
with sdata.cond:
sdata.sensors = {}
for name, config in sensors_config.items():
params = {'params': config}
+ logger.debug("Start sensor %r with config %r", name, config)
if "allow" in config:
params["allowed_prefixes"] = config["allow"]
@@ -535,11 +571,18 @@
ctm = time.time()
with sdata.cond:
- sdata.collected_at.append(ctm)
- for sensor in sdata.sensors.values():
- sensor.collect()
+ sdata.collected_at.append(int(ctm * 1000000))
+ if pool is not None:
+ caller = lambda x: x()
+ for ok, val in pool.map(caller, [sensor.collect for sensor in sdata.sensors.values()]):
+ if not ok:
+ raise val
+ else:
+ for sensor in sdata.sensors.values():
+ sensor.collect()
etm = time.time()
- sdata.collected_at.append(etm)
+ sdata.collected_at.append(int(etm * 1000000))
+ logger.debug("Add data to collected_at - %s, %s", ctm, etm)
if etm - ctm > 0.1:
# TODO(koder): need to signal that something in not really ok with sensor collecting
@@ -551,7 +594,6 @@
finally:
for sensor in sdata.sensors.values():
sensor.stop()
- sdata.sensors = None
sensors_thread = None
@@ -577,39 +619,46 @@
def unpack_rpc_updates(res_tuple):
- data, collected_at_b = res_tuple
- collected_at = array.array('f')
+ offset_map, compressed_blob, compressed_collected_at_b = res_tuple
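+ # res_tuple mirrors rpc_get_updates() output: (offset_map, zlib-compressed blob,
+ # zlib-compressed collected_at bytes)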
+ blob = zlib.decompress(compressed_blob)
+ collected_at_b = zlib.decompress(compressed_collected_at_b)
+ collected_at = array.array(time_array_typechar)
collected_at.frombytes(collected_at_b)
yield 'collected_at', collected_at
# TODO: data is unpacked/repacked here with no reason
- for sensor_path, packed_data in data.items():
+ for sensor_path, (offset, size, typecode) in offset_map.items():
sensor_path = sensor_path.decode("utf8")
sensor_name, device, metric = sensor_path.split('.', 2)
- data = SensorsMap[sensor_name].unpack_results(device, metric, packed_data)
- yield sensor_path, data
+ sensor_data = SensorsMap[sensor_name].unpack_results(device,
+ metric,
+ blob[offset:offset + size],
+ typecode.decode("ascii"))
+ yield sensor_path, sensor_data
def rpc_get_updates():
if sdata is None:
raise ValueError("No sensor thread running")
- res = collected_at = None
+ offset_map = collected_at = None
+ blob = ""
with sdata.cond:
if sdata.exception:
raise Exception(sdata.exception)
- res = {}
+ offset_map = {}
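+ # offset_map maps "sensor.device.metric" -> (offset, size, typecode) into a single
+ # concatenated blob; blob and collected_at are zlib-compressed before returning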
for sensor_name, sensor in sdata.sensors.items():
- for (device, metrics), val in sensor.get_updates().items():
- res["{}.{}.{}".format(sensor_name, device, metrics)] = val
+ for (device, metrics), (typecode, val) in sensor.get_updates().items():
+ offset_map["{}.{}.{}".format(sensor_name, device, metrics)] = (len(blob), len(val), typecode)
+ blob += val
collected_at = sdata.collected_at
sdata.collected_at = array.array(sdata.collected_at.typecode)
- # TODO: pack data before send
- return res, collected_at.tostring()
+ logger.debug(str(collected_at))
+ return offset_map, zlib.compress(blob), zlib.compress(collected_at.tostring())
def rpc_stop():
diff --git a/wally/statistic.py b/wally/statistic.py
index 259ac69..b80fb22 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,5 +1,4 @@
import math
-import array
import logging
import itertools
import statistics
@@ -17,17 +16,19 @@
logger = logging.getLogger("wally")
DOUBLE_DELTA = 1e-8
+MIN_VALUES_FOR_CONFIDENCE = 7
-average = statistics.mean
-dev = lambda x: math.sqrt(statistics.variance(x))
+average = numpy.mean
+dev = lambda x: math.sqrt(numpy.var(x, ddof=1))
-def calc_norm_stat_props(ts: TimeSeries, confidence: float = 0.95) -> NormStatProps:
+def calc_norm_stat_props(ts: TimeSeries, bins_count: int, confidence: float = 0.95) -> NormStatProps:
"Calculate statistical properties of array of numbers"
- data = ts.data
- res = NormStatProps(data)
+ # array.array has very basic support
+ data = cast(List[int], ts.data)
+ res = NormStatProps(data) # type: ignore
if len(data) == 0:
raise ValueError("Input array is empty")
@@ -39,57 +40,104 @@
res.max = data[-1]
res.min = data[0]
- res.perc_50 = numpy.percentile(data, 50)
- res.perc_90 = numpy.percentile(data, 90)
- res.perc_95 = numpy.percentile(data, 95)
- res.perc_99 = numpy.percentile(data, 99)
+ res.perc_50, res.perc_90, res.perc_95, res.perc_99 = numpy.percentile(data, q=[50., 90., 95., 99.])
- if len(data) >= 3:
+ if len(data) >= MIN_VALUES_FOR_CONFIDENCE:
res.confidence = stats.sem(data) * \
stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+ res.confidence_level = confidence
else:
res.confidence = None
+ res.confidence_level = None
- res.bin_populations, res.bin_edges = numpy.histogram(data, 'auto')
+ res.bins_populations, bins_edges = numpy.histogram(data, bins=bins_count)
+ res.bins_mids = (bins_edges[:-1] + bins_edges[1:]) / 2
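+ # bin midpoints are stored (StatProps.bins_mids) instead of the raw edges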
try:
res.normtest = stats.mstats.normaltest(data)
except Exception as exc:
logger.warning("stats.mstats.normaltest failed with error: %s", exc)
+ res.skew = stats.skew(data)
+ res.kurt = stats.kurtosis(data)
+
return res
-def calc_histo_stat_props(ts: TimeSeries) -> HistoStatProps:
- data = numpy.array(ts.data)
- data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size]
+def calc_histo_stat_props(ts: TimeSeries,
+ bins_edges: numpy.array,
+ bins_count: int,
+ min_valuable: float = 0.0001) -> HistoStatProps:
+ data = numpy.array(ts.data, dtype='int')
+ data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size] # type: ignore
res = HistoStatProps(ts.data, ts.second_axis_size)
- aggregated = numpy.sum(data, axis=0)
- full_sum = numpy.sum(aggregated)
- expected = [full_sum * 0.5, full_sum * 0.9, full_sum * 0.95, full_sum * 0.99]
+ # sum across all series
+ aggregated = numpy.sum(data, axis=0, dtype='int')
+ total = numpy.sum(aggregated)
+
+ # minimal value used for histo
+ min_val_on_histo = total * min_valuable
+
+ # percentiles levels
+ expected = [total * 0.5, total * 0.9, total * 0.95, total * 0.99]
percentiles = []
- val_min = None
- val_max = None
+ # all indexes, where values greater than min_val_on_histo
+ valuable_idxs = []
- for idx, val in enumerate(aggregated):
- while expected and full_sum + val >= expected[0]:
- percentiles.append(idx)
+ curr_summ = 0
+ non_zero = aggregated.nonzero()[0]
+
+ # calculate percentiles and valuable_indexes
+ for idx in non_zero:
+ val = aggregated[idx]
+ while expected and curr_summ + val >= expected[0]:
+ percentiles.append(bins_edges[idx])
del expected[0]
- full_sum += val
+ curr_summ += val
- if val != 0:
- if val_min is None:
- val_min = idx
- val_max = idx
+ if val >= min_val_on_histo:
+ valuable_idxs.append(idx)
- res.perc_50, res.perc_90, res.perc_95, res.perc_99 = map(ts.bins_edges.__getitem__, percentiles)
- res.min = ts.bins_edges[val_min]
- res.max = ts.bins_edges[val_max]
- res.bin_populations = aggregated
+ res.perc_50, res.perc_90, res.perc_95, res.perc_99 = percentiles
+
+ # minimal and maximal non-zero elements
+ res.min = bins_edges[non_zero[0]]
+ res.max = bins_edges[non_zero[-1] + (1 if non_zero[-1] != len(bins_edges) else 0)]
+
+ # minimal and maximal valuable elements
+ val_idx_min = valuable_idxs[0]
+ val_idx_max = valuable_idxs[-1]
+
+ raw_bins_populations = aggregated[val_idx_min: val_idx_max + 1]
+ raw_bins_edges = bins_edges[val_idx_min: val_idx_max + 2]
+ raw_bins_mids = cast(numpy.array, (raw_bins_edges[1:] + raw_bins_edges[:-1]) / 2)
+
+ step = (raw_bins_mids[-1] - raw_bins_mids[0]) / bins_count
+ next = raw_bins_mids[0]
+
+ # aggregate raw histogram with many bins into result histogram with bins_count bins
+ cidx = 0
+ bins_populations = []
+ bins_mids = []
+
+ while cidx < len(raw_bins_mids):
+ next += step
+ bin_population = 0
+
+ while cidx < len(raw_bins_mids) and raw_bins_mids[cidx] <= next:
+ bin_population += raw_bins_populations[cidx]
+ cidx += 1
+
+ bins_populations.append(bin_population)
+ bins_mids.append(next - step / 2)
+
+ res.bins_populations = numpy.array(bins_populations, dtype='int')
+ res.bins_mids = numpy.array(bins_mids, dtype='float32')
+
return res
diff --git a/wally/storage.py b/wally/storage.py
index e4e010c..3e8bbab 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -3,11 +3,12 @@
"""
import os
+import re
import abc
import array
import shutil
import sqlite3
-import threading
+import logging
from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
import yaml
@@ -17,7 +18,10 @@
from yaml import Loader, Dumper # type: ignore
-from .result_classes import IStorable
+from .common_types import IStorable
+
+
+logger = logging.getLogger("wally")
class ISimpleStorage(metaclass=abc.ABCMeta):
@@ -278,6 +282,10 @@
class Storage:
"""interface for storage"""
+
+ typechar_pad_size = 16
+ typepad = bytes(0 for i in range(typechar_pad_size - 1))
+
def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
self.fs = fs_storage
self.db = db_storage
@@ -287,15 +295,15 @@
fpath = "/".join(path)
return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
- def put(self, value: IStorable, *path: str) -> None:
- dct_value = value.raw() if isinstance(value, IStorable) else value
- serialized = self.serializer.pack(dct_value)
+ def put(self, value: Any, *path: str) -> None:
+ dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
+ serialized = self.serializer.pack(dct_value) # type: ignore
fpath = "/".join(path)
self.db.put(serialized, fpath)
self.fs.put(serialized, fpath)
def put_list(self, value: Iterable[IStorable], *path: str) -> None:
- serialized = self.serializer.pack([obj.raw() for obj in value])
+ serialized = self.serializer.pack([obj.raw() for obj in value]) # type: ignore
fpath = "/".join(path)
self.db.put(serialized, fpath)
self.fs.put(serialized, fpath)
@@ -318,8 +326,14 @@
def __contains__(self, path: str) -> bool:
return path in self.fs or path in self.db
- def put_raw(self, val: bytes, *path: str) -> None:
- self.fs.put(val, "/".join(path))
+ def put_raw(self, val: bytes, *path: str) -> str:
+ fpath = "/".join(path)
+ self.fs.put(val, fpath)
+ # TODO: dirty hack
+ return self.resolve_raw(fpath)
+
+ def resolve_raw(self, fpath) -> str:
+ return cast(FSStorage, self.fs).j(fpath)
def get_raw(self, *path: str) -> bytes:
return self.fs.get("/".join(path))
@@ -333,35 +347,52 @@
return self.fs.get_fd(path, mode)
def put_array(self, value: array.array, *path: str) -> None:
+ typechar = value.typecode.encode('ascii')
+ assert len(typechar) == 1
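+ # on-disk layout: 1-byte array typecode zero-padded to typechar_pad_size bytes,
+ # followed by the raw array data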
with self.get_fd("/".join(path), "wb") as fd:
+ fd.write(typechar + self.typepad)
value.tofile(fd) # type: ignore
- def get_array(self, typecode: str, *path: str) -> array.array:
- res = array.array(typecode)
+ def get_array(self, *path: str) -> array.array:
path_s = "/".join(path)
with self.get_fd(path_s, "rb") as fd:
fd.seek(0, os.SEEK_END)
- size = fd.tell()
+ size = fd.tell() - self.typechar_pad_size
fd.seek(0, os.SEEK_SET)
+ typecode = chr(fd.read(self.typechar_pad_size)[0])
+ res = array.array(typecode)
assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
.format(path_s, typecode)
res.fromfile(fd, size // res.itemsize) # type: ignore
return res
def append(self, value: array.array, *path: str) -> None:
+ typechar = value.typecode.encode('ascii')
+ assert len(typechar) == 1
+ expected_typeheader = typechar + self.typepad
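+ # an existing file must start with the same typecode header, otherwise appending
+ # arrays with a different element type would silently corrupt the data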
with self.get_fd("/".join(path), "cb") as fd:
fd.seek(0, os.SEEK_END)
+ if fd.tell() != 0:
+ fd.seek(0, os.SEEK_SET)
+ real_typecode = fd.read(self.typechar_pad_size)
+ if real_typecode[0] != expected_typeheader[0]:
+ logger.error("Try to append array with typechar %r to array with typechar %r at path %r",
+ value.typecode, typechar, "/".join(path))
+ raise StopIteration()
+ fd.seek(0, os.SEEK_END)
+ else:
+ fd.write(expected_typeheader)
value.tofile(fd) # type: ignore
def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
path_s = "/".join(path)
raw_val = cast(List[Dict[str, Any]], self.get(path_s))
assert isinstance(raw_val, list)
- return [obj_class.fromraw(val) for val in raw_val]
+ return [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
path_s = "/".join(path)
- return obj_class.fromraw(self.get(path_s))
+ return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
def sync(self) -> None:
self.db.sync()
@@ -376,6 +407,33 @@
def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
return self.fs.list("/".join(path))
+ def _iter_paths(self,
+ root: str,
+ path_parts: List[str],
+ groups: Dict[str, str]) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
+
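+ # walk the storage tree recursively: each element of path_parts is a regex matched
+ # against one path component, accumulating named groups along the way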
+ curr = path_parts[0]
+ rest = path_parts[1:]
+
+ for is_file, name in self.list(root):
+ if rest and is_file:
+ continue
+
+ rr = re.match(pattern=curr + "$", string=name)
+ if rr:
+ if root:
+ path = root + "/" + name
+ else:
+ path = name
+
+ new_groups = rr.groupdict().copy()
+ new_groups.update(groups)
+
+ if rest:
+ yield from self._iter_paths(path, rest, new_groups)
+ else:
+ yield is_file, path, new_groups
+
def make_storage(url: str, existing: bool = False) -> Storage:
return Storage(FSStorage(url, existing),
diff --git a/wally/storage_structure.yaml b/wally/storage_structure.yaml
index e748dd8..8493e29 100644
--- a/wally/storage_structure.yaml
+++ b/wally/storage_structure.yaml
@@ -5,6 +5,7 @@
# {dev} - device name '[^.]+'
# {suite} - suite name '[a-z]+'
# {profile} - profile name '[a-z_]+'
+# {sensor} - sensor name '[-a-z]+'
config: Config # test input configuration
@@ -14,6 +15,7 @@
fuel_version: List[int] # FUEL master node version
fuel_os_creds: OSCreds # openstack creds, discovered from fuel (or None)
openstack_openrc: OSCreds # openrc used for openstack cluster
+
info:
comment : str # run comment
run_uuid : str # run uuid
@@ -26,11 +28,11 @@
# dev in next line is tool name - fio/vdbench/....
'{node}_{dev}.{metric_name}:raw' : bytes # raw log, where name from {'bw', 'iops', 'lat', ..}
+ '{node}_{dev}.{metric_name}:stat' : StatProps # type of props detected by content
'{node}_{dev}.{metric_name}': List[uint64] # measurements data concatenated with collect times in
- # microseconds from unix epoch
-
-sensors:
- '{node}_{dev}.{metric_name}:raw' : bytes # raw log, where name from {'bw', 'iops', 'lat', ..}
- '{node}_{dev}.{metric_name}': List[uint64] # measurements data cotaneted with collect times in microseconds from unix epoch
+ # microseconds from unix epoch and typechars
+'sensors/{node}_{sensor}.{dev}.{metric_name}': typechar + array[uint64] # sensor values
+'sensors/{node}_{sensor}.{dev}.{metric_name}:stat': StatProps # statistic data
+'sensors/{node}_collected_at': typechar + array[uint64] # collection time
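+# example sensor key (hypothetical names): sensors/1.2.3.4:22_block-io.sda.io_time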
'rpc_logs/{node}' : bytes # rpc server log from node
diff --git a/wally/suits/all_suits.py b/wally/suits/all_suits.py
new file mode 100644
index 0000000..310b1f5
--- /dev/null
+++ b/wally/suits/all_suits.py
@@ -0,0 +1,8 @@
+from .io.fio import FioTest
+# from .suits.itest import TestSuiteConfig
+# from .suits.mysql import MysqlTest
+# from .suits.omgbench import OmgTest
+# from .suits.postgres import PgBenchTest
+
+
+all_suits = {suite.name: suite for suite in [FioTest]}
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index fcf5c16..c3dee19 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -18,7 +18,6 @@
filename={FILENAME}
size={FILESIZE}
-write_iops_log=fio_iops_log
write_bw_log=fio_bw_log
log_avg_msec=1000
write_hist_log=fio_lat_hist_log
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b2c3e3..33e8343 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,15 @@
import array
import os.path
import logging
-from typing import cast, Any
+from typing import cast, Any, Tuple, List
import wally
-from ...utils import StopTestError, get_os, ssize2b
+from ...utils import StopTestError, ssize2b, b2ssize
from ...node_interfaces import IRPCNode
+from ...node_utils import get_os
from ..itest import ThreadedTest
-from ...result_classes import TimeSeries, JobMetrics
+from ...result_classes import TimeSeries, DataSource, TestJobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
from .fio_hist import expected_lat_bins
@@ -17,7 +18,7 @@
logger = logging.getLogger("wally")
-class IOPerfTest(ThreadedTest):
+class FioTest(ThreadedTest):
soft_runcycle = 5 * 60
retry_time = 30
configs_dir = os.path.dirname(__file__) # type: str
@@ -27,7 +28,7 @@
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
- get = self.config.params.get
+ get = self.suite.params.get
self.remote_task_file = self.join_remote("task.fio")
self.remote_output_file = self.join_remote("fio_result.json")
@@ -35,7 +36,7 @@
self.use_sudo = get("use_sudo", True) # type: bool
self.force_prefill = get('force_prefill', False) # type: bool
- self.load_profile_name = self.config.params['load'] # type: str
+ self.load_profile_name = self.suite.params['load'] # type: str
if os.path.isfile(self.load_profile_name):
self.load_profile_path = self.load_profile_name # type: str
@@ -47,16 +48,16 @@
if self.use_system_fio:
self.fio_path = "fio" # type: str
else:
- self.fio_path = os.path.join(self.config.remote_dir, "fio")
+ self.fio_path = os.path.join(self.suite.remote_dir, "fio")
- self.load_params = self.config.params['params']
+ self.load_params = self.suite.params['params']
self.file_name = self.load_params['FILENAME']
if 'FILESIZE' not in self.load_params:
logger.debug("Getting test file sizes on all nodes")
try:
- sizes = {node.conn.fs.file_stat(self.file_name)['size']
- for node in self.config.nodes}
+ sizes = {node.conn.fs.file_stat(self.file_name)[b'size']
+ for node in self.suite.nodes}
except Exception:
logger.exception("FILESIZE is not set in config file and fail to detect it." +
"Set FILESIZE or fix error and rerun test")
@@ -68,7 +69,7 @@
raise StopTestError()
self.file_size = list(sizes)[0]
- logger.info("Detected test file size is %s", self.file_size)
+ logger.info("Detected test file size is %sB", b2ssize(self.file_size))
self.load_params['FILESIZE'] = self.file_size
else:
self.file_size = ssize2b(self.load_params['FILESIZE'])
@@ -80,31 +81,41 @@
logger.error("Empty fio config provided")
raise StopTestError()
- self.exec_folder = self.config.remote_dir
+ self.exec_folder = self.suite.remote_dir
def config_node(self, node: IRPCNode) -> None:
plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read() # type: bytes
node.upload_plugin("fio", plugin_code)
try:
- node.conn.fs.rmtree(self.config.remote_dir)
+ node.conn.fs.rmtree(self.suite.remote_dir)
except Exception:
pass
try:
- node.conn.fs.makedirs(self.config.remote_dir)
+ node.conn.fs.makedirs(self.suite.remote_dir)
except Exception:
- msg = "Failed to recreate folder {} on remote {}.".format(self.config.remote_dir, node)
+ msg = "Failed to recreate folder {} on remote {}.".format(self.suite.remote_dir, node)
logger.exception(msg)
raise StopTestError()
+ # TODO: check this during config validation
+ if self.file_size % (4 * (1024 ** 2)) != 0:
+ logger.error("Test file size must be proportional to 4MiB")
+ raise StopTestError()
+
self.install_utils(node)
mb = int(self.file_size / 1024 ** 2)
- logger.info("Filling test file %s with %sMiB of random data", self.file_name, mb)
- fill_bw = node.conn.fio.fill_file(self.file_name, mb, force=self.force_prefill, fio_path=self.fio_path)
- if fill_bw is not None:
- logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
+ logger.info("Filling test file %s on node %s with %sMiB of random data", self.file_name, node.info, mb)
+ is_prefilled, fill_bw = node.conn.fio.fill_file(self.file_name, mb,
+ force=self.force_prefill,
+ fio_path=self.fio_path)
+
+ if not is_prefilled:
+ logger.info("Test file on node %s is already prefilled", node.info)
+ elif fill_bw is not None:
+ logger.info("Initial fio fill bw is %s MiBps for %s", fill_bw, node.info)
def install_utils(self, node: IRPCNode) -> None:
os_info = get_os(node)
@@ -126,19 +137,19 @@
raise StopTestError()
bz_dest = self.join_remote('fio.bz2') # type: str
- node.copy_file(fio_path, bz_dest)
+ node.copy_file(fio_path, bz_dest, compress=False)
node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
- def get_expected_runtime(self, job_config: FioJobConfig) -> int:
+ def get_expected_runtime(self, job_config: TestJobConfig) -> int:
return execution_time(cast(FioJobConfig, job_config))
- def prepare_iteration(self, node: IRPCNode, job_config: FioJobConfig) -> None:
- node.put_to_file(self.remote_task_file, str(job_config).encode("utf8"))
+ def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
+ node.put_to_file(self.remote_task_file, str(job).encode("utf8"))
# TODO: get a link to substorage as a parameter
- def run_iteration(self, node: IRPCNode, iter_config: FioJobConfig, job_root: str) -> JobMetrics:
- f_iter_config = cast(FioJobConfig, iter_config)
- exec_time = execution_time(f_iter_config)
+ def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
+ exec_time = execution_time(cast(FioJobConfig, job))
+
fio_cmd_templ = "cd {exec_folder}; " + \
"{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -152,20 +163,26 @@
if must_be_empty:
logger.error("Unexpected fio output: %r", must_be_empty)
- res = {} # type: JobMetrics
-
# put fio output into storage
fio_out = node.get_file_content(self.remote_output_file)
- self.rstorage.put_extra(job_root, node.info.node_id(), "fio_raw", fio_out)
+
+ path = DataSource(suite_id=self.suite.storage_id,
+ job_id=job.storage_id,
+ node_id=node.node_id,
+ dev='fio',
+ sensor='stdout',
+ tag='json')
+
+ self.storage.put_extra(fio_out, path)
node.conn.fs.unlink(self.remote_output_file)
files = [name for name in node.conn.fs.listdir(self.exec_folder)]
-
- for name, path in get_log_files(f_iter_config):
- log_files = [fname for fname in files if fname.startswith(path)]
+ result = []
+ for name, file_path in get_log_files(cast(FioJobConfig, job)):
+ log_files = [fname for fname in files if fname.startswith(file_path)]
if len(log_files) != 1:
logger.error("Found %s files, match log pattern %s(%s) - %s",
- len(log_files), path, name, ",".join(log_files[10:]))
+ len(log_files), file_path, name, ",".join(log_files[10:]))
raise StopTestError()
fname = os.path.join(self.exec_folder, log_files[0])
@@ -203,14 +220,13 @@
logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
raise StopTestError()
- ts = TimeSeries(name=name,
- raw=raw_result,
- second_axis_size=expected_lat_bins if name == 'lat' else 1,
- data=parsed,
- times=times)
- res[(node.info.node_id(), 'fio', name)] = ts
-
- return res
+ result.append(TimeSeries(name=name,
+ raw=raw_result,
+ second_axis_size=expected_lat_bins if name == 'lat' else 1,
+ data=parsed,
+ times=times,
+ source=path(sensor=name, tag=None)))
+ return result
def format_for_console(self, data: Any) -> str:
raise NotImplementedError()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 6940aaf..03702ae 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,13 +7,12 @@
import os.path
import argparse
import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any, cast
from collections import OrderedDict
-from ...result_classes import IStorable
from ...result_classes import TestJobConfig
-from ...utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b, b2ssize, flatmap
SECTION = 0
@@ -29,23 +28,162 @@
('tp', int),
('name', str),
('val', Any)])
+FioTestSumm = NamedTuple("FioTestSumm",
+ [("oper", str),
+ ("sync_mode", str),
+ ("bsize", int),
+ ("qd", int),
+ ("thcount", int),
+ ("write_perc", Optional[int])])
-TestSumm = NamedTuple("TestSumm",
- [("oper", str),
- ("mode", str),
- ("bsize", int),
- ("iodepth", int),
- ("vm_count", int)])
+
+def is_fio_opt_true(vl: Union[str, int]) -> bool:
+ return str(vl).lower() in ['1', 'true', 't', 'yes', 'y']
class FioJobConfig(TestJobConfig):
- def __init__(self, name: str) -> None:
- TestJobConfig.__init__(self)
- self.vals = OrderedDict() # type: Dict[str, Any]
- self.name = name
- def __eq__(self, other: 'FioJobConfig') -> bool:
- return self.vals == other.vals
+ ds2mode = {(True, True): 'x',
+ (True, False): 's',
+ (False, True): 'd',
+ (False, False): 'a'}
+
+ sync2long = {'x': "sync direct",
+ 's': "sync",
+ 'd': "direct",
+ 'a': "buffered"}
+
+ op_type2short = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw",
+ "randrw": "rx"}
+
+ def __init__(self, name: str, idx: int) -> None:
+ TestJobConfig.__init__(self, idx)
+ self.name = name
+ self._sync_mode = None # type: Optional[str]
+ self._ctuple = None # type: Optional[FioTestSumm]
+ self._ctuple_no_qd = None # type: Optional[FioTestSumm]
+
+ # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
+
+ @property
+ def write_perc(self) -> Optional[int]:
+ try:
+ return int(self.vals["rwmixwrite"])
+ except (KeyError, TypeError):
+ try:
+ return 100 - int(self.vals["rwmixread"])
+ except (KeyError, TypeError):
+ return None
+
+ @property
+ def qd(self) -> int:
+ return int(self.vals['iodepth'])
+
+ @property
+ def bsize(self) -> int:
+ return ssize2b(self.vals['blocksize']) // 1024
+
+ @property
+ def oper(self) -> str:
+ return self.vals['rw']
+
+ @property
+ def op_type_short(self) -> str:
+ return self.op_type2short[self.vals['rw']]
+
+ @property
+ def thcount(self) -> int:
+ return int(self.vals.get('numjobs', 1))
+
+ @property
+ def sync_mode(self) -> str:
+ if self._sync_mode is None:
+ direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
+ not is_fio_opt_true(self.vals.get('buffered', '0'))
+ sync = is_fio_opt_true(self.vals.get('sync', '0'))
+ self._sync_mode = self.ds2mode[(sync, direct)]
+ return cast(str, self._sync_mode)
+
+ @property
+ def sync_mode_long(self) -> str:
+ return self.sync2long[self.sync_mode]
+
+ # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
+
+ @property
+ def characterized_tuple(self) -> Tuple:
+ if self._ctuple is None:
+ self._ctuple = FioTestSumm(oper=self.oper,
+ sync_mode=self.sync_mode,
+ bsize=self.bsize,
+ qd=self.qd,
+ thcount=self.thcount,
+ write_perc=self.write_perc)
+
+ return cast(Tuple, self._ctuple)
+
+ @property
+ def characterized_tuple_no_qd(self) -> FioTestSumm:
+ if self._ctuple_no_qd is None:
+ self._ctuple_no_qd = FioTestSumm(oper=self.oper,
+ sync_mode=self.sync_mode,
+ bsize=self.bsize,
+ qd=None,
+ thcount=self.thcount,
+ write_perc=self.write_perc)
+
+ return cast(FioTestSumm, self._ctuple_no_qd)
+
+ @property
+ def long_summary(self) -> str:
+ res = "{0.sync_mode_long} {0.oper} {1} QD={0.qd}".format(self, b2ssize(self.bsize * 1024))
+ if self.thcount != 1:
+ res += " threads={}".format(self.thcount)
+ if self.write_perc is not None:
+ res += " write_perc={}%".format(self.write_perc)
+ return res
+
+ @property
+ def long_summary_no_qd(self) -> str:
+ res = "{0.sync_mode_long} {0.oper} {1}".format(self, b2ssize(self.bsize * 1024))
+ if self.thcount != 1:
+ res += " threads={}".format(self.thcount)
+ if self.write_perc is not None:
+ res += " write_perc={}%".format(self.write_perc)
+ return res
+
+ @property
+ def summary(self) -> str:
+ tpl = cast(FioTestSumm, self.characterized_tuple)
+ res = "{0.oper}{0.sync_mode}{0.bsize}_qd{0.qd}".format(tpl)
+
+ if tpl.thcount != 1:
+ res += "th" + str(tpl.thcount)
+ if tpl.write_perc is not None:
+ res += "wr" + str(tpl.write_perc)
+
+ return res
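+
+ # illustrative value for a direct random write with 4KiB blocks, QD=32, single thread:
+ # "randwrited4_qd32"; summary_no_qd below drops the "_qd32" suffix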
+
+ @property
+ def summary_no_qd(self) -> str:
+ tpl = cast(FioTestSumm, self.characterized_tuple)
+ res = "{0.oper}{0.sync_mode}{0.bsize}".format(tpl)
+
+ if tpl.thcount != 1:
+ res += "th" + str(tpl.thcount)
+ if tpl.write_perc is not None:
+ res += "wr" + str(tpl.write_perc)
+
+ return res
+ # ------------------------------------------------------------------------------------------------------------------
+
+ def __eq__(self, o: object) -> bool:
+ if not isinstance(o, FioJobConfig):
+ return False
+ return self.vals == cast(FioJobConfig, o).vals
def copy(self) -> 'FioJobConfig':
return copy.deepcopy(self)
@@ -75,17 +213,17 @@
return str(self)
def raw(self) -> Dict[str, Any]:
- return {
- 'vals': [[key, val] for key, val in self.vals.items()],
- 'summary': self.summary,
- 'name': self.name
- }
+ res = self.__dict__.copy()
+ del res['_sync_mode']
+ res['vals'] = [[key, val] for key, val in self.vals.items()]
+ return res
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
- obj = cls(data['name'])
- obj.summary = data['summary']
- obj.vals.update(data['vals'])
+ obj = cls.__new__(cls)
+ data['vals'] = OrderedDict(data['vals'])
+ data['_sync_mode'] = None
+ obj.__dict__.update(data)
return obj
@@ -203,6 +341,8 @@
lexed_lines = new_lines
+ suite_section_idx = 0
+
for fname, lineno, oline, tp, name, val in lexed_lines:
if tp == SECTION:
if curr_section is not None:
@@ -215,7 +355,8 @@
in_globals = True
else:
in_globals = False
- curr_section = FioJobConfig(name)
+ curr_section = FioJobConfig(name, idx=suite_section_idx)
+ suite_section_idx += 1
curr_section.vals = glob_vals.copy()
sections_count += 1
else:
@@ -332,68 +473,13 @@
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
params['COUNTER'] = str(counter[0])
- params['TEST_SUMM'] = get_test_summary(sec)
+ params['TEST_SUMM'] = sec.summary
sec.name = sec.name.format(**params)
counter[0] += 1
return sec
-def get_test_sync_mode(sec: FioJobConfig) -> str:
- if isinstance(sec, dict):
- vals = sec
- else:
- vals = sec.vals
-
- is_sync = str(vals.get("sync", "0")) == "1"
- is_direct = str(vals.get("direct", "0")) == "1"
-
- if is_sync and is_direct:
- return 'x'
- elif is_sync:
- return 's'
- elif is_direct:
- return 'd'
- else:
- return 'a'
-
-
-def get_test_summary_tuple(sec: FioJobConfig, vm_count: int = None) -> TestSumm:
- if isinstance(sec, dict):
- vals = sec
- else:
- vals = sec.vals
-
- rw = {"randread": "rr",
- "randwrite": "rw",
- "read": "sr",
- "write": "sw",
- "randrw": "rm",
- "rw": "sm",
- "readwrite": "sm"}[vals["rw"]]
-
- sync_mode = get_test_sync_mode(sec)
-
- return TestSumm(rw,
- sync_mode,
- vals['blocksize'],
- vals.get('iodepth', '1'),
- vm_count)
-
-
-def get_test_summary(sec: FioJobConfig, vm_count: int = None, noiodepth: bool = False) -> str:
- tpl = get_test_summary_tuple(sec, vm_count)
-
- res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
- if not noiodepth:
- res += "qd{}".format(tpl.iodepth)
-
- if tpl.vm_count is not None:
- res += "vm{}".format(tpl.vm_count)
-
- return res
-
-
def execution_time(sec: FioJobConfig) -> int:
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
@@ -402,23 +488,18 @@
return fio_config_parse(fio_config_lexer(source, fname))
-FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
-FM_FUNC_RES = TypeVar("FM_FUNC_RES")
-
-
-def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
- inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
- for val in inp_iter:
- for res in func(val):
- yield res
-
-
-def get_log_files(sec: FioJobConfig) -> List[Tuple[str, str]]:
+def get_log_files(sec: FioJobConfig, iops: bool = False) -> List[Tuple[str, str]]:
res = [] # type: List[Tuple[str, str]]
- for key, name in (('write_iops_log', 'iops'), ('write_bw_log', 'bw'), ('write_hist_log', 'lat')):
+
+ keys = [('write_bw_log', 'bw'), ('write_hist_log', 'lat')]
+ if iops:
+ keys.append(('write_iops_log', 'iops'))
+
+ for key, name in keys:
log = sec.vals.get(key)
if log is not None:
res.append((name, log))
+
return res
@@ -427,7 +508,6 @@
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
for sec in map(final_process, it):
- sec.summary = get_test_summary(sec)
yield sec
diff --git a/wally/suits/io/one_step.cfg b/wally/suits/io/one_step.cfg
new file mode 100644
index 0000000..3e08c8b
--- /dev/null
+++ b/wally/suits/io/one_step.cfg
@@ -0,0 +1,9 @@
+[global]
+include defaults_qd.cfg
+ramp_time=0
+runtime={RUNTIME}
+
+[test_{TEST_SUMM}]
+blocksize=60k
+rw=randread
+iodepth=1
\ No newline at end of file
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 98e55f0..5f5cfb5 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -13,6 +13,7 @@
logger = logging.getLogger("agent.fio")
+# TODO: fix this in case if file is block device
def check_file_prefilled(path, used_size_mb):
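+ # returns True when sampled blocks look already filled with data, False when the
+ # file still needs to be prefilled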
used_size = used_size_mb * 1024 ** 2
blocks_to_check = 16
@@ -20,9 +21,9 @@
try:
fstats = os.stat(path)
if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
- return True
+ return False
except EnvironmentError:
- return True
+ return False
offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
offsets.append(used_size - 1024)
@@ -32,15 +33,15 @@
for offset in offsets:
fd.seek(offset)
if b"\x00" * 1024 == fd.read(1024):
- return True
+ return False
- return False
+ return True
def rpc_fill_file(fname, size, force=False, fio_path='fio'):
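+ # returns (filled_now, bandwidth in MiB/s or None); (False, None) means the file
+ # was already prefilled and was left untouched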
if not force:
- if not check_file_prefilled(fname, size):
- return
+ if check_file_prefilled(fname, size):
+ return False, None
assert size % 4 == 0, "File size must be proportional to 4M"
@@ -50,7 +51,9 @@
subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
run_time = time.time() - run_time
- return None if run_time < 1.0 else int(size / run_time)
+ prefill_bw = None if run_time < 1.0 else int(size / run_time)
+
+ return True, prefill_bw
def rpc_install(name, binary):
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index ca3c613..ae2d960 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,7 +1,7 @@
[global]
include defaults_qd.cfg
ramp_time=0
-runtime=10
+runtime={RUNTIME}
[test_{TEST_SUMM}]
blocksize=60k
@@ -12,3 +12,23 @@
iodepth=16
blocksize=60k
rw=randread
+
+[test_{TEST_SUMM}]
+blocksize=60k
+rw=randwrite
+iodepth=1
+
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randwrite
+
+[test_{TEST_SUMM}]
+iodepth=1
+blocksize=1m
+rw=write
+
+[test_{TEST_SUMM}]
+iodepth=1
+blocksize=1m
+rw=read
diff --git a/wally/suits/io/rrd_qd_scan.cfg b/wally/suits/io/rrd_qd_scan.cfg
new file mode 100644
index 0000000..e0937c9
--- /dev/null
+++ b/wally/suits/io/rrd_qd_scan.cfg
@@ -0,0 +1,9 @@
+[global]
+include defaults_qd.cfg
+ramp_time=0
+runtime={RUNTIME}
+
+[test_{TEST_SUMM}]
+blocksize=4k
+rw=randread
+iodepth={QDS}
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
new file mode 100644
index 0000000..2b0fc74
--- /dev/null
+++ b/wally/suits/io/rrd_raw.cfg
@@ -0,0 +1,21 @@
+[test]
+blocksize=4k
+rw=randread
+iodepth=1
+ramp_time=0
+runtime=120
+buffered=0
+direct=1
+sync=0
+ioengine=libaio
+group_reporting=1
+unified_rw_reporting=1
+norandommap=1
+numjobs=1
+thread=1
+time_based=1
+wait_for_previous=1
+per_job_logs=0
+randrepeat=0
+filename=/dev/sdb
+size=100G
\ No newline at end of file
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index bc6b115..ac9e1c1 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,19 +1,14 @@
-import re
import abc
import time
-import array
-import struct
import logging
import os.path
-import datetime
-from typing import Any, List, Optional, Callable, cast, Iterator, Tuple, Iterable
+from typing import Any, List, Optional, Callable, Tuple, Iterable, cast
from concurrent.futures import ThreadPoolExecutor, wait, Future
-from ..utils import StopTestError, sec_to_str, get_time_interval_printable_info
+from ..utils import StopTestError, get_time_interval_printable_info
from ..node_interfaces import IRPCNode
-from ..storage import Storage
-from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries
+from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries, IResultStorage
logger = logging.getLogger("wally")
@@ -22,127 +17,6 @@
__doc__ = "Contains base classes for performance tests"
-class ResultStorage:
- ts_header_format = "!IIIcc"
-
- def __init__(self, storage: Storage, job_config_cls: type) -> None:
- self.storage = storage
- self.job_config_cls = job_config_cls
-
- def get_suite_root(self, suite_type: str, idx: int) -> str:
- return "results/{}_{}".format(suite_type, idx)
-
- def get_job_root(self, suite_root: str, summary: str, run_id: int) -> str:
- return "{}/{}_{}".format(suite_root, summary, run_id)
-
- # store
- def put_suite_config(self, config: TestSuiteConfig, root: str) -> None:
- self.storage.put(config, root, "config.yml")
-
- def put_job_config(self, config: TestJobConfig, root: str) -> None:
- self.storage.put(config, root, "config.yml")
-
- def get_suite_config(self, suite_root: str) -> TestSuiteConfig:
- return self.storage.load(TestSuiteConfig, suite_root, "config.yml")
-
- def get_job_node_prefix(self, job_root_path: str, node_id: str) -> str:
- return "{}/{}".format(job_root_path, node_id)
-
- def get_ts_path(self, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> str:
- return "{}_{}.{}".format(self.get_job_node_prefix(job_root_path, node_id), dev, sensor_name)
-
- def put_ts(self, ts: TimeSeries, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> None:
- # TODO: check that 'metrics', 'dev' and 'node_id' match required patterns
- root_path = self.get_ts_path(job_root_path, node_id, dev, sensor_name)
-
- if len(ts.data) / ts.second_axis_size != len(ts.times):
- logger.error("Unbalanced time series data. Array size has % elements, while time size has %",
- len(ts.data) / ts.second_axis_size, len(ts.times))
- raise StopTestError()
-
- with self.storage.get_fd(root_path, "cb") as fd:
- header = struct.pack(self.ts_header_format,
- ts.second_axis_size,
- len(ts.data),
- len(ts.times),
- cast(array.array, ts.data).typecode.encode("ascii"),
- cast(array.array, ts.times).typecode.encode("ascii"))
- fd.write(header)
- cast(array.array, ts.data).tofile(fd)
- cast(array.array, ts.times).tofile(fd)
-
- if ts.raw is not None:
- self.storage.put_raw(ts.raw, root_path + ":raw")
-
- def put_extra(self, job_root: str, node_id: str, key: str, data: bytes) -> None:
- self.storage.put_raw(data, job_root, node_id + "_" + key)
-
- def list_suites(self) -> Iterator[Tuple[TestSuiteConfig, str]]:
- """iterates over (suite_name, suite_id, suite_root_path)
- primary this function output should be used as input into list_jobs_in_suite method
- """
- ts_re = re.compile(r"[a-zA-Z]+_\d+$")
- for is_file, name in self.storage.list("results"):
- if not is_file:
- rr = ts_re.match(name)
- if rr:
- path = "results/" + name
- yield self.get_suite_config(path), path
-
- def list_jobs_in_suite(self, suite_root_path: str) -> Iterator[Tuple[TestJobConfig, str, int]]:
- """iterates over (job_summary, job_root_path)
- primary this function output should be used as input into list_ts_in_job method
- """
- ts_re = re.compile(r"(?P<job_summary>[a-zA-Z0-9]+)_(?P<id>\d+)$")
- for is_file, name in self.storage.list(suite_root_path):
- if is_file:
- continue
- rr = ts_re.match(name)
- if rr:
- config_path = "{}/{}/config.yml".format(suite_root_path, name)
- if config_path in self.storage:
- cfg = self.storage.load(self.job_config_cls, config_path)
- yield cfg, "{}/{}".format(suite_root_path, name), int(rr.group("id"))
-
- def list_ts_in_job(self, job_root_path: str) -> Iterator[Tuple[str, str, str]]:
- """iterates over (node_id, device_name, sensor_name)
- primary this function output should be used as input into load_ts method
- """
- # TODO: check that all TS files available
- ts_re = re.compile(r"(?P<node_id>\d+\.\d+\.\d+\.\d+:\d+)_(?P<dev>[^.]+)\.(?P<sensor>[a-z_]+)$")
- already_found = set()
- for is_file, name in self.storage.list(job_root_path):
- if not is_file:
- continue
- rr = ts_re.match(name)
- if rr:
- key = (rr.group("node_id"), rr.group("dev"), rr.group("sensor"))
- if key not in already_found:
- already_found.add(key)
- yield key
-
- def load_ts(self, root_path: str, node_id: str, dev: str, sensor_name: str) -> TimeSeries:
- path = self.get_ts_path(root_path, node_id, dev, sensor_name)
-
- with self.storage.get_fd(path, "rb") as fd:
- header = fd.read(struct.calcsize(self.ts_header_format))
- second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
- struct.unpack(self.ts_header_format, header)
-
- data = array.array(data_typecode.decode("ascii"))
- times = array.array(time_typecode.decode("ascii"))
-
- data.fromfile(fd, data_sz)
- times.fromfile(fd, time_sz)
-
- # calculate number of elements
- return TimeSeries("{}.{}".format(dev, sensor_name),
- raw=None,
- data=data,
- times=times,
- second_axis_size=second_axis_size)
-
-
class PerfTest(metaclass=abc.ABCMeta):
"""Base class for all tests"""
name = None # type: str
@@ -150,20 +24,18 @@
retry_time = 30
job_config_cls = None # type: type
- def __init__(self, storage: Storage, config: TestSuiteConfig, idx: int, on_idle: Callable[[], None] = None) -> None:
- self.config = config
+ def __init__(self, storage: IResultStorage, suite: TestSuiteConfig, on_idle: Callable[[], None] = None) -> None:
+ self.suite = suite
self.stop_requested = False
- self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.config.nodes)
+ self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
self.on_idle = on_idle
self.storage = storage
- self.rstorage = ResultStorage(self.storage, self.job_config_cls)
- self.idx = idx
def request_stop(self) -> None:
self.stop_requested = True
def join_remote(self, path: str) -> str:
- return os.path.join(self.config.remote_dir, path)
+ return os.path.join(self.suite.remote_dir, path)
@abc.abstractmethod
def run(self) -> None:
@@ -185,62 +57,51 @@
def __init__(self, *args, **kwargs) -> None:
PerfTest.__init__(self, *args, **kwargs)
- self.job_configs = [None] # type: List[Optional[TestJobConfig]]
- self.suite_root_path = self.rstorage.get_suite_root(self.config.test_type, self.idx)
+ self.job_configs = None # type: List[TestJobConfig]
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
pass
- def get_not_done_stages(self) -> Iterable[Tuple[int, TestJobConfig]]:
- all_jobs = dict(enumerate(self.job_configs))
- for db_config, path, jid in self.rstorage.list_jobs_in_suite(self.suite_root_path):
- if jid in all_jobs:
- job_config = all_jobs[jid]
- if job_config != db_config:
- logger.error("Test info at path '%s/config' is not equal to expected config for iteration %s.%s." +
+ def get_not_done_jobs(self) -> Iterable[TestJobConfig]:
+ jobs_map = {job.storage_id: job for job in self.job_configs}
+ already_in_storage = set()
+ for db_config in cast(List[TestJobConfig], self.storage.iter_job(self.suite)):
+ if db_config.storage_id in jobs_map:
+ job = jobs_map[db_config.storage_id]
+ if job != db_config:
+                    logger.error("Test info at '%s.%s' does not match the expected config for iteration %s.%s." +
                                 " Maybe the configuration was changed before the test was restarted. " +
"DB cfg is:\n %s\nExpected cfg is:\n %s\nFix DB or rerun test from beginning",
- path, self.name, job_config.summary,
+ self.suite.storage_id, job.storage_id, self.name, job.summary,
str(db_config).replace("\n", "\n "),
- str(job_config).replace("\n", "\n "))
+ str(job).replace("\n", "\n "))
raise StopTestError()
- logger.info("Test iteration %s.%s found in storage and will be skipped",
- self.name, job_config.summary)
- del all_jobs[jid]
- return all_jobs.items()
+ logger.info("Test iteration %s.%s found in storage and will be skipped", self.name, job.summary)
+ already_in_storage.add(db_config.storage_id)
+
+ return [job for job in self.job_configs if job.storage_id not in already_in_storage]
def run(self) -> None:
- try:
- cfg = self.rstorage.get_suite_config(self.suite_root_path)
- except KeyError:
- cfg = None
+ self.storage.put_or_check_suite(self.suite)
- if cfg is not None and cfg != self.config:
- logger.error("Current suite %s config is not equal to found in storage at %s",
- self.config.test_type, self.suite_root_path)
- raise StopTestError()
-
- not_in_storage = list(self.get_not_done_stages())
-
+ not_in_storage = list(self.get_not_done_jobs())
if not not_in_storage:
logger.info("All test iteration in storage already. Skip test")
return
- self.rstorage.put_suite_config(self.config, self.suite_root_path)
-
logger.debug("Run test %s with profile %r on nodes %s.", self.name,
self.load_profile_name,
",".join(self.sorted_nodes_ids))
logger.debug("Prepare nodes")
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ with ThreadPoolExecutor(len(self.suite.nodes)) as pool:
# config nodes
- list(pool.map(self.config_node, self.config.nodes))
+ list(pool.map(self.config_node, self.suite.nodes))
- run_times = [self.get_expected_runtime(job_config) for _, job_config in not_in_storage]
+ run_times = list(map(self.get_expected_runtime, not_in_storage))
if None not in run_times:
            # +5% is a rough estimate for additional operations
@@ -249,51 +110,52 @@
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
logger.info("Entire test should takes around %s and finished at %s", exec_time_s, end_dt_s)
- for run_id, job_config in not_in_storage:
- job_path = self.rstorage.get_job_root(self.suite_root_path, job_config.summary, run_id)
-
- jfutures = [] # type: List[Future]
- for idx in range(self.max_retry):
- logger.debug("Prepare job %s", job_config.summary)
+ for job in not_in_storage:
+ results = [] # type: List[TimeSeries]
+ for retry_idx in range(self.max_retry):
+ logger.debug("Prepare job %s", job.summary)
# prepare nodes for new iterations
- wait([pool.submit(self.prepare_iteration, node, job_config) for node in self.config.nodes])
+ wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
- expected_job_time = self.get_expected_runtime(job_config)
+ expected_job_time = self.get_expected_runtime(job)
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
logger.info("Job should takes around %s and finished at %s", exec_time_s, end_dt_s)
- try:
- jfutures = []
- for node in self.config.nodes:
- future = pool.submit(self.run_iteration, node, job_config, job_path)
- jfutures.append(future)
- # test completed successfully, stop retrying
- break
- except EnvironmentError:
- if self.max_retry - 1 == idx:
- logger.exception("Fio failed")
- raise StopTestError()
- logger.exception("During fio run")
- logger.info("Sleeping %ss and retrying job", self.retry_time)
- time.sleep(self.retry_time)
+ jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
+ failed = False
+ for future in jfutures:
+ try:
+ results.extend(future.result())
+ except EnvironmentError:
+ failed = True
+ if not failed:
+ break
+
+ if self.max_retry - 1 == retry_idx:
+ logger.exception("Fio failed")
+ raise StopTestError()
+
+                logger.exception("Error during fio run")
+ logger.info("Sleeping %ss and retrying job", self.retry_time)
+ time.sleep(self.retry_time)
+ results = []
+
+            # per-node job start and stop times
start_times = [] # type: List[int]
stop_times = [] # type: List[int]
- for future in jfutures:
- for (node_id, dev, sensor_name), ts in future.result().items():
- self.rstorage.put_ts(ts, job_path, node_id=node_id, dev=dev, sensor_name=sensor_name)
-
- if len(ts.times) >= 2:
- start_times.append(ts.times[0])
- stop_times.append(ts.times[-1])
+ for ts in results:
+ self.storage.put_ts(ts)
+ if len(ts.times) >= 2: # type: ignore
+ start_times.append(ts.times[0])
+ stop_times.append(ts.times[-1])
if len(start_times) > 0:
min_start_time = min(start_times)
max_start_time = max(start_times)
min_stop_time = min(stop_times)
- max_stop_time = max(stop_times)
max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
@@ -301,16 +163,19 @@
if min_start_time + self.max_time_diff < max_allowed_time_diff:
logger.warning("Too large difference in %s:%s start time - %s. " +
"Max recommended difference is %s",
- self.name, job_config.summary,
+ self.name, job.summary,
max_start_time - min_start_time, self.max_time_diff)
if min_stop_time + self.max_time_diff < max_allowed_time_diff:
logger.warning("Too large difference in %s:%s stop time - %s. " +
"Max recommended difference is %s",
- self.name, job_config.summary,
+ self.name, job.summary,
max_start_time - min_start_time, self.max_time_diff)
- self.rstorage.put_job_config(job_config, job_path)
+ job.reliable_info_starts_at = max_start_time
+ job.reliable_info_stops_at = min_stop_time
+
+ self.storage.put_job(self.suite, job)
self.storage.sync()
if self.on_idle is not None:
@@ -321,24 +186,25 @@
pass
@abc.abstractmethod
- def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
+ def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
pass
@abc.abstractmethod
- def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
+ def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
pass
class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
def __init__(self, *dt, **mp) -> None:
ThreadedTest.__init__(self, *dt, **mp)
- self.prerun_script = self.config.params['prerun_script']
- self.run_script = self.config.params['run_script']
- self.prerun_tout = self.config.params.get('prerun_tout', 3600)
- self.run_tout = self.config.params.get('run_tout', 3600)
- self.iterations_configs = [None]
+ self.prerun_script = self.suite.params['prerun_script']
+ self.run_script = self.suite.params['run_script']
+ self.prerun_tout = self.suite.params.get('prerun_tout', 3600)
+ self.run_tout = self.suite.params.get('run_tout', 3600)
+ # TODO: fix job_configs field
+ raise NotImplementedError("Fix job configs")
- def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
+ def get_expected_runtime(self, job: TestJobConfig) -> Optional[int]:
return None
def config_node(self, node: IRPCNode) -> None:
@@ -346,19 +212,19 @@
node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))
cmd = self.join_remote(self.prerun_script)
- cmd += ' ' + self.config.params.get('prerun_opts', '')
+ cmd += ' ' + self.suite.params.get('prerun_opts', '')
node.run(cmd, timeout=self.prerun_tout)
- def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
+ def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
pass
- def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
+ def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
# TODO: have to store logs
cmd = self.join_remote(self.run_script)
- cmd += ' ' + self.config.params.get('run_opts', '')
+ cmd += ' ' + self.suite.params.get('run_opts', '')
return self.parse_results(node.run(cmd, timeout=self.run_tout))
@abc.abstractmethod
- def parse_results(self, data: str) -> JobMetrics:
+ def parse_results(self, data: str) -> List[TimeSeries]:
pass
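
The reliable_info_starts_at/reliable_info_stops_at fields set in run() above trim each job to the window where every node was actually reporting: the interval starts at the latest per-node start time and ends at the earliest per-node stop time, with a warning when node clocks drift too far apart. A standalone sketch of that reduction, with illustrative defaults (the real limits come from max_rel_time_diff and max_time_diff on the test class):

from typing import Optional, Sequence, Tuple

def reliable_interval(per_node_times: Sequence[Sequence[int]],
                      max_rel_time_diff: float = 0.1,
                      max_time_diff: int = 5) -> Optional[Tuple[int, int]]:
    # ignore series with fewer than two samples, mirroring the len(ts.times) >= 2 check
    usable = [times for times in per_node_times if len(times) >= 2]
    if not usable:
        return None

    starts = [times[0] for times in usable]
    stops = [times[-1] for times in usable]
    starts_at = max(starts)   # last node to begin reporting
    stops_at = min(stops)     # first node to stop reporting

    # allowed skew: a fraction of the common window, with an absolute lower bound
    allowed = max(int((stops_at - starts_at) * max_rel_time_diff), max_time_diff)
    if max(starts) - min(starts) > allowed or max(stops) - min(stops) > allowed:
        print("warning: node start/stop times differ by more than %s seconds" % allowed)
    return starts_at, stops_at
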
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index a0c014f..fea846d 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -43,7 +43,7 @@
def merge_node(self, creds: ConnCreds, roles: Set[str]) -> NodeInfo:
info = NodeInfo(creds, roles)
- nid = info.node_id()
+ nid = info.node_id
if nid in self.nodes_info:
self.nodes_info[nid].roles.update(info.roles)
diff --git a/wally/utils.py b/wally/utils.py
index 078a019..78235a8 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -11,11 +11,8 @@
import threading
import contextlib
import subprocess
-import collections
-from .node_interfaces import IRPCNode
-from typing import (Any, Tuple, Union, List, Iterator, Dict, Iterable, Optional,
- IO, Sequence, NamedTuple, cast, TypeVar)
+from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
try:
import psutil
@@ -347,59 +344,6 @@
return user, passwd, tenant, auth_url, insecure
-OSRelease = NamedTuple("OSRelease",
- [("distro", str),
- ("release", str),
- ("arch", str)])
-
-
-def get_os(node: IRPCNode) -> OSRelease:
- """return os type, release and architecture for node.
- """
- arch = node.run("arch", nolog=True).strip()
-
- try:
- node.run("ls -l /etc/redhat-release", nolog=True)
- return OSRelease('redhat', None, arch)
- except:
- pass
-
- try:
- node.run("ls -l /etc/debian_version", nolog=True)
-
- release = None
- for line in node.run("lsb_release -a", nolog=True).split("\n"):
- if ':' not in line:
- continue
- opt, val = line.split(":", 1)
-
- if opt == 'Codename':
- release = val.strip()
-
- return OSRelease('ubuntu', release, arch)
- except:
- pass
-
- raise RuntimeError("Unknown os")
-
-
-@contextlib.contextmanager
-def empty_ctx(val: Any = None) -> Iterator[Any]:
- yield val
-
-
-def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
- logger.info("Found {0} nodes total".format(len(nodes)))
-
- per_role = collections.defaultdict(int) # type: Dict[str, int]
- for node in nodes:
- for role in node.info.roles:
- per_role[role] += 1
-
- for role, count in sorted(per_role.items()):
- logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
def which(program: str) -> Optional[str]:
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
@@ -413,6 +357,11 @@
return None
+@contextlib.contextmanager
+def empty_ctx(val: Any = None) -> Iterator[Any]:
+ yield val
+
+
def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
for i in range(max_iter):
run_uuid = pet_generate(2, "_")
@@ -442,3 +391,16 @@
now_dt = datetime.datetime.now()
end_dt = now_dt + datetime.timedelta(0, seconds)
return exec_time_s, "{:%H:%M:%S}".format(end_dt)
+
+
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+ inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
+ for val in inp_iter:
+ for res in func(val):
+ yield res
+
+
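
The flatmap helper added to utils.py simply chains the iterables produced by func over the input. A short usage example (assuming wally.utils is importable; the lambda and inputs are purely illustrative):

from wally.utils import flatmap

# expand every job id into a (job_id, node_id) pair for two hypothetical nodes
pairs = list(flatmap(lambda job_id: [(job_id, "node-0"), (job_id, "node-1")],
                     ["rand_0", "seq_1"]))
print(pairs)  # [('rand_0', 'node-0'), ('rand_0', 'node-1'), ('seq_1', 'node-0'), ('seq_1', 'node-1')]
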