working on reporting, this commit represents a broken code state
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index adb114d..871de56 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -41,7 +41,7 @@
sensors:
online: true
roles_mapping:
-# testnode: system-cpu, block-io, net-io
+ testnode: system-cpu, block-io, net-io
ceph-osd:
system-cpu: ".*"
block-io: ".*"
diff --git a/configs-examples/local_lxc_ceph.yaml b/configs-examples/local_lxc_ceph.yaml
index d5b1403..60afc48 100644
--- a/configs-examples/local_lxc_ceph.yaml
+++ b/configs-examples/local_lxc_ceph.yaml
@@ -1,16 +1,16 @@
include: default.yaml
collect_info: false
-#ceph:
-# root_node: localhost
-
nodes:
- koder@192.168.152.42: testnode
+ koder@192.168.0.100: testnode
tests:
- - io:
- load: rrd
+ - fio:
+ load: rrd_qd_scan
params:
- FILENAME: /tmp/fl.bin
- FILESIZE: 4G
+ FILENAME: /media/koder/test_space/test.bin
+ FILESIZE: 10G
+ RUNTIME: 1200
+# QDS: [1, 2, 4, 8, 16, 32, 64]
+ QDS: [16]
diff --git a/report_templates/index.html b/report_templates/index.html
new file mode 100644
index 0000000..ba3dfd4
--- /dev/null
+++ b/report_templates/index.html
@@ -0,0 +1,35 @@
+<!DOCTYPE html>
+<html>
+
+<head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+ <title>Report</title>
+ <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+ <link rel="stylesheet" href="main.css">
+ <link href="https://fonts.googleapis.com/css?family=Open+Sans:300,400,600,700,800" rel="stylesheet">
+</head>
+
+<body>
+ <div class="navigation">
+ <!-- Left Sidebar Start -->
+ <div class="sidebar">
+ <div class="navigation" id="MainMenu">
+ <div class="list-group panel">
+ {{{menu}}}
+ </div>
+ </div>
+ </div>
+ <!-- Left Sidebar End -->
+ </div>
+ <div class="container-fluid">
+ <div class="row">
+ <div class="col-lg-12 col-md-12 col-xs-12">
+ {{{content}}}
+ </div>
+ </div>
+ </div>
+ <script type='text/javascript' src="https://ajax.googleapis.com/ajax/libs/jquery/2.0.2/jquery.min.js"></script>
+ <script type='text/javascript' src="https://netdna.bootstrapcdn.com/bootstrap/3.1.0/js/bootstrap.min.js"></script>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/report_templates/main.css b/report_templates/main.css
new file mode 100644
index 0000000..0b5e46f
--- /dev/null
+++ b/report_templates/main.css
@@ -0,0 +1,118 @@
+@import url('//maxcdn.bootstrapcdn.com/font-awesome/4.4.0/css/font-awesome.min.css');
+
+html {
+ font-size: 14px;
+ min-height: 100%;
+ position: relative;
+}
+
+body {
+ background: #ffffff;
+ font-family: 'Open Sans', sans-serif;
+ font-size: 14px;
+ overflow-y: scroll;
+ height: 100%;
+ min-width: 800px;
+}
+
+a:focus {
+ outline: none;
+}
+
+h1, h2, h3, h4, h5 {
+ font-weight: 100;
+}
+
+.navigation {
+ width: 200px;
+ display: block;
+ background-color: #5E6D70;
+ position: absolute;
+ height: 100%;
+ top: 0px;
+ left: 0px;
+ bottom: 0;
+ right: 0;
+ z-index: 99;
+}
+
+.container-fluid {
+ padding-left: 200px;
+}
+
+.row {
+ margin: 0;
+ padding: 0;
+ position: relative;
+}
+
+.sidebar {
+ clear: both;
+ position: fixed;
+}
+
+.panel {
+ background-color: #5E6D70;
+ border: 1px solid transparent;
+ border-radius: 0;
+ box-shadow: 0 0 0 rgba(0, 0, 0, 0.05);
+ margin-bottom: 20px;
+}
+
+.list-group {
+ margin-top: 20px;
+}
+
+.nav-group {
+ display: block;
+ padding: 10px 15px;
+ position: relative;
+ background-color: #424E4F;
+ text-transform: uppercase;
+ font-weight: 500;
+ color: #ffffff;
+ margin: auto;
+ max-width: 300px;
+ text-decoration: none;
+}
+.nav-group:hover, .nav-group:focus {
+ background-color: #424E4F;
+ color: #ffffff;
+ text-decoration: none;
+ border: none;
+}
+
+.nav-group-item {
+ display: block;
+ color: #ffffff;
+ padding: 10px 15px;
+ position: relative;
+ text-decoration: none;
+ font-size: 13px;
+ padding-left: 20px;
+ font-weight: 300;
+}
+
+.nav-group-item:hover {
+ color: #b4d0d6;
+ text-decoration: none;
+}
+
+
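+/* a content block is shown only when its id matches the URL fragment (:target) - presumably how the sidebar menu links switch report sections */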
+#content1 {
+ display: none;
+ clear: both;
+}
+
+#content1:target {
+ display: block;
+}
+
+#content2 {
+ display: none;
+ clear: both;
+}
+
+#content2:target {
+ display: block;
+}
\ No newline at end of file
diff --git a/v2_plans.md b/v2_plans.md
index a51f29d..e6b2116 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,27 +1,42 @@
+* Remarks:
+    * With the current code it is impossible to run a VM count scan test
+
* TODO next
- * Revise structures and types location in files, structures names,
- add dot file for classes and function dependencies
+    * Job description should have a tuple of parameters which characterize the load, plus an abbreviated/readable description
+    * TS should have units; UI modules should use a function to calculate the coefficient for displayed values
+    * Get the completed IOPS count from fio?
+    * Rearrange report layout - make engineering reports per job
+ * Store sensors and all data in csv, load from csv
+ * Plot aggregated sensors across cluster during test
+ * Aggregated sensors distribution and boxplot
+ * Hitmap for aggregated sensors
+    * automatically find what to plot from storage data (but also allow to select via config)
+ * store aggregated and per-node TS in it
+    * show distribution parameters on histogram plots
+    * Fix plot layout, there is too much unused space around a typical plot
+    * update API for working with storage. Should allow selecting each sensor for some interval and the sum of a particular sensor
+      across all node devices, all nodes of the same type, and the entire cluster.
+ * Collect latency distribution
+    * IOPS boxplot as a function of QD
* store statistic results in storage
* collect device types mapping from nodes - device should be block/net/...
- * all integral sensors gap interpolation
- * run sensors in thread pool, optimize communication with ceph, can run fist OSD request for
- data validation only on start. Each sensor should collect only one portion of data. During
+ * add integral sensors gap interpolation
+    * Optimize sensor communication with ceph, can run the first OSD request for
+ data validation only on start.
+ * Each sensor should collect only one portion of data. During
start it should scan all awailable sources and tell upper code to create separated funcs for them.
- All funcs should run in separated threads
* run test with sensor on large and small file
* Move test load code to io.fio file
- * Load latency into 2D numpy.array, same for everything else
- * Latency statistic - mostly the same as iops, but no average, dispersion and conf interval
- * Start generating first report images and put them into simple document
- - iops over time
- - bw over time
- - 50ppc + 95ppc Lat over time with boxplots in same graph for selected points
- * Statistic in background?
* UT, which run test with predefined in yaml cluster (cluster and config created separatelly, not with tests)
and check that result storage work as expected. Declare db sheme in seaprated yaml file, UT should check.
- * Update DB test, add tests for stat and plot module
+ * Update Storage test, add tests for stat and plot module
+    * During prefill check IO on the test file
+    * Check the FS on the device where the test file is located
+ * Dump and analyze target block device settings on test nodes
* Code:
+ * RW mixed report
+    * C++/Go disk stat sensors to measure IOPS/Lat at millisecond resolution
* Allow to cleanup all uncleaned from previous run 'wally cleanup PATH'
* RPC reconnect in case of errors
* store more information for node - OSD settings, etc
@@ -112,6 +127,8 @@
* correct comparison between different systems
* Maybe move to 2.1:
+    * Add collection time to sensor results
+    * Make the collection interval configurable per sensor type; track collection time separately for each sensor
* DB <-> files conversion, or just store all the time in files as well
* Automatically scale QD till saturation
* Runtime visualization
diff --git a/wally/common_types.py b/wally/common_types.py
index ce497ad..a4805c3 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,11 +1,40 @@
-from typing import NamedTuple, Dict, Any
+import abc
+from typing import Any, Union, List, Dict, NamedTuple
-from .istorable import IStorable
IP = str
IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
+class IStorable(metaclass=abc.ABCMeta):
+ """Interface for type, which can be stored"""
+
+ @abc.abstractmethod
+ def raw(self) -> Dict[str, Any]:
+ pass
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+ pass
+
+
+Basic = Union[int, str, bytes, bool, None]
+StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
+
+
+class Storable(IStorable):
+ """Default implementation"""
+
+ def raw(self) -> Dict[str, Any]:
+ return {name: val for name, val in self.__dict__.items() if not name.startswith("_")}
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+ obj = cls.__new__(cls)
+ obj.__dict__.update(data)
+ return obj
+
+
class ConnCreds(IStorable):
def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
key_file: str = None, key: bytes = None) -> None:
diff --git a/wally/config.py b/wally/config.py
index 7554fb8..2178d0c 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
-from .result_classes import IStorable
+
+from .common_types import IStorable
ConfigBlock = Dict[str, Any]
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
new file mode 100644
index 0000000..c71159f
--- /dev/null
+++ b/wally/hlstorage.py
@@ -0,0 +1,205 @@
+import re
+import os
+import array
+import struct
+import logging
+from typing import cast, Iterator, Tuple, Type, Dict, Set, List, Optional
+
+import numpy
+
+from .result_classes import (TestSuiteConfig, TestJobConfig, TimeSeries, DataSource,
+ StatProps, IResultStorage)
+from .storage import Storage
+from .utils import StopTestError
+from .suits.all_suits import all_suits
+
+
+logger = logging.getLogger('wally')
+
+
+class DB_re:
+    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
+ job_id = r'[-a-zA-Z0-9]+_\d+'
+ sensor = r'[a-z_]+'
+ dev = r'[-a-zA-Z0-9_]+'
+ suite_id = r'[a-z]+_\d+'
+ tag = r'[a-z_.]+'
+
+
+class DB_paths:
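+    # the *_r variants are regex templates (dots escaped) used for scanning storage paths;
+    # the plain variants are the same strings unescaped, used as str.format() path templates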
+ suite_cfg_r = r'results/{suite_id}_info\.yml'
+ suite_cfg = suite_cfg_r.replace("\\.", '.')
+
+ job_cfg_r = r'results/{suite_id}\.{job_id}/info\.yml'
+ job_cfg = job_cfg_r.replace("\\.", '.')
+
+ job_extra_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ job_extra = job_extra_r.replace("\\.", '.')
+
+ ts_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ ts = ts_r.replace("\\.", '.')
+
+ stat_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ stat = stat_r.replace("\\.", '.')
+
+ plot_r = r'report/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+ plot = plot_r.replace("\\.", '.')
+
+ report = r'report/'
+
+
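+# wrap every DB_re pattern into a named group, e.g. suite_id -> (?P<suite_id>[a-z]+_\d+),
+# so iter_paths() can both match storage paths and extract their components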
+DB_rr = {name: r"(?P<{}>{})".format(name, rr) for name, rr in DB_re.__dict__.items() if not name.startswith("__")}
+
+
+class ResultStorage(IResultStorage):
+ # TODO: check that all path components match required patterns
+
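+    # binary TS layout: "!IIIcc" header = (second_axis_size, len(data), len(times),
+    # data array typecode, times array typecode), followed by the two packed arrays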
+ ts_header_format = "!IIIcc"
+ ts_arr_tag = 'bin'
+ ts_raw_tag = 'txt'
+
+ def __init__(self, storage: Storage) -> None:
+ self.storage = storage
+
+ def sync(self) -> None:
+ self.storage.sync()
+
+ def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+ path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
+ if path in self.storage:
+ db_cfg = self.storage.get(path)
+ if db_cfg != suite:
+                logger.error("Current suite %s config is not equal to the one found in storage at %s", suite.test_type, path)
+ raise StopTestError()
+
+ self.storage.put(suite, path)
+
+ def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+ path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
+ self.storage.put(job, path)
+
+ def put_ts(self, ts: TimeSeries) -> None:
+ data = cast(List[int], ts.data)
+ times = cast(List[int], ts.times)
+
+ if len(data) % ts.second_axis_size != 0:
+            logger.error("Time series data size (%s) is not proportional to second_axis_size (%s).",
+ len(data), ts.second_axis_size)
+ raise StopTestError()
+
+ if len(data) // ts.second_axis_size != len(times):
+            logger.error("Unbalanced data and time array sizes. %s", ts)
+ raise StopTestError()
+
+ bin_path = DB_paths.ts.format(**ts.source(tag=self.ts_arr_tag).__dict__)
+
+ with self.storage.get_fd(bin_path, "cb") as fd:
+ header = struct.pack(self.ts_header_format,
+ ts.second_axis_size,
+ len(data),
+ len(times),
+ ts.data.typecode.encode("ascii"),
+ ts.times.typecode.encode("ascii"))
+ fd.write(header)
+ ts.data.tofile(fd) # type: ignore
+ ts.times.tofile(fd) # type: ignore
+
+ if ts.raw:
+ raw_path = DB_paths.ts.format(**ts.source(tag=self.ts_raw_tag).__dict__)
+ self.storage.put_raw(ts.raw, raw_path)
+
+ def put_extra(self, data: bytes, source: DataSource) -> None:
+        path = DB_paths.job_extra.format(**source.__dict__)
+ self.storage.put_raw(data, path)
+
+ def put_stat(self, data: StatProps, source: DataSource) -> None:
+ path = DB_paths.stat.format(**source.__dict__)
+ self.storage.put(data, path)
+
+ def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+ path = DB_paths.stat.format(**source.__dict__)
+ return self.storage.load(stat_cls, path)
+
+ def iter_paths(self, path_glob) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
+ path = path_glob.format(**DB_rr).split("/")
+ yield from self.storage._iter_paths("", path, {})
+
+ def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+ for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
+ assert is_file
+ suite = cast(TestSuiteConfig, self.storage.load(TestSuiteConfig, suite_info_path))
+ assert suite.storage_id == groups['suite_id']
+ if not suite_type or suite.test_type == suite_type:
+ yield suite
+
+ def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+ job_glob = DB_paths.job_cfg_r.replace('{suite_id}', suite.storage_id)
+ job_config_cls = all_suits[suite.test_type].job_config_cls
+
+ for is_file, path, groups in self.iter_paths(job_glob):
+ assert is_file
+ job = cast(TestJobConfig, self.storage.load(job_config_cls, path))
+ assert job.storage_id == groups['job_id']
+ yield job
+
+ def iter_datasource(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[Tuple[DataSource, Dict[str, str]]]:
+ ts_glob = DB_paths.ts_r.replace('{suite_id}', suite.storage_id).replace('{job_id}', job.storage_id)
+ ts_found = {} # type: Dict[Tuple[str, str, str], Dict[str, str]]
+
+ for is_file, path, groups in self.iter_paths(ts_glob):
+ assert is_file
+ key = (groups['node_id'], groups['dev'], groups['sensor'])
+ ts_found.setdefault(key, {})[groups['tag']] = path
+
+ for (node_id, dev, sensor), tag2path in ts_found.items():
+ if self.ts_arr_tag in tag2path:
+ yield DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id=node_id,
+ dev=dev, sensor=sensor, tag=None), tag2path
+
+ def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
+ with self.storage.get_fd(path, "rb") as fd:
+ header = fd.read(struct.calcsize(self.ts_header_format))
+ second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
+ struct.unpack(self.ts_header_format, header)
+
+ data = array.array(data_typecode.decode("ascii"))
+ times = array.array(time_typecode.decode("ascii"))
+
+ data.fromfile(fd, data_sz) # type: ignore
+ times.fromfile(fd, time_sz) # type: ignore
+
+ return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
+ raw=None,
+ data=numpy.array(data, dtype=numpy.dtype('float32')),
+ times=numpy.array(times),
+ second_axis_size=second_axis_size,
+ source=ds)
+
+ def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig, **filters) -> Iterator[TimeSeries]:
+ for ds, tag2path in self.iter_datasource(suite, job):
+ for name, val in filters.items():
+ if val != getattr(ds, name):
+ break
+ else:
+ ts = self.load_ts(ds, tag2path[self.ts_arr_tag])
+ if self.ts_raw_tag in tag2path:
+ ts.raw = self.storage.get_raw(tag2path[self.ts_raw_tag])
+
+ yield ts
+
+ # return path to file to be inserted into report
+ def put_plot_file(self, data: bytes, source: DataSource) -> str:
+ path = DB_paths.plot.format(**source.__dict__)
+ return cast(str, self.storage.put_raw(data, path))
+
+ def check_plot_file(self, source: DataSource) -> Optional[str]:
+ path = DB_paths.plot.format(**source.__dict__)
+ fpath = self.storage.resolve_raw(path)
+ if os.path.exists(fpath):
+ return fpath
+ return None
+
+ def put_report(self, report: str, name: str) -> str:
+ return self.storage.put_raw(report.encode("utf8"), DB_paths.report + name)
diff --git a/wally/html.py b/wally/html.py
new file mode 100644
index 0000000..e92e7d1
--- /dev/null
+++ b/wally/html.py
@@ -0,0 +1,2 @@
+def img(link):
+ return '<img src="{}">'.format(link)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index aa53f8e..9da8cb7 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -5,6 +5,7 @@
from typing import List, Tuple, cast, Optional
from . import utils
+from .node_utils import get_os
from .node_interfaces import IRPCNode
@@ -130,7 +131,7 @@
def get_sw_info(node: IRPCNode) -> SWInfo:
res = SWInfo()
- res.OS_version = utils.get_os(node)
+ res.OS_version = get_os(node)
res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
@@ -159,7 +160,7 @@
try:
lshw_out = node.run('sudo lshw -xml 2>/dev/null')
except Exception as exc:
- logger.warning("lshw failed on node %s: %s", node.info.node_id(), exc)
+ logger.warning("lshw failed on node %s: %s", node.node_id, exc)
return None
res = HWInfo()
diff --git a/wally/istorable.py b/wally/istorable.py
deleted file mode 100644
index 467f901..0000000
--- a/wally/istorable.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import abc
-from typing import Any, Union, List, Dict
-
-
-class IStorable(metaclass=abc.ABCMeta):
- """Interface for type, which can be stored"""
-
- @abc.abstractmethod
- def raw(self) -> Dict[str, Any]:
- pass
-
- @abc.abstractclassmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- pass
-
-
-class Storable(IStorable):
- """Default implementation"""
-
- def raw(self) -> Dict[str, Any]:
- return self.__dict__
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- obj = cls.__new__(cls)
- obj.__dict__.update(data)
- return obj
-
-
-Basic = Union[int, str, bytes, bool, None]
-StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
diff --git a/wally/main.py b/wally/main.py
index 0553b4e..fd9b5a0 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -4,8 +4,10 @@
import pprint
import getpass
import logging
+import tempfile
import argparse
import functools
+import subprocess
import contextlib
from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
from yaml import load as _yaml_load
@@ -30,6 +32,7 @@
faulthandler = None
from . import utils, node
+from .node_utils import log_nodes_statistic
from .storage import make_storage, Storage
from .config import Config
from .logger import setup_loggers
@@ -91,7 +94,7 @@
def log_nodes_statistic_stage(ctx: TestRun) -> None:
- utils.log_nodes_statistic(ctx.nodes)
+ log_nodes_statistic(ctx.nodes)
def parse_args(argv):
@@ -121,6 +124,15 @@
report_parser.add_argument("data_dir", help="folder with rest results")
# ---------------------------------------------------------------------
+ ipython_help = 'run ipython in prepared environment'
+ ipython_parser = subparsers.add_parser('ipython', help=ipython_help)
+ ipython_parser.add_argument("storage_dir", help="Storage path")
+ # ---------------------------------------------------------------------
+    jupyter_help = 'run jupyter notebook in prepared environment'
+ jupyter_parser = subparsers.add_parser('jupyter', help=jupyter_help)
+ jupyter_parser.add_argument("storage_dir", help="Storage path")
+
+ # ---------------------------------------------------------------------
test_parser = subparsers.add_parser('test', help='run tests')
test_parser.add_argument('--build-description', type=str, default="Build info")
test_parser.add_argument('--build-id', type=str, default="id")
@@ -141,7 +153,7 @@
test_parser.add_argument("storage_dir", help="Path to test directory")
# ---------------------------------------------------------------------
- test_parser = subparsers.add_parser('db', help='resume tests')
+ test_parser = subparsers.add_parser('db', help='Exec command on DB')
test_parser.add_argument("cmd", choices=("show",), help="Command to execute")
test_parser.add_argument("params", nargs='*', help="Command params")
test_parser.add_argument("storage_dir", help="Storage path")
@@ -206,6 +218,48 @@
PrepareNodes()]
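+# minimal nbformat-4 Jupyter notebook template; "$STORAGE" is substituted with the storage path at runtime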
+notebook_kern = """
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {
+ "collapsed": true
+ },
+ "outputs": [],
+ "source": [
+ "from wally.storage import make_storage\n",
+    "from wally.hlstorage import ResultStorage\n",
+ "storage = make_storage(\"$STORAGE\", existing=True)\n",
+ "rstorage = ResultStorage(storage=storage)\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.5.2"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}"""
+
+
def main(argv: List[str]) -> int:
if faulthandler is not None:
faulthandler.register(signal.SIGUSR1, all_threads=True)
@@ -250,7 +304,8 @@
config = storage.load(Config, 'config')
stages.extend(get_run_stages())
stages.append(LoadStoredNodesStage())
- prev_opts = storage.get('cli')
+ prev_opts = storage.get('cli') # type: List[str]
+
if '--ssh-key-passwd' in prev_opts and opts.ssh_key_passwd:
prev_opts[prev_opts.index("--ssh-key-passwd") + 1] = opts.ssh_key_passwd
@@ -293,11 +348,26 @@
print("Unknown/not_implemented command {!r}".format(opts.cmd))
return 1
return 0
+ elif opts.subparser_name == 'ipython':
+ storage = make_storage(opts.storage_dir, existing=True)
+ from .hlstorage import ResultStorage
+ rstorage = ResultStorage(storage=storage)
+
+ import IPython
+ IPython.embed()
+
+ return 0
+ elif opts.subparser_name == 'jupyter':
+        # NamedTemporaryFile is opened in binary mode; keep the notebook alive while jupyter runs and pass its path
+        with tempfile.NamedTemporaryFile(suffix=".ipynb") as fd:
+            fd.write(notebook_kern.replace("$STORAGE", opts.storage_dir).encode("utf8"))
+            fd.flush()
+            subprocess.call("jupyter notebook " + fd.name, shell=True)
+        return 0
+
report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
- report_stages.append(CalcStatisticStage())
- report_stages.append(ConsoleReportStage())
+ # report_stages.append(CalcStatisticStage())
+ # report_stages.append(ConsoleReportStage())
report_stages.append(HtmlReportStage())
# log level is not a part of config
diff --git a/wally/node.py b/wally/node.py
index d4da52a..38342c6 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -1,4 +1,5 @@
import os
+import zlib
import time
import json
import socket
@@ -25,7 +26,11 @@
self.info = info
def __str__(self) -> str:
- return self.info.node_id()
+ return self.node_id
+
+ @property
+ def node_id(self) -> str:
+ return self.info.node_id
def put_to_file(self, path: Optional[str], content: bytes) -> str:
if path is None:
@@ -163,17 +168,19 @@
def __repr__(self) -> str:
return str(self)
- def get_file_content(self, path: str, expanduser: bool = False) -> bytes:
+ def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
logger.debug("GET %s from %s", path, self.info)
if expanduser:
path = self.conn.fs.expanduser(path)
- res = self.conn.fs.get_file(path)
+ res = self.conn.fs.get_file(path, compress)
logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
+ if compress:
+ res = zlib.decompress(res)
return res
def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
if not nolog:
- logger.debug("Node %s - run %s", self.info.node_id(), cmd)
+ logger.debug("Node %s - run %s", self.node_id, cmd)
cmd_b = cmd.encode("utf8")
proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
@@ -188,21 +195,26 @@
if code != 0:
templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
- raise OSError(templ.format(self.info.node_id(), cmd, code, out))
+ raise OSError(templ.format(self.node_id, cmd, code, out))
return out
- def copy_file(self, local_path: str, remote_path: str = None, expanduser: bool = False) -> str:
+ def copy_file(self, local_path: str, remote_path: str = None,
+ expanduser: bool = False,
+ compress: bool = False) -> str:
+
if expanduser:
remote_path = self.conn.fs.expanduser(remote_path)
data = open(local_path, 'rb').read() # type: bytes
- return self.put_to_file(remote_path, data)
+ return self.put_to_file(remote_path, data, compress=compress)
- def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
+ def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
if expanduser:
path = self.conn.fs.expanduser(path)
- return self.conn.fs.store_file(path, content)
+ if compress:
+ content = zlib.compress(content)
+ return self.conn.fs.store_file(path, content, compress)
def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
if expanduser:
@@ -267,7 +279,7 @@
cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
- node.info.node_id(), log_file, log_level))
+ node.node_id, log_file, log_level))
else:
cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
cmd = cmd.format(python_cmd, code_file, ip, port)
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index 71efc54..935ca41 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,11 +1,13 @@
import abc
-from typing import Any, Set, Dict, NamedTuple, Optional
+import logging
+from typing import Any, Set, Dict, Optional, NamedTuple
+
from .ssh_utils import ConnCreds
-from .common_types import IPAddr
-from .istorable import IStorable
+from .common_types import IPAddr, IStorable
RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
+logger = logging.getLogger("wally")
class NodeInfo(IStorable):
@@ -23,11 +25,12 @@
if params is not None:
self.params = params
+ @property
def node_id(self) -> str:
return "{0.host}:{0.port}".format(self.ssh_creds.addr)
def __str__(self) -> str:
- return self.node_id()
+ return self.node_id
def __repr__(self) -> str:
return str(self)
@@ -89,6 +92,10 @@
conn = None # type: Any
rpc_log_file = None # type: str
+ @property
+ def node_id(self) -> str:
+ return self.info.node_id
+
@abc.abstractmethod
def __str__(self) -> str:
pass
diff --git a/wally/node_utils.py b/wally/node_utils.py
new file mode 100644
index 0000000..e66ba44
--- /dev/null
+++ b/wally/node_utils.py
@@ -0,0 +1,56 @@
+import logging
+import collections
+from typing import Dict, Sequence, NamedTuple
+
+from .node_interfaces import IRPCNode
+
+logger = logging.getLogger("wally")
+
+
+def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
+ logger.info("Found {0} nodes total".format(len(nodes)))
+
+ per_role = collections.defaultdict(int) # type: Dict[str, int]
+ for node in nodes:
+ for role in node.info.roles:
+ per_role[role] += 1
+
+ for role, count in sorted(per_role.items()):
+ logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+
+OSRelease = NamedTuple("OSRelease",
+ [("distro", str),
+ ("release", str),
+ ("arch", str)])
+
+
+def get_os(node: IRPCNode) -> OSRelease:
+    """Return OS type, release and architecture for the node."""
+ arch = node.run("arch", nolog=True).strip()
+
+ try:
+ node.run("ls -l /etc/redhat-release", nolog=True)
+ return OSRelease('redhat', None, arch)
+    except Exception:
+ pass
+
+ try:
+ node.run("ls -l /etc/debian_version", nolog=True)
+
+ release = None
+ for line in node.run("lsb_release -a", nolog=True).split("\n"):
+ if ':' not in line:
+ continue
+ opt, val = line.split(":", 1)
+
+ if opt == 'Codename':
+ release = val.strip()
+
+ return OSRelease('ubuntu', release, arch)
+    except Exception:
+ pass
+
+ raise RuntimeError("Unknown os")
diff --git a/wally/openstack.py b/wally/openstack.py
index 8aa9913..b7fbe31 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -155,7 +155,7 @@
creds = ConnCreds(host=to_ip(ip), user=user_name, key_file=private_key_path)
info = NodeInfo(creds, {'testnode'})
info.os_vm_id = vm_id
- nid = info.node_id()
+ nid = info.node_id
if nid in ctx.nodes_info:
logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
raise StopTestError()
@@ -199,7 +199,7 @@
with ctx.get_pool() as pool:
for info in launch_vms(ctx.os_connection, params, pool):
info.roles.add('testnode')
- nid = info.node_id()
+ nid = info.node_id
if nid in ctx.nodes_info:
logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
raise StopTestError()
diff --git a/wally/process_results.py b/wally/process_results.py
index 5ca53af..112826e 100644
--- a/wally/process_results.py
+++ b/wally/process_results.py
@@ -1,58 +1,53 @@
# put all result preprocessing here
# selection, aggregation
+from io import BytesIO
import logging
-
+from typing import Any
from .stage import Stage, StepOrder
from .test_run_class import TestRun
from .statistic import calc_norm_stat_props, calc_histo_stat_props
-from .result_classes import TestJobConfig
-from .suits.itest import ResultStorage
+from .result_classes import StatProps, DataSource, TimeSeries
+from .hlstorage import ResultStorage
from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest
from .utils import StopTestError
-logger = logging.getLogger("wally")
-
import matplotlib
-
-# have to be before pyplot import to avoid tkinter(default graph frontend) import error
matplotlib.use('svg')
-
import matplotlib.pyplot as plt
+logger = logging.getLogger("wally")
+
+
class CalcStatisticStage(Stage):
priority = StepOrder.TEST + 1
def run(self, ctx: TestRun) -> None:
- rstorage = ResultStorage(ctx.storage, TestJobConfig)
+ rstorage = ResultStorage(ctx.storage)
- for suite_cfg, path in rstorage.list_suites():
- if suite_cfg.test_type != 'fio':
- continue
-
- for job_cfg, path, _ in rstorage.list_jobs_in_suite(path):
+ for suite in rstorage.iter_suite(FioTest.name):
+ for job in rstorage.iter_job(suite):
results = {}
- for node_id, dev, sensor_name in rstorage.list_ts_in_job(path):
- ts = rstorage.load_ts(path, node_id, dev, sensor_name)
- if dev == 'fio' and sensor_name == 'lat':
+ for ts in rstorage.iter_ts(suite, job):
+ if ts.source.sensor == 'lat':
if ts.second_axis_size != expected_lat_bins:
logger.error("Sensor %s.%s on node %s has" +
"second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
- dev, sensor_name, node_id, ts.second_axis_size, expected_lat_bins)
+ ts.source.dev, ts.source.sensor, ts.source.node_id,
+ ts.second_axis_size, expected_lat_bins)
continue
ts.bins_edges = get_lat_vals(ts.second_axis_size)
- stat_prop = calc_histo_stat_props(ts)
+ stat_prop = calc_histo_stat_props(ts) # type: StatProps
elif ts.second_axis_size != 1:
logger.warning("Sensor %s.%s on node %s provide 2D data with " +
"ts.second_axis_size=%s. Can't process it.",
- dev, sensor_name, node_id, ts.second_axis_size)
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
continue
else:
stat_prop = calc_norm_stat_props(ts)
- results[(node_id, dev, sensor_name)] = stat_prop
-
raise StopTestError()
diff --git a/wally/report.py b/wally/report.py
index cf1289b..f8d8c5a 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,44 +1,90 @@
+import os
+import re
import abc
+import bisect
import logging
-from typing import Dict, Any, Iterator, Tuple, cast, List
+from io import BytesIO
+from functools import wraps
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable
+from collections import defaultdict
import numpy
-import scipy
import matplotlib
-
# have to be before pyplot import to avoid tkinter(default graph frontend) import error
matplotlib.use('svg')
-
import matplotlib.pyplot as plt
+import scipy.stats
+import wally
-
-
-from .utils import ssize2b
+from . import html
+from .utils import b2ssize
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .result_classes import NormStatProps
+from .hlstorage import ResultStorage
+from .node_interfaces import NodeInfo
+from .storage import Storage
+from .statistic import calc_norm_stat_props, calc_histo_stat_props
+from .result_classes import (StatProps, DataSource, TimeSeries, TestSuiteConfig,
+ NormStatProps, HistoStatProps, TestJobConfig)
+from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest, FioJobConfig
+from .suits.io.fio_task_parser import FioTestSumm
+from .statistic import approximate_curve, average, dev
logger = logging.getLogger("wally")
-class ConsoleReportStage(Stage):
-
- priority = StepOrder.REPORT
-
- def run(self, ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+# ---------------- CONSTS ---------------------------------------------------------------------------------------------
-class HtmlReportStage(Stage):
+DEBUG = False
+LARGE_BLOCKS = 256
+MiB2KiB = 1024
+MS2S = 1000
- priority = StepOrder.REPORT
- def run(self, ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+# ---------------- PROFILES ------------------------------------------------------------------------------------------
+
+
+class ColorProfile:
+ primary_color = 'b'
+ suppl_color1 = 'teal'
+ suppl_color2 = 'magenta'
+ box_color = 'y'
+
+ noise_alpha = 0.3
+ subinfo_alpha = 0.7
+
+
+class StyleProfile:
+ grid = True
+ tide_layout = True
+ hist_boxes = 10
+ min_points_for_dev = 5
+
+ dev_range_x = 2.0
+ dev_perc = 95
+
+ avg_range = 20
+
+ curve_approx_level = 5
+ curve_approx_points = 100
+ assert avg_range >= min_points_for_dev
+
+ extra_io_spine = True
+
+ legend_for_eng = True
+
+ units = {
+        'bw': ("MiBps", MiB2KiB, "bandwidth"),
+ 'iops': ("IOPS", 1, "iops"),
+ 'lat': ("ms", 1, "latency")
+ }
+
+
+# ---------------- STRUCTS -------------------------------------------------------------------------------------------
# TODO: need to be revised, have to user StatProps fields instead
@@ -63,15 +109,497 @@
self.lat_95 = None # type: float
+class IOSummary:
+ def __init__(self,
+ qd: int,
+ block_size: int,
+                 nodes_count: int,
+ bw: NormStatProps,
+ lat: HistoStatProps) -> None:
+
+ self.qd = qd
+ self.nodes_count = nodes_count
+ self.block_size = block_size
+
+ self.bw = bw
+ self.lat = lat
+
+
+# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
+rexpr = {
+ 'sensor': r'(?P<sensor>[-a-z]+)',
+ 'dev': r'(?P<dev>[^.]+)',
+ 'metric': r'(?P<metric>[a-z_]+)',
+ 'node': r'(?P<node>\d+\.\d+\.\d+\.\d+:\d+)',
+}
+
+def iter_sensors(storage: Storage, node: str = None, sensor: str = None, dev: str = None, metric: str = None):
+ if node is None:
+ node = rexpr['node']
+ if sensor is None:
+ sensor = rexpr['sensor']
+ if dev is None:
+ dev = rexpr['dev']
+ if metric is None:
+ metric = rexpr['metric']
+
+ rr = r"{}_{}\.{}\.{}$".format(node, sensor, dev, metric)
+ sensor_name_re = re.compile(rr)
+
+ for is_file, sensor_data_name in storage.list("sensors"):
+ if is_file:
+ rr = sensor_name_re.match(sensor_data_name)
+ if rr:
+ yield 'sensors/' + sensor_data_name, rr.groupdict()
+
+
+def make_iosum(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig) -> IOSummary:
+ lat = get_aggregated(rstorage, suite, job, "lat")
+ bins_edges = numpy.array(get_lat_vals(lat.second_axis_size), dtype='float32') / 1000
+ io = get_aggregated(rstorage, suite, job, "bw")
+
+ return IOSummary(job.qd,
+ nodes_count=len(suite.nodes_ids),
+ block_size=job.bsize,
+ lat=calc_histo_stat_props(lat, bins_edges, StyleProfile.hist_boxes),
+ bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+
+#
+# def iter_io_results(rstorage: ResultStorage,
+# qds: List[int] = None,
+# op_types: List[str] = None,
+# sync_types: List[str] = None,
+# block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
+#
+# for suite in rstorage.iter_suite(FioTest.name):
+# for job in rstorage.iter_job(suite):
+# fjob = cast(FioJobConfig, job)
+# assert int(fjob.vals['numjobs']) == 1
+#
+# if sync_types is not None and fjob.sync_mode in sync_types:
+# continue
+#
+# if block_sizes is not None and fjob.bsize not in block_sizes:
+# continue
+#
+# if op_types is not None and fjob.op_type not in op_types:
+# continue
+#
+# if qds is not None and fjob.qd not in qds:
+# continue
+#
+# yield suite, fjob
+
+
+def get_aggregated(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig, sensor: str) -> TimeSeries:
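+    # sum the per-node fio time series for the given sensor into one synthetic "__all__" node series
+    # (assumes all nodes share the same time grid - see the TODO about matching times below)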
+ tss = list(rstorage.iter_ts(suite, job, sensor=sensor))
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id="__all__",
+ dev='fio',
+ sensor=sensor,
+ tag=None)
+
+ agg_ts = TimeSeries(sensor,
+ raw=None,
+ source=ds,
+ data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
+ times=tss[0].times.copy(),
+ second_axis_size=tss[0].second_axis_size)
+
+ for ts in tss:
+ if sensor == 'lat' and ts.second_axis_size != expected_lat_bins:
+            logger.error("Sensor %s.%s on node %s has " +
+ "second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id,
+ ts.second_axis_size, expected_lat_bins)
+ continue
+
+ if sensor != 'lat' and ts.second_axis_size != 1:
+            logger.error("Sensor %s.%s on node %s has " +
+ "second_axis_size=%s. Can only process sensors with second_axis_size=1.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
+ continue
+
+ # TODO: match times on different ts
+ agg_ts.data += ts.data
+
+ return agg_ts
+
+
+# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
+
+def get_emb_data_svg(plt: Any) -> bytes:
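+    # render the current figure to SVG and drop everything before matplotlib's creation comment
+    # (XML prolog/doctype), so the bare <svg> element can be embedded inline into the HTML report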
+ bio = BytesIO()
+ plt.savefig(bio, format='svg')
+ img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+ return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
+
+
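+# decorator: cache rendered plots in the result storage and return the stored file path,
+# re-rendering only when no cached plot exists for the given DataSource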
+def provide_plot(func: Callable[..., None]) -> Callable[..., str]:
+ @wraps(func)
+ def closure1(storage: ResultStorage, path: DataSource, *args, **kwargs) -> str:
+ fpath = storage.check_plot_file(path)
+ if not fpath:
+ func(*args, **kwargs)
+ fpath = storage.put_plot_file(get_emb_data_svg(plt), path)
+ plt.clf()
+ logger.debug("Save plot for %s to %r", path, fpath)
+ return fpath
+ return closure1
+
+
+def apply_style(style: StyleProfile, eng: bool = True, no_legend: bool = False) -> None:
+ if style.grid:
+ plt.grid(True)
+
+ if (style.legend_for_eng or not eng) and not no_legend:
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.03, 0.81)
+ plt.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+
+# -------------- PLOT FUNCTIONS --------------------------------------------------------------------------------------
+
+
+@provide_plot
+def plot_hist(title: str, units: str,
+ prop: StatProps,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+
+    # TODO: unit should come from ts
+ total = sum(prop.bins_populations)
+ mids = prop.bins_mids
+ normed_bins = [population / total for population in prop.bins_populations]
+ bar_width = mids[1] - mids[0]
+ plt.bar(mids - bar_width / 2, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
+
+ plt.xlabel(units)
+ plt.ylabel("Value probability")
+ plt.title(title)
+
+ dist_plotted = False
+ if isinstance(prop, NormStatProps):
+ nprop = cast(NormStatProps, prop)
+ stats = scipy.stats.norm(nprop.average, nprop.deviation)
+
+ # xpoints = numpy.linspace(mids[0], mids[-1], style.curve_approx_points)
+ # ypoints = stats.pdf(xpoints) / style.curve_approx_points
+
+ edges, step = numpy.linspace(mids[0], mids[-1], len(mids) * 10, retstep=True)
+
+ ypoints = stats.cdf(edges) * 11
+ ypoints = [next - prev for (next, prev) in zip(ypoints[1:], ypoints[:-1])]
+ xpoints = (edges[1:] + edges[:-1]) / 2
+
+ plt.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal distribution")
+ dist_plotted = True
+
+ apply_style(style, eng=True, no_legend=not dist_plotted)
+
+
+@provide_plot
+def plot_v_over_time(title: str, units: str,
+ ts: TimeSeries,
+ plot_avg_dev: bool = True,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+ min_time = min(ts.times)
+
+ # /1000 is us to ms conversion
+ time_points = [(val_time - min_time) / 1000 for val_time in ts.times]
+
+ alpha = colors.noise_alpha if plot_avg_dev else 1.0
+ plt.plot(time_points, ts.data, "o", color=colors.primary_color, alpha=alpha, label="Data")
+
+ if plot_avg_dev:
+ avg_vals = []
+ low_vals_dev = []
+ hight_vals_dev = []
+ avg_times = []
+ dev_times = []
+
+ start = (len(ts.data) % style.avg_range) // 2
+ points = list(range(start, len(ts.data) + 1, style.avg_range))
+
+ for begin, end in zip(points[:-1], points[1:]):
+ vals = ts.data[begin: end]
+
+ cavg = average(vals)
+ cdev = dev(vals)
+ tavg = average(time_points[begin: end])
+
+ avg_vals.append(cavg)
+ avg_times.append(tavg)
+
+ low_vals_dev.append(cavg - style.dev_range_x * cdev)
+ hight_vals_dev.append(cavg + style.dev_range_x * cdev)
+ dev_times.append(tavg)
+
+ avg_timepoints = cast(List[float], numpy.linspace(avg_times[0], avg_times[-1], style.curve_approx_points))
+
+ low_vals_dev = approximate_curve(dev_times, low_vals_dev, avg_timepoints, style.curve_approx_level)
+ hight_vals_dev = approximate_curve(dev_times, hight_vals_dev, avg_timepoints, style.curve_approx_level)
+ new_vals_avg = approximate_curve(avg_times, avg_vals, avg_timepoints, style.curve_approx_level)
+
+ plt.plot(avg_timepoints, new_vals_avg, c=colors.suppl_color1,
+ label="Average\nover {}s".format(style.avg_range))
+ plt.plot(avg_timepoints, low_vals_dev, c=colors.suppl_color2,
+ label="Avg \xB1 {} * stdev\nover {}s".format(style.dev_range_x, style.avg_range))
+ plt.plot(avg_timepoints, hight_vals_dev, c=colors.suppl_color2)
+
+ plt.xlim(-5, max(time_points) + 5)
+
+ plt.xlabel("Time, seconds from test begin")
+ plt.ylabel("{}. Average and \xB1stddev over {} points".format(units, style.avg_range))
+ plt.title(title)
+ apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+ min_time = min(ts.times)
+ times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
+ ts_len = len(times)
+ step = ts_len / samples
+ points = [times[int(i * step + 0.5)] for i in range(samples)]
+ points.append(times[-1])
+ bounds = list(zip(points[:-1], points[1:]))
+ data = numpy.array(ts.data, dtype='int32')
+ data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size] # type: ignore
+ agg_data = []
+ positions = []
+ labels = []
+
+ min_idxs = []
+ max_idxs = []
+
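+    # expand each time slice's aggregated latency histogram back into a flat array of samples,
+    # so plt.boxplot() can compute percentiles from it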
+ for begin, end in bounds:
+ agg_hist = numpy.sum(data[begin:end], axis=0)
+
+ vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+ cidx = 0
+ non_zero = agg_hist.nonzero()[0]
+ min_idxs.append(non_zero[0])
+ max_idxs.append(non_zero[-1])
+
+ for pos in non_zero:
+ vals[cidx:cidx + agg_hist[pos]] = bins_vals[pos]
+ cidx += agg_hist[pos]
+
+ agg_data.append(vals)
+ positions.append((end + begin) / 2)
+ labels.append(str((end + begin) // 2))
+
+ min_y = bins_vals[min(min_idxs)]
+ max_y = bins_vals[max(max_idxs)]
+
+ min_y -= (max_y - min_y) * 0.05
+ max_y += (max_y - min_y) * 0.05
+
+ # plot box size adjust (only plot, not spines and legend)
+ plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
+ plt.xlim(min(times), max(times))
+ plt.ylim(min_y, max_y)
+ plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
+ plt.ylabel("Latency, ms")
+ plt.title(title)
+ apply_style(style, eng=True, no_legend=True)
+
+
+@provide_plot
+def plot_heatmap(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ hist_bins_count = 20
+ bin_top = [100 * 2 ** i for i in range(20)]
+ bin_ranges = [[0, 0]]
+ cborder_it = iter(bin_top)
+ cborder = next(cborder_it)
+ for bin_val in bins_vals:
+ if bin_val < cborder:
+ bin_ranges
+
+ # bins: [100us, 200us, ...., 104s]
+    # map original bins ranges to heatmap bins
+
+@provide_plot
+def io_chart(title: str,
+ legend: str,
+ iosums: List[IOSummary],
+ iops_log_spine: bool = False,
+ lat_log_spine: bool = False,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+
+ # -------------- MAGIC VALUES ---------------------
+ # IOPS bar width
+ width = 0.35
+
+ # offset from center of bar to deviation/confidence range indicator
+ err_x_offset = 0.05
+
+ # figure size in inches
+ figsize = (12, 6)
+
+ # extra space on top and bottom, comparing to maximal tight layout
+ extra_y_space = 0.05
+
+ # additional spine for BW/IOPS on left side of plot
+ extra_io_spine_x_offset = -0.1
+
+ # extra space on left and right sides
+ extra_x_space = 0.5
+
+ # legend location settings
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.1, 0.81)
+
+ # plot box size adjust (only plot, not spines and legend)
+ plot_box_adjust = {'right': 0.66}
+ # -------------- END OF MAGIC VALUES ---------------------
+
+ block_size = iosums[0].block_size
+ lc = len(iosums)
+ xt = list(range(1, lc + 1))
+
+ # x coordinate of middle of the bars
+ xpos = [i - width / 2 for i in xt]
+
+ # import matplotlib.gridspec as gridspec
+ # gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
+ # p1 = plt.subplot(gs[1])
+
+ fig, p1 = plt.subplots(figsize=figsize)
+
+ # plot IOPS/BW bars
+ if block_size >= LARGE_BLOCKS:
+ iops_primary = False
+ coef = MiB2KiB
+ p1.set_ylabel("BW (MiBps)")
+ else:
+ iops_primary = True
+ coef = block_size
+ p1.set_ylabel("IOPS")
+
+ p1.bar(xpos, [iosum.bw.average / coef for iosum in iosums], width=width, color=colors.box_color, label=legend)
+
+ # set correct x limits for primary IO spine
+ min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+ max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+ border = (max_io - min_io) * extra_y_space
+ io_lims = (min_io - border, max_io + border)
+
+ p1.set_ylim(io_lims[0] / coef, io_lims[-1] / coef)
+
+ # plot deviation and confidence error ranges
+ err1_legend = err2_legend = None
+ for pos, iosum in zip(xpos, iosums):
+ err1_legend = p1.errorbar(pos + width / 2 - err_x_offset,
+ iosum.bw.average / coef,
+ iosum.bw.deviation * style.dev_range_x / coef,
+ alpha=colors.subinfo_alpha,
+ color=colors.suppl_color1) # 'magenta'
+ err2_legend = p1.errorbar(pos + width / 2 + err_x_offset,
+ iosum.bw.average / coef,
+ iosum.bw.confidence / coef,
+ alpha=colors.subinfo_alpha,
+ color=colors.suppl_color2) # 'teal'
+
+ if style.grid:
+ p1.grid(True)
+
+ handles1, labels1 = p1.get_legend_handles_labels()
+
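+            # for/else: the else branch runs only when no filter rejected this data source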
+ handles1 += [err1_legend, err2_legend]
+ labels1 += ["{}% dev".format(style.dev_perc),
+ "{}% conf".format(int(100 * iosums[0].bw.confidence_level))]
+
+ # extra y spine for latency on right side
+ p2 = p1.twinx()
+
+ # plot median and 95 perc latency
+ p2.plot(xt, [iosum.lat.perc_50 for iosum in iosums], label="lat med")
+ p2.plot(xt, [iosum.lat.perc_95 for iosum in iosums], label="lat 95%")
+
+ # limit and label x spine
+ plt.xlim(extra_x_space, lc + extra_x_space)
+ plt.xticks(xt, ["{0} * {1}".format(iosum.qd, iosum.nodes_count) for iosum in iosums])
+ p1.set_xlabel("QD * Test node count")
+
+ # apply log scales for X spines, if set
+ if iops_log_spine:
+ p1.set_yscale('log')
+
+ if lat_log_spine:
+ p2.set_yscale('log')
+
+ # extra y spine for BW/IOPS on left side
+ if style.extra_io_spine:
+ p3 = p1.twinx()
+ if iops_log_spine:
+ p3.set_yscale('log')
+
+ if iops_primary:
+ p3.set_ylabel("BW (MiBps)")
+ p3.set_ylim(io_lims[0] / MiB2KiB, io_lims[1] / MiB2KiB)
+ else:
+ p3.set_ylabel("IOPS")
+ p3.set_ylim(io_lims[0] / block_size, io_lims[1] / block_size)
+
+ p3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+ p3.spines["left"].set_visible(True)
+ p3.yaxis.set_label_position('left')
+ p3.yaxis.set_ticks_position('left')
+
+ p2.set_ylabel("Latency (ms)")
+
+ plt.title(title)
+
+ # legend box
+ handles2, labels2 = p2.get_legend_handles_labels()
+ plt.legend(handles1 + handles2, labels1 + labels2, loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+ # adjust central box size to fit legend
+ plt.subplots_adjust(**plot_box_adjust)
+ apply_style(style, eng=False, no_legend=True)
+
+
+# -------------------- REPORT HELPERS --------------------------------------------------------------------------------
+
+
class HTMLBlock:
data = None # type: str
js_links = [] # type: List[str]
css_links = [] # type: List[str]
+class Menu1st:
+ engineering = "Engineering"
+ summary = "Summary"
+
+
+class Menu2ndEng:
+ iops_time = "IOPS(time)"
+ hist = "IOPS/lat overall histogram"
+ lat_time = "Lat(time)"
+
+
+class Menu2ndSumm:
+ io_lat_qd = "IO & Lat vs QD"
+
+
+menu_1st_order = [Menu1st.summary, Menu1st.engineering]
+
+
+# -------------------- REPORTS --------------------------------------------------------------------------------------
+
+
class Reporter(metaclass=abc.ABCMeta):
@abc.abstractmethod
- def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: TestSuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
pass
@@ -81,8 +609,38 @@
# Main performance report
-class IOPS_QD(Reporter):
+class IO_QD(Reporter):
"""Creates graph, which show how IOPS and Latency depend on QD"""
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ ts_map = {} # type: Dict[FioTestSumm, List[IOSummary]]
+        str_summary = {}  # type: Dict[FioTestSumm, Tuple[str, str]]
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ tpl_no_qd = fjob.characterized_tuple_no_qd()
+ io_summ = make_iosum(rstorage, suite, job)
+
+ if tpl_no_qd not in ts_map:
+ ts_map[tpl_no_qd] = [io_summ]
+ str_summary[tpl_no_qd] = (fjob.summary_no_qd(), fjob.long_summary_no_qd())
+ else:
+ ts_map[tpl_no_qd].append(io_summ)
+
+ for tpl, iosums in ts_map.items():
+ iosums.sort(key=lambda x: x.qd)
+            summary, summary_long = str_summary[tpl]
+
+ ds = DataSource(suite_id=suite.storage_id,
+                            job_id="io_over_qd_{}".format(summary),
+ node_id="__all__",
+ dev='fio',
+ sensor="io_over_qd",
+ tag="svg")
+
+ title = "IOPS, BW, Lat vs. QD.\n" + summary_long
+ fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums)
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ if DEBUG:
+ return
# Linearization report
@@ -91,21 +649,231 @@
# IOPS/latency distribution
-class IOPSHist(Reporter):
+class IOHist(Reporter):
"""IOPS.latency distribution histogram"""
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000 # convert us to ms
+ lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_boxes)
+
+ title = "Latency distribution. " + fjob.long_summary
+ units = "ms"
+
+ fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW distribution. " + fjob.long_summary
+ units = "MiBps"
+ agg_io.data /= MiB2KiB
+ else:
+ title = "IOPS distribution. " + fjob.long_summary
+ agg_io.data /= fjob.bsize
+ units = "IOPS"
+
+ io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+ fpath = plot_hist(rstorage, agg_io.source(tag='hist.svg'), title, units, io_stat_prop)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ return
+ else:
+ yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
-# IOPS/latency over test time
-class IOPSTime(Reporter):
+# IOPS/latency over test time for each job
+class IOTime(Reporter):
"""IOPS/latency during test"""
- def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- pass
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000
+ title = "Latency during test. " + fjob.long_summary
+
+ fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+ fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.svg'), title, agg_lat, bins_edges)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW during test. " + fjob.long_summary
+ units = "MiBps"
+ agg_io.data /= MiB2KiB
+ else:
+ title = "IOPS during test. " + fjob.long_summary
+ agg_io.data /= fjob.bsize
+ units = "IOPS"
+
+ fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io)
+
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ return
+ else:
+ yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+
+def is_sensor_numarray(sensor: str, metric: str) -> bool:
+    """Returns True if sensor provides a one-dimensional array of numeric values, one number per measurement."""
+ return True
+
+
+LEVEL_SENSORS = {("block-io", "io_queue"),
+ ("system-cpu", "procs_blocked"),
+ ("system-cpu", "procs_queue")}
+
+
+def is_level_sensor(sensor: str, metric: str) -> bool:
+    """Returns True if sensor measures a level of some kind, e.g. queue depth."""
+ return (sensor, metric) in LEVEL_SENSORS
+
+
+def is_delta_sensor(sensor: str, metric: str) -> bool:
+    """Returns True if sensor provides deltas of a cumulative value, e.g. IO completed in a given period."""
+ return not is_level_sensor(sensor, metric)
+
+
+
+def get_sensor(storage: Storage, node: str, sensor: str, dev: str, metric: str,
+ time_range: Tuple[int, int]) -> numpy.array:
+    """Return sensor values for the given node for the given period, as a per-second estimated values array.
+
+    Raise an error if the requested range is not fully covered by data in storage.
+    First it finds the range of sensor results which fully covers the requested range.
+    ...."""
+
+ collected_at = numpy.array(storage.get_array("sensors/{}_collected_at".format(node)), dtype="int")
+ data = numpy.array(storage.get_array("sensors/{}_{}.{}.{}".format(node, sensor, dev, metric)))
+
+ # collected_at is array of pairs (collection_started_at, collection_finished_at)
+ collection_start_at = collected_at[::2]
+
+ MICRO = 1000000
+
+    # convert seconds to us
+ begin = time_range[0] * MICRO
+ end = time_range[1] * MICRO
+
+ if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
+ raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+ "sensor = {}_{}.{}.{}").format(time_range,
+ collected_at[0] // MICRO,
+ collected_at[-1] // MICRO,
+ node, sensor, dev, metric))
+
+ pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
+ assert pos1 >= 1
+
+ time_bounds = collection_start_at[pos1 - 1: pos2]
+ edge_it = iter(time_bounds)
+ val_it = iter(data[pos1 - 1: pos2])
+
+ result = []
+ curr_summ = 0
+
+ results_cell_ends = begin + MICRO
+ curr_end = next(edge_it)
+
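+    # resampling: walk the irregular collection intervals and linearly apportion each interval's
+    # value into consecutive one-second result cells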
+ while results_cell_ends <= end:
+ curr_start = curr_end
+ curr_end = next(edge_it)
+ curr_val = next(val_it)
+ while curr_end >= results_cell_ends and results_cell_ends <= end:
+ current_part = (results_cell_ends - curr_start) / (curr_end - curr_start) * curr_val
+ result.append(curr_summ + current_part)
+ curr_summ = 0
+ curr_val -= current_part
+ curr_start = results_cell_ends
+ results_cell_ends += MICRO
+ curr_summ += curr_val
+
+ assert len(result) == (end - begin) // MICRO
+ return numpy.array(result)
+
+
+class ResourceUsage:
+ def __init__(self, io_r_ops: int, io_w_ops: int, io_r_kb: int, io_w_kb: int) -> None:
+ self.io_w_ops = io_w_ops
+ self.io_r_ops = io_r_ops
+ self.io_w_kb = io_w_kb
+ self.io_r_kb = io_r_kb
+
+ self.cpu_used_user = None # type: int
+ self.cpu_used_sys = None # type: int
+ self.cpu_wait_io = None # type: int
+
+ self.net_send_packets = None # type: int
+ self.net_recv_packets = None # type: int
+ self.net_send_kb = None # type: int
+ self.net_recv_kb = None # type: int
# Cluster load over test time
class ClusterLoad(Reporter):
"""IOPS/latency during test"""
+ storage_sensors = [
+ ('block-io', 'reads_completed', "Read ops"),
+ ('block-io', 'writes_completed', "Write ops"),
+ ('block-io', 'sectors_read', "Read kb"),
+ ('block-io', 'sectors_written', "Write kb"),
+ ]
+
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ # split nodes into test nodes and the rest of the cluster
+ storage = rstorage.storage
+ nodes = storage.load_list(NodeInfo, "all_nodes") # type: List[NodeInfo]
+
+ test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
+ cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
+
+ for job in rstorage.iter_job(suite):
+ # convert ms to s
+ time_range = (job.reliable_info_starts_at // MS2S, job.reliable_info_stops_at // MS2S)
+ duration = time_range[1] - time_range[0]
+
+ for sensor, metric, sensor_title in self.storage_sensors:
+ sum_testnode = numpy.zeros((duration,))
+ sum_other = numpy.zeros((duration,))
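+ # sum per-second values over all matching devices, keeping test nodes
+ # separate from the rest of the cluster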
+
+ for path, groups in iter_sensors(rstorage.storage, sensor=sensor, metric=metric):
+ data = get_sensor(rstorage.storage, groups['node'], sensor, groups['dev'], metric, time_range)
+ if groups['node'] in test_nodes:
+ sum_testnode += data
+ else:
+ sum_other += data
+
+ ds = DataSource(suite_id=suite.storage_id, job_id=job.summary, node_id="cluster",
+ dev=sensor, sensor=metric, tag="ts.svg")
+
+ # s to ms
+ ts = TimeSeries(name="", times=numpy.arange(*time_range) * MS2S, data=sum_testnode, raw=None)
+ fpath = plot_v_over_time(rstorage, ds, "{}.{}".format(sensor, metric), sensor_title, ts=ts)
+ yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+ if DEBUG:
+ return
+
+
+# Resource consumption summary
+class ResourceConsumption(Reporter):
+ """Resource consumption report, text only"""
+
# Node load over test time
class NodeLoad(Reporter):
@@ -117,12 +885,72 @@
"""IOPS/latency during test"""
-# TODO: Resource consumption report
# TODO: Ceph operation breakout report
# TODO: Resource consumption for different type of test
-#
+# ------------------------------------------ REPORT STAGES -----------------------------------------------------------
+
+
+class HtmlReportStage(Stage):
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ rstorage = ResultStorage(ctx.storage)
+ reporters = [ClusterLoad()] # IO_QD(), IOTime(), IOHist()] # type: List[Reporter]
+
+ root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+ doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
+ report_template = open(doc_templ_path, "rt").read()
+ css_file_src = os.path.join(root_dir, "report_templates/main.css")
+ css_file = open(css_file_src, "rt").read()
+
+ menu_block = []
+ content_block = []
+ link_idx = 0
+
+ matplotlib.rcParams.update({'font.size': 10})
+
+ items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, list]]
+ for suite in rstorage.iter_suite(FioTest.name):
+ for reporter in reporters:
+ for block, item, html in reporter.get_divs(suite, rstorage):
+ items[block][item].append(html)
+
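+ # build the two-level collapsible menu and the matching content blocks;
+ # link_idx ties every second-level menu entry to its content div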
+ for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
+ menu_block.append(
+ '<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
+ .format(idx_1st, menu_1st)
+ )
+ menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
+ for menu_2nd in sorted(items[menu_1st]):
+ menu_block.append(' <a href="#content{}" class="nav-group-item">{}</a>'
+ .format(link_idx, menu_2nd))
+ content_block.append('<div id="content{}">'.format(link_idx))
+ content_block.extend(" " + x for x in items[menu_1st][menu_2nd])
+ content_block.append('</div>')
+ link_idx += 1
+ menu_block.append('</div>')
+
+ report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
+ report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
+ report_path = rstorage.put_report(report, "index.html")
+ rstorage.put_report(css_file, "main.css")
+ logger.info("Report is stored into %r", report_path)
+
+
+class ConsoleReportStage(Stage):
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
+
+# --------------------------- LEGACY --------------------------------------------------------------------------------
+
+
# # disk_info = None
# # base = None
# # linearity = None
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 3616f47..1a148ed 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,7 @@
+import abc
import array
-from typing import Dict, List, Any, Optional, Tuple, cast
+from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator
+from collections import OrderedDict
import numpy
@@ -7,13 +9,31 @@
from .node_interfaces import IRPCNode
-from .istorable import IStorable, Storable
+from .common_types import IStorable, Storable
from .utils import round_digits, Number
-class TestJobConfig(Storable):
- def __init__(self) -> None:
- self.summary = None # type: str
+class TestJobConfig(Storable, metaclass=abc.ABCMeta):
+ def __init__(self, idx: int) -> None:
+ self.idx = idx
+ self.reliable_info_time_range = None # type: Tuple[int, int]
+ self.vals = OrderedDict() # type: Dict[str, Any]
+
+ @property
+ def storage_id(self) -> str:
+ return "{}_{}".format(self.summary, self.idx)
+
+ @abc.abstractproperty
+ def characterized_tuple(self) -> Tuple:
+ pass
+
+ @abc.abstractproperty
+ def summary(self, *excluded_fields) -> str:
+ pass
+
+ @abc.abstractproperty
+ def long_summary(self, *excluded_fields) -> str:
+ pass
class TestSuiteConfig(IStorable):
@@ -31,15 +51,22 @@
params: Dict[str, Any],
run_uuid: str,
nodes: List[IRPCNode],
- remote_dir: str) -> None:
+ remote_dir: str,
+ idx: int) -> None:
self.test_type = test_type
self.params = params
self.run_uuid = run_uuid
self.nodes = nodes
- self.nodes_ids = [node.info.node_id() for node in nodes]
+ self.nodes_ids = [node.node_id for node in nodes]
self.remote_dir = remote_dir
+ self.storage_id = "{}_{}".format(self.test_type, idx)
- def __eq__(self, other: 'TestSuiteConfig') -> bool:
+ def __eq__(self, o: object) -> bool:
+ if type(o) is not self.__class__:
+ return False
+
+ other = cast(TestSuiteConfig, o)
+
return (self.test_type == other.test_type and
self.params == other.params and
set(self.nodes_ids) == set(other.nodes_ids))
@@ -62,23 +89,50 @@
return obj
+class DataSource:
+ def __init__(self,
+ suite_id: str = None,
+ job_id: str = None,
+ node_id: str = None,
+ dev: str = None,
+ sensor: str = None,
+ tag: str = None) -> None:
+ self.suite_id = suite_id
+ self.job_id = job_id
+ self.node_id = node_id
+ self.dev = dev
+ self.sensor = sensor
+ self.tag = tag
+
+ def __call__(self, **kwargs) -> 'DataSource':
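+ # return a copy of this data source with the given fields overridden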
+ dct = self.__dict__.copy()
+ dct.update(kwargs)
+ return self.__class__(**dct)
+
+ def __str__(self) -> str:
+ return "{0.suite_id}.{0.job_id}/{0.node_id}/{0.dev}.{0.sensor}.{0.tag}".format(self)
+
+ def __repr__(self) -> str:
+ return str(self)
+
+
class TimeSeries:
"""Data series from sensor - either system sensor or from load generator tool (e.g. fio)"""
def __init__(self,
name: str,
raw: Optional[bytes],
- data: array.array,
- times: array.array,
+ data: numpy.array,
+ times: numpy.array,
second_axis_size: int = 1,
- bins_edges: List[float] = None) -> None:
+ source: DataSource = None) -> None:
# Sensor name. Typically DEV_NAME.METRIC
self.name = name
# Time series times and values. Time in ms from Unix epoch.
- self.times = times # type: List[int]
- self.data = data # type: List[int]
+ self.times = times
+ self.data = data
# Not equal to 1 in case of 2d sensors, like latency, when each measurement is a histogram.
self.second_axis_size = second_axis_size
@@ -86,8 +140,18 @@
# Raw sensor data (if provided). Like log file for fio iops/bw/lat.
self.raw = raw
- # bin edges for historgam timeseries
- self.bins_edges = bins_edges
+ self.source = source
+
+ def __str__(self) -> str:
+ res = "TS({}):\n".format(self.name)
+ res += " source={}\n".format(self.source)
+ res += " times_size={}\n".format(len(self.times))
+ res += " data_size={}\n".format(len(self.data))
+ res += " data_shape={}x{}\n".format(len(self.data) // self.second_axis_size, self.second_axis_size)
+ return res
+
+ def __repr__(self) -> str:
+ return str(self)
# (node_name, source_dev, metric_name) => metric_results
@@ -96,7 +160,7 @@
class StatProps(IStorable):
"Statistic properties for timeseries with unknown data distribution"
- def __init__(self, data: List[Number]) -> None:
+ def __init__(self, data: numpy.array) -> None:
self.perc_99 = None # type: float
self.perc_95 = None # type: float
self.perc_90 = None # type: float
@@ -106,8 +170,8 @@
self.max = None # type: Number
# bin_center: bin_count
- self.bins_populations = None # type: List[int]
- self.bins_edges = None # type: List[float]
+ self.bins_populations = None # type: numpy.array
+ self.bins_mids = None # type: numpy.array
self.data = data
def __str__(self) -> str:
@@ -122,14 +186,16 @@
def raw(self) -> Dict[str, Any]:
data = self.__dict__.copy()
- data['bins_edges'] = list(self.bins_edges)
+ del data['data']
+ data['bins_mids'] = list(self.bins_mids)
data['bins_populations'] = list(self.bins_populations)
return data
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
- data['bins_edges'] = numpy.array(data['bins_edges'])
+ data['bins_mids'] = numpy.array(data['bins_mids'])
data['bins_populations'] = numpy.array(data['bins_populations'])
+ data['data'] = None
res = cls.__new__(cls)
res.__dict__.update(data)
return res
@@ -138,14 +204,14 @@
class HistoStatProps(StatProps):
"""Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
Used for latency"""
- def __init__(self, data: List[Number], second_axis_size: int) -> None:
+ def __init__(self, data: numpy.array, second_axis_size: int) -> None:
self.second_axis_size = second_axis_size
StatProps.__init__(self, data)
class NormStatProps(StatProps):
"Statistic properties for timeseries with normal data distribution. Used for iops/bw"
- def __init__(self, data: List[Number]) -> None:
+ def __init__(self, data: numpy.array) -> None:
StatProps.__init__(self, data)
self.average = None # type: float
@@ -153,6 +219,8 @@
self.confidence = None # type: float
self.confidence_level = None # type: float
self.normtest = None # type: NormaltestResult
+ self.skew = None # type: float
+ self.kurt = None # type: float
def __str__(self) -> str:
res = ["NormStatProps(size = {}):".format(len(self.data)),
@@ -163,13 +231,16 @@
" perc_95 = {}".format(round_digits(self.perc_95)),
" perc_99 = {}".format(round_digits(self.perc_99)),
" range {} {}".format(round_digits(self.min), round_digits(self.max)),
- " normtest = {0.normtest}".format(self)]
+ " normtest = {0.normtest}".format(self),
+ " skew ~ kurt = {0.skew} ~ {0.kurt}".format(self)]
return "\n".join(res)
def raw(self) -> Dict[str, Any]:
data = self.__dict__.copy()
data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
- data['bins_edges'] = list(self.bins_edges)
+ del data['data']
+ data['bins_mids'] = list(self.bins_mids)
+ data['bins_populations'] = list(self.bins_populations)
return data
@classmethod
@@ -195,3 +266,51 @@
self.run_interval = (begin_time, end_time)
self.raw = raw # type: JobMetrics
self.processed = None # type: JobStatMetrics
+
+
+class IResultStorage(metaclass=abc.ABCMeta):
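+ """Interface for reading and writing test results (suites, jobs, time series, stats and plot files)."""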
+
+ @abc.abstractmethod
+ def sync(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_ts(self, ts: TimeSeries) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_extra(self, data: bytes, source: DataSource) -> None:
+ pass
+
+ @abc.abstractmethod
+ def put_stat(self, data: StatProps, source: DataSource) -> None:
+ pass
+
+ @abc.abstractmethod
+ def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+ pass
+
+ @abc.abstractmethod
+ def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+ pass
+
+ @abc.abstractmethod
+ def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+ pass
+
+ @abc.abstractmethod
+ def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[TimeSeries]:
+ pass
+
+ # return path to file to be inserted into report
+ @abc.abstractmethod
+ def put_plot_file(self, data: bytes, source: DataSource) -> str:
+ pass
diff --git a/wally/run_test.py b/wally/run_test.py
index a11b43d..d3c68b6 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -10,21 +10,11 @@
from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
-from .suits.io.fio import IOPerfTest
-from .suits.itest import TestSuiteConfig
-from .suits.mysql import MysqlTest
-from .suits.omgbench import OmgTest
-from .suits.postgres import PgBenchTest
+from .suits.all_suits import all_suits
from .test_run_class import TestRun
from .utils import StopTestError
-
-
-TOOL_TYPE_MAPPER = {
- "io": IOPerfTest,
- "pgbench": PgBenchTest,
- "mysql": MysqlTest,
- "omg": OmgTest,
-}
+from .result_classes import TestSuiteConfig
+from .hlstorage import ResultStorage
logger = logging.getLogger("wally")
@@ -81,7 +71,7 @@
if ctx.config.get("download_rpc_logs", False):
for node in ctx.nodes:
if node.rpc_log_file is not None:
- nid = node.info.node_id()
+ nid = node.node_id
path = "rpc_logs/" + nid
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
@@ -110,7 +100,7 @@
with ctx.get_pool() as pool:
# can't make next RPC request until finish with previous
for node in ctx.nodes:
- nid = node.info.node_id()
+ nid = node.node_id
hw_info_path = "hw_info/{}".format(nid)
if hw_info_path not in ctx.storage:
futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
@@ -124,7 +114,7 @@
futures.clear()
for node in ctx.nodes:
- nid = node.info.node_id()
+ nid = node.node_id
sw_info_path = "sw_info/{}".format(nid)
if sw_info_path not in ctx.storage:
futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
@@ -253,15 +243,17 @@
logger.error("No nodes found for test, skipping it.")
continue
- test_cls = TOOL_TYPE_MAPPER[name]
+ test_cls = all_suits[name]
remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
- test_cfg = TestSuiteConfig(test_cls.name,
- params=params,
- run_uuid=ctx.config.run_uuid,
- nodes=test_nodes,
- remote_dir=remote_dir)
+ suite = TestSuiteConfig(test_cls.name,
+ params=params,
+ run_uuid=ctx.config.run_uuid,
+ nodes=test_nodes,
+ remote_dir=remote_dir,
+ idx=suite_idx)
- test_cls(storage=ctx.storage, config=test_cfg, idx=suite_idx,
+ test_cls(storage=ResultStorage(ctx.storage),
+ suite=suite,
on_idle=lambda: collect_sensors_data(ctx, False)).run()
@classmethod
@@ -278,6 +270,6 @@
logger.error("Internal error: Some nodes already stored in " +
"nodes_info before LoadStoredNodesStage stage")
raise StopTestError()
- ctx.nodes_info = {node.node_id(): node
+ ctx.nodes_info = {node.node_id: node
for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors.py b/wally/sensors.py
index 0a1bbe9..54ae1ad 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -63,7 +63,7 @@
for role in node.info.roles:
node_cfg.update(per_role_config.get(role, {})) # type: ignore
- nid = node.info.node_id()
+ nid = node.node_id
if node_cfg:
# ceph requires additional settings
if 'ceph' in node_cfg:
@@ -81,7 +81,7 @@
def collect_sensors_data(ctx: TestRun, stop: bool = False):
for node in ctx.nodes:
- node_id = node.info.node_id()
+ node_id = node.node_id
if node_id in ctx.sensors_run_on:
if stop:
@@ -91,7 +91,7 @@
# TODO: data is unpacked/repacked here with no reason
for path, value in sensors_rpc_plugin.unpack_rpc_updates(func()):
- ctx.storage.append(value, "metric", node_id, path)
+ ctx.storage.append(value, "sensors/{}_{}".format(node_id, path))
class CollectSensorsStage(Stage):
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index be5e5db..7400ab5 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -2,6 +2,7 @@
import sys
import json
import time
+import zlib
import array
import pprint
import logging
@@ -11,6 +12,9 @@
import collections
+import Pool # type: ignore
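+# NOTE: this 'Pool' does not look like the stdlib multiprocessing pool - judging by its use in
+# sensors_bg_thread, its map() is expected to yield (ok, result) pairs; presumably it is provided
+# by the agent runtime.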
+
+
mod_name = "sensors"
__version__ = (0, 1)
@@ -36,7 +40,7 @@
pass
@classmethod
- def unpack_results(cls, device, metrics, data):
+ def unpack_results(cls, device, metrics, data, typecode):
pass
def init(self):
@@ -52,26 +56,30 @@
def __init__(self, params, allowed=None, disallowed=None):
Sensor.__init__(self, params, allowed, disallowed)
self.data = collections.defaultdict(lambda: array.array(self.typecode))
+ self.prev_vals = {}
def add_data(self, device, name, value):
self.data[(device, name)].append(value)
+ def add_relative(self, device, name, value):
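+ # record the delta against the previous reading (used for cumulative counters);
+ # the very first call only remembers the value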
+ key = (device, name)
+ pval = self.prev_vals.get(key)
+ if pval is not None:
+ self.data[key].append(value - pval)
+ self.prev_vals[key] = value
+
def get_updates(self):
res = self.data
self.data = collections.defaultdict(lambda: array.array(self.typecode))
- packed = {
- name: arr.typecode + arr.tostring()
- for name, arr in res.items()
- }
- return packed
+ return {key: (arr.typecode, arr.tostring()) for key, arr in res.items()}
@classmethod
- def unpack_results(cls, device, metrics, packed):
- arr = array.array(chr(packed[0]))
+ def unpack_results(cls, device, metrics, packed, typecode):
+ arr = array.array(typecode)
if sys.version_info >= (3, 0, 0):
- arr.frombytes(packed[1:])
+ arr.frombytes(packed)
else:
- arr.fromstring(packed[1:])
+ arr.fromstring(packed)
return arr
def is_dev_accepted(self, name):
@@ -86,6 +94,9 @@
return dev_ok
+time_array_typechar = ArraysSensor.typecode
+
+
def provides(name):
def closure(cls):
SensorsMap[name] = cls
@@ -157,24 +168,32 @@
def __init__(self, *args, **kwargs):
ArraysSensor.__init__(self, *args, **kwargs)
+
if self.disallowed is None:
self.disallowed = ('ram', 'loop')
- self.allowed_devs = set()
for line in open('/proc/diskstats'):
- dev_name = line.split()[2]
- if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
- self.allowed_devs.add(dev_name)
-
- def collect(self):
- for line in open('/proc/diskstats'):
vals = line.split()
dev_name = vals[2]
- if dev_name not in self.allowed_devs:
+ if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
+ self.allowed_names.add(dev_name)
+
+ self.collect(init_rel=True)
+
+ def collect(self, init_rel=False):
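+ # init_rel=True (used from __init__) only primes prev_vals for the relative
+ # counters, without recording any data points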
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+
+ if dev_name not in self.allowed_names:
continue
- for pos, name, _ in self.io_values_pos:
- self.add_data(dev_name, name, int(vals[pos]))
+ for pos, name, aggregated in self.io_values_pos:
+ vl = int(vals[pos])
+ if aggregated:
+ self.add_relative(dev_name, name, vl)
+ elif not init_rel:
+ self.add_data(dev_name, name, int(vals[pos]))
@provides("net-io")
@@ -210,21 +229,26 @@
if self.allowed is None:
self.allowed = ('eth',)
- self.allowed_devs = set()
- for line in open('/proc/net/dev').readlines()[2:]:
- dev_name = line.split(":", 1)[0].strip()
- dev_ok = self.is_dev_accepted(dev_name)
- if dev_ok and ('.' not in dev_name or not dev_name.split('.')[-1].isdigit()):
- self.allowed_devs.add(dev_name)
+ for _, _, aggregated in self.net_values_pos:
+ assert aggregated, "Non-aggregated values are not supported in the net sensor"
- def collect(self):
for line in open('/proc/net/dev').readlines()[2:]:
dev_name, stats = line.split(":", 1)
dev_name = dev_name.strip()
- if dev_name in self.allowed_devs:
+ if self.is_dev_accepted(dev_name):
+ self.allowed_names.add(dev_name)
+
+ self.collect(init_rel=True)
+
+ def collect(self, init_rel=False):
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ if dev_name in self.allowed_names:
vals = stats.split()
for pos, name, _ in self.net_values_pos:
- self.add_data(dev_name, name, int(vals[pos]))
+ vl = int(vals[pos])
+ self.add_relative(dev_name, name, vl)
def pid_stat(pid):
@@ -448,27 +472,28 @@
res = super().get_updates()
for osd_id, data in self.historic.items():
- res[("osd{}".format(osd_id), "historic")] = data
+ res[("osd{}".format(osd_id), "historic")] = (None, data)
self.historic = {}
for osd_id, data in self.in_flight.items():
- res[("osd{}".format(osd_id), "in_flight")] = data
+ res[("osd{}".format(osd_id), "in_flight")] = (None, data)
self.in_flight = {}
return res
@classmethod
- def unpack_results(cls, device, metrics, packed):
+ def unpack_results(cls, device, metrics, packed, typecode):
if metrics in ('historic', 'in_flight'):
+ assert typecode is None
return packed
- arr = array.array(chr(packed[0]))
+ arr = array.array(typecode)
if sys.version_info >= (3, 0, 0):
- arr.frombytes(packed[1:])
+ arr.frombytes(packed)
else:
- arr.fromstring(packed[1:])
+ arr.fromstring(packed)
return arr
@@ -476,7 +501,7 @@
class SensorsData(object):
def __init__(self):
self.cond = threading.Condition()
- self.collected_at = array.array("f")
+ self.collected_at = array.array(time_array_typechar)
self.stop = False
self.sensors = {}
self.data_fd = None # temporary file to store results
@@ -501,12 +526,23 @@
def sensors_bg_thread(sensors_config, sdata):
try:
next_collect_at = time.time()
+ if "pool_sz" in sensors_config:
+ sensors_config = sensors_config.copy()
+ pool_sz = sensors_config.pop("pool_sz")
+ else:
+ pool_sz = 32
+
+ if pool_sz != 0:
+ pool = Pool(pool_sz)
+ else:
+ pool = None
# prepare sensor classes
with sdata.cond:
sdata.sensors = {}
for name, config in sensors_config.items():
params = {'params': config}
+ logger.debug("Start sensor %r with config %r", name, config)
if "allow" in config:
params["allowed_prefixes"] = config["allow"]
@@ -535,11 +571,18 @@
ctm = time.time()
with sdata.cond:
- sdata.collected_at.append(ctm)
- for sensor in sdata.sensors.values():
- sensor.collect()
+ sdata.collected_at.append(int(ctm * 1000000))
+ if pool is not None:
+ caller = lambda x: x()
+ for ok, val in pool.map(caller, [sensor.collect for sensor in sdata.sensors.values()]):
+ if not ok:
+ raise val
+ else:
+ for sensor in sdata.sensors.values():
+ sensor.collect()
etm = time.time()
- sdata.collected_at.append(etm)
+ sdata.collected_at.append(int(etm * 1000000))
+ logger.debug("Add data to collected_at - %s, %s", ctm, etm)
if etm - ctm > 0.1:
# TODO(koder): need to signal that something in not really ok with sensor collecting
@@ -551,7 +594,6 @@
finally:
for sensor in sdata.sensors.values():
sensor.stop()
- sdata.sensors = None
sensors_thread = None
@@ -577,39 +619,46 @@
def unpack_rpc_updates(res_tuple):
- data, collected_at_b = res_tuple
- collected_at = array.array('f')
+ offset_map, compressed_blob, compressed_collected_at_b = res_tuple
+ blob = zlib.decompress(compressed_blob)
+ collected_at_b = zlib.decompress(compressed_collected_at_b)
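+ # offset_map maps sensor_path -> (offset, size, typecode) pointing into the decompressed blob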
+ collected_at = array.array(time_array_typechar)
collected_at.frombytes(collected_at_b)
yield 'collected_at', collected_at
# TODO: data is unpacked/repacked here with no reason
- for sensor_path, packed_data in data.items():
+ for sensor_path, (offset, size, typecode) in offset_map.items():
sensor_path = sensor_path.decode("utf8")
sensor_name, device, metric = sensor_path.split('.', 2)
- data = SensorsMap[sensor_name].unpack_results(device, metric, packed_data)
- yield sensor_path, data
+ sensor_data = SensorsMap[sensor_name].unpack_results(device,
+ metric,
+ blob[offset:offset + size],
+ typecode.decode("ascii"))
+ yield sensor_path, sensor_data
def rpc_get_updates():
if sdata is None:
raise ValueError("No sensor thread running")
- res = collected_at = None
+ offset_map = collected_at = None
+ blob = ""
with sdata.cond:
if sdata.exception:
raise Exception(sdata.exception)
- res = {}
+ offset_map = {}
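+ # concatenate all sensor arrays into a single blob; offset_map remembers where
+ # each sensor's bytes start so the receiver can slice them back out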
for sensor_name, sensor in sdata.sensors.items():
- for (device, metrics), val in sensor.get_updates().items():
- res["{}.{}.{}".format(sensor_name, device, metrics)] = val
+ for (device, metrics), (typecode, val) in sensor.get_updates().items():
+ offset_map["{}.{}.{}".format(sensor_name, device, metrics)] = (len(blob), len(val), typecode)
+ blob += val
collected_at = sdata.collected_at
sdata.collected_at = array.array(sdata.collected_at.typecode)
- # TODO: pack data before send
- return res, collected_at.tostring()
+ logger.debug(str(collected_at))
+ return offset_map, zlib.compress(blob), zlib.compress(collected_at.tostring())
def rpc_stop():
diff --git a/wally/statistic.py b/wally/statistic.py
index 259ac69..b80fb22 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,5 +1,4 @@
import math
-import array
import logging
import itertools
import statistics
@@ -17,17 +16,19 @@
logger = logging.getLogger("wally")
DOUBLE_DELTA = 1e-8
+MIN_VALUES_FOR_CONFIDENCE = 7
-average = statistics.mean
-dev = lambda x: math.sqrt(statistics.variance(x))
+average = numpy.mean
+dev = lambda x: math.sqrt(numpy.var(x, ddof=1))
-def calc_norm_stat_props(ts: TimeSeries, confidence: float = 0.95) -> NormStatProps:
+def calc_norm_stat_props(ts: TimeSeries, bins_count: int, confidence: float = 0.95) -> NormStatProps:
"Calculate statistical properties of array of numbers"
- data = ts.data
- res = NormStatProps(data)
+ # array.array has very basic support
+ data = cast(List[int], ts.data)
+ res = NormStatProps(data) # type: ignore
if len(data) == 0:
raise ValueError("Input array is empty")
@@ -39,57 +40,104 @@
res.max = data[-1]
res.min = data[0]
- res.perc_50 = numpy.percentile(data, 50)
- res.perc_90 = numpy.percentile(data, 90)
- res.perc_95 = numpy.percentile(data, 95)
- res.perc_99 = numpy.percentile(data, 99)
+ res.perc_50, res.perc_90, res.perc_95, res.perc_99 = numpy.percentile(data, q=[50., 90., 95., 99.])
- if len(data) >= 3:
+ if len(data) >= MIN_VALUES_FOR_CONFIDENCE:
res.confidence = stats.sem(data) * \
stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+ res.confidence_level = confidence
else:
res.confidence = None
+ res.confidence_level = None
- res.bin_populations, res.bin_edges = numpy.histogram(data, 'auto')
+ res.bins_populations, bins_edges = numpy.histogram(data, bins=bins_count)
+ res.bins_mids = (bins_edges[:-1] + bins_edges[1:]) / 2
try:
res.normtest = stats.mstats.normaltest(data)
except Exception as exc:
logger.warning("stats.mstats.normaltest failed with error: %s", exc)
+ res.skew = stats.skew(data)
+ res.kurt = stats.kurtosis(data)
+
return res
-def calc_histo_stat_props(ts: TimeSeries) -> HistoStatProps:
- data = numpy.array(ts.data)
- data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size]
+def calc_histo_stat_props(ts: TimeSeries,
+ bins_edges: numpy.array,
+ bins_count: int,
+ min_valuable: float = 0.0001) -> HistoStatProps:
+ data = numpy.array(ts.data, dtype='int')
+ data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size] # type: ignore
res = HistoStatProps(ts.data, ts.second_axis_size)
- aggregated = numpy.sum(data, axis=0)
- full_sum = numpy.sum(aggregated)
- expected = [full_sum * 0.5, full_sum * 0.9, full_sum * 0.95, full_sum * 0.99]
+ # sum across all series
+ aggregated = numpy.sum(data, axis=0, dtype='int')
+ total = numpy.sum(aggregated)
+
+ # minimal bin population which is still considered valuable for the histogram
+ min_val_on_histo = total * min_valuable
+
+ # percentiles levels
+ expected = [total * 0.5, total * 0.9, total * 0.95, total * 0.99]
percentiles = []
- val_min = None
- val_max = None
+ # all indexes where the value is at least min_val_on_histo
+ valuable_idxs = []
- for idx, val in enumerate(aggregated):
- while expected and full_sum + val >= expected[0]:
- percentiles.append(idx)
+ curr_summ = 0
+ non_zero = aggregated.nonzero()[0]
+
+ # calculate percentiles and valuable_indexes
+ for idx in non_zero:
+ val = aggregated[idx]
+ while expected and curr_summ + val >= expected[0]:
+ percentiles.append(bins_edges[idx])
del expected[0]
- full_sum += val
+ curr_summ += val
- if val != 0:
- if val_min is None:
- val_min = idx
- val_max = idx
+ if val >= min_val_on_histo:
+ valuable_idxs.append(idx)
- res.perc_50, res.perc_90, res.perc_95, res.perc_99 = map(ts.bins_edges.__getitem__, percentiles)
- res.min = ts.bins_edges[val_min]
- res.max = ts.bins_edges[val_max]
- res.bin_populations = aggregated
+ res.perc_50, res.perc_90, res.perc_95, res.perc_99 = percentiles
+
+ # minimal and maximal non-zero elements
+ res.min = bins_edges[non_zero[0]]
+ res.max = bins_edges[non_zero[-1] + (1 if non_zero[-1] != len(bins_edges) else 0)]
+
+ # minimal and maximal valuable elements
+ val_idx_min = valuable_idxs[0]
+ val_idx_max = valuable_idxs[-1]
+
+ raw_bins_populations = aggregated[val_idx_min: val_idx_max + 1]
+ raw_bins_edges = bins_edges[val_idx_min: val_idx_max + 2]
+ raw_bins_mids = cast(numpy.array, (raw_bins_edges[1:] + raw_bins_edges[:-1]) / 2)
+
+ step = (raw_bins_mids[-1] - raw_bins_mids[0]) / bins_count
+ cell_right = raw_bins_mids[0]
+
+ # aggregate raw histogram with many bins into result histogram with bins_count bins
+ cidx = 0
+ bins_populations = []
+ bins_mids = []
+
+ while cidx < len(raw_bins_mids):
+ cell_right += step
+ bin_population = 0
+
+ while cidx < len(raw_bins_mids) and raw_bins_mids[cidx] <= cell_right:
+ bin_population += raw_bins_populations[cidx]
+ cidx += 1
+
+ bins_populations.append(bin_population)
+ bins_mids.append(cell_right - step / 2)
+
+ res.bins_populations = numpy.array(bins_populations, dtype='int')
+ res.bins_mids = numpy.array(bins_mids, dtype='float32')
+
return res
diff --git a/wally/storage.py b/wally/storage.py
index e4e010c..3e8bbab 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -3,11 +3,12 @@
"""
import os
+import re
import abc
import array
import shutil
import sqlite3
-import threading
+import logging
from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
import yaml
@@ -17,7 +18,10 @@
from yaml import Loader, Dumper # type: ignore
-from .result_classes import IStorable
+from .common_types import IStorable
+
+
+logger = logging.getLogger("wally")
class ISimpleStorage(metaclass=abc.ABCMeta):
@@ -278,6 +282,10 @@
class Storage:
"""interface for storage"""
+
+ typechar_pad_size = 16
+ typepad = bytes(0 for i in range(typechar_pad_size - 1))
+
def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
self.fs = fs_storage
self.db = db_storage
@@ -287,15 +295,15 @@
fpath = "/".join(path)
return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
- def put(self, value: IStorable, *path: str) -> None:
- dct_value = value.raw() if isinstance(value, IStorable) else value
- serialized = self.serializer.pack(dct_value)
+ def put(self, value: Any, *path: str) -> None:
+ dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
+ serialized = self.serializer.pack(dct_value) # type: ignore
fpath = "/".join(path)
self.db.put(serialized, fpath)
self.fs.put(serialized, fpath)
def put_list(self, value: Iterable[IStorable], *path: str) -> None:
- serialized = self.serializer.pack([obj.raw() for obj in value])
+ serialized = self.serializer.pack([obj.raw() for obj in value]) # type: ignore
fpath = "/".join(path)
self.db.put(serialized, fpath)
self.fs.put(serialized, fpath)
@@ -318,8 +326,14 @@
def __contains__(self, path: str) -> bool:
return path in self.fs or path in self.db
- def put_raw(self, val: bytes, *path: str) -> None:
- self.fs.put(val, "/".join(path))
+ def put_raw(self, val: bytes, *path: str) -> str:
+ fpath = "/".join(path)
+ self.fs.put(val, fpath)
+ # TODO: dirty hack
+ return self.resolve_raw(fpath)
+
+ def resolve_raw(self, fpath) -> str:
+ return cast(FSStorage, self.fs).j(fpath)
def get_raw(self, *path: str) -> bytes:
return self.fs.get("/".join(path))
@@ -333,35 +347,52 @@
return self.fs.get_fd(path, mode)
def put_array(self, value: array.array, *path: str) -> None:
+ typechar = value.typecode.encode('ascii')
+ assert len(typechar) == 1
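+ # every stored array is prefixed with a fixed 16-byte header: the typecode
+ # byte followed by zero padding (see typechar_pad_size)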
with self.get_fd("/".join(path), "wb") as fd:
+ fd.write(typechar + self.typepad)
value.tofile(fd) # type: ignore
- def get_array(self, typecode: str, *path: str) -> array.array:
- res = array.array(typecode)
+ def get_array(self, *path: str) -> array.array:
path_s = "/".join(path)
with self.get_fd(path_s, "rb") as fd:
fd.seek(0, os.SEEK_END)
- size = fd.tell()
+ size = fd.tell() - self.typechar_pad_size
fd.seek(0, os.SEEK_SET)
+ typecode = chr(fd.read(self.typechar_pad_size)[0])
+ res = array.array(typecode)
assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
.format(path_s, typecode)
res.fromfile(fd, size // res.itemsize) # type: ignore
return res
def append(self, value: array.array, *path: str) -> None:
+ typechar = value.typecode.encode('ascii')
+ assert len(typechar) == 1
+ expected_typeheader = typechar + self.typepad
with self.get_fd("/".join(path), "cb") as fd:
fd.seek(0, os.SEEK_END)
+ if fd.tell() != 0:
+ fd.seek(0, os.SEEK_SET)
+ real_typecode = fd.read(self.typechar_pad_size)
+ if real_typecode[0] != expected_typeheader[0]:
+ logger.error("Try to append array with typechar %r to array with typechar %r at path %r",
+ value.typecode, typechar, "/".join(path))
+ raise StopIteration()
+ fd.seek(0, os.SEEK_END)
+ else:
+ fd.write(expected_typeheader)
value.tofile(fd) # type: ignore
def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
path_s = "/".join(path)
raw_val = cast(List[Dict[str, Any]], self.get(path_s))
assert isinstance(raw_val, list)
- return [obj_class.fromraw(val) for val in raw_val]
+ return [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
path_s = "/".join(path)
- return obj_class.fromraw(self.get(path_s))
+ return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
def sync(self) -> None:
self.db.sync()
@@ -376,6 +407,33 @@
def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
return self.fs.list("/".join(path))
+ def _iter_paths(self,
+ root: str,
+ path_parts: List[str],
+ groups: Dict[str, str]) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
+
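+ # recursively walk the storage tree, matching each level against its regex
+ # from path_parts and accumulating named groups (e.g. node, dev) on the way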
+ curr = path_parts[0]
+ rest = path_parts[1:]
+
+ for is_file, name in self.list(root):
+ if rest and is_file:
+ continue
+
+ rr = re.match(pattern=curr + "$", string=name)
+ if rr:
+ if root:
+ path = root + "/" + name
+ else:
+ path = name
+
+ new_groups = rr.groupdict().copy()
+ new_groups.update(groups)
+
+ if rest:
+ yield from self._iter_paths(path, rest, new_groups)
+ else:
+ yield is_file, path, new_groups
+
def make_storage(url: str, existing: bool = False) -> Storage:
return Storage(FSStorage(url, existing),
diff --git a/wally/storage_structure.yaml b/wally/storage_structure.yaml
index e748dd8..8493e29 100644
--- a/wally/storage_structure.yaml
+++ b/wally/storage_structure.yaml
@@ -5,6 +5,7 @@
# {dev} - device name '[^.]+'
# {suite} - suite name '[a-z]+'
# {profile} - profile name '[a-z_]+'
+# {sensor} - sensor name '[-a-z]+'
config: Config # test input configuration
@@ -14,6 +15,7 @@
fuel_version: List[int] # FUEL master node version
fuel_os_creds: OSCreds # openstack creds, discovered from fuel (or None)
openstack_openrc: OSCreds # openrc used for openstack cluster
+
info:
comment : str # run comment
run_uuid : str # run uuid
@@ -26,11 +28,11 @@
# dev in next line is tool name - fio/vdbench/....
'{node}_{dev}.{metric_name}:raw' : bytes # raw log, where name from {'bw', 'iops', 'lat', ..}
+ '{node}_{dev}.{metric_name}:stat' : StatProps # type of props detected by content
'{node}_{dev}.{metric_name}': List[uint64] # measurements data concatenated with collect times in
- # microseconds from unix epoch
-
-sensors:
- '{node}_{dev}.{metric_name}:raw' : bytes # raw log, where name from {'bw', 'iops', 'lat', ..}
- '{node}_{dev}.{metric_name}': List[uint64] # measurements data cotaneted with collect times in microseconds from unix epoch
+ # microseconds from unix epoch and typechars
+'sensors/{node}_{sensor}.{dev}.{metric_name}': typechar + array[uint64] # sensor values
+'sensors/{node}_{sensor}.{dev}.{metric_name}:stat': StatProps # statistic data
+'sensors/{node}_collected_at': typechar + array[uint64] # collection time
'rpc_logs/{node}' : bytes # rpc server log from node
diff --git a/wally/suits/all_suits.py b/wally/suits/all_suits.py
new file mode 100644
index 0000000..310b1f5
--- /dev/null
+++ b/wally/suits/all_suits.py
@@ -0,0 +1,8 @@
+from .io.fio import FioTest
+# from .suits.itest import TestSuiteConfig
+# from .suits.mysql import MysqlTest
+# from .suits.omgbench import OmgTest
+# from .suits.postgres import PgBenchTest
+
+
+all_suits = {suite.name: suite for suite in [FioTest]}
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index fcf5c16..c3dee19 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -18,7 +18,6 @@
filename={FILENAME}
size={FILESIZE}
-write_iops_log=fio_iops_log
write_bw_log=fio_bw_log
log_avg_msec=1000
write_hist_log=fio_lat_hist_log
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b2c3e3..33e8343 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,15 @@
import array
import os.path
import logging
-from typing import cast, Any
+from typing import cast, Any, Tuple, List
import wally
-from ...utils import StopTestError, get_os, ssize2b
+from ...utils import StopTestError, ssize2b, b2ssize
from ...node_interfaces import IRPCNode
+from ...node_utils import get_os
from ..itest import ThreadedTest
-from ...result_classes import TimeSeries, JobMetrics
+from ...result_classes import TimeSeries, DataSource, TestJobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
from .fio_hist import expected_lat_bins
@@ -17,7 +18,7 @@
logger = logging.getLogger("wally")
-class IOPerfTest(ThreadedTest):
+class FioTest(ThreadedTest):
soft_runcycle = 5 * 60
retry_time = 30
configs_dir = os.path.dirname(__file__) # type: str
@@ -27,7 +28,7 @@
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
- get = self.config.params.get
+ get = self.suite.params.get
self.remote_task_file = self.join_remote("task.fio")
self.remote_output_file = self.join_remote("fio_result.json")
@@ -35,7 +36,7 @@
self.use_sudo = get("use_sudo", True) # type: bool
self.force_prefill = get('force_prefill', False) # type: bool
- self.load_profile_name = self.config.params['load'] # type: str
+ self.load_profile_name = self.suite.params['load'] # type: str
if os.path.isfile(self.load_profile_name):
self.load_profile_path = self.load_profile_name # type: str
@@ -47,16 +48,16 @@
if self.use_system_fio:
self.fio_path = "fio" # type: str
else:
- self.fio_path = os.path.join(self.config.remote_dir, "fio")
+ self.fio_path = os.path.join(self.suite.remote_dir, "fio")
- self.load_params = self.config.params['params']
+ self.load_params = self.suite.params['params']
self.file_name = self.load_params['FILENAME']
if 'FILESIZE' not in self.load_params:
logger.debug("Getting test file sizes on all nodes")
try:
- sizes = {node.conn.fs.file_stat(self.file_name)['size']
- for node in self.config.nodes}
+ sizes = {node.conn.fs.file_stat(self.file_name)[b'size']
+ for node in self.suite.nodes}
except Exception:
logger.exception("FILESIZE is not set in config file and fail to detect it." +
"Set FILESIZE or fix error and rerun test")
@@ -68,7 +69,7 @@
raise StopTestError()
self.file_size = list(sizes)[0]
- logger.info("Detected test file size is %s", self.file_size)
+ logger.info("Detected test file size is %sB", b2ssize(self.file_size))
self.load_params['FILESIZE'] = self.file_size
else:
self.file_size = ssize2b(self.load_params['FILESIZE'])
@@ -80,31 +81,41 @@
logger.error("Empty fio config provided")
raise StopTestError()
- self.exec_folder = self.config.remote_dir
+ self.exec_folder = self.suite.remote_dir
def config_node(self, node: IRPCNode) -> None:
plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read() # type: bytes
node.upload_plugin("fio", plugin_code)
try:
- node.conn.fs.rmtree(self.config.remote_dir)
+ node.conn.fs.rmtree(self.suite.remote_dir)
except Exception:
pass
try:
- node.conn.fs.makedirs(self.config.remote_dir)
+ node.conn.fs.makedirs(self.suite.remote_dir)
except Exception:
- msg = "Failed to recreate folder {} on remote {}.".format(self.config.remote_dir, node)
+ msg = "Failed to recreate folder {} on remote {}.".format(self.suite.remote_dir, node)
logger.exception(msg)
raise StopTestError()
+ # TODO: check this during config validation
+ if self.file_size % (4 * (1024 ** 2)) != 0:
+ logger.error("Test file size must be proportional to 4MiB")
+ raise StopTestError()
+
self.install_utils(node)
mb = int(self.file_size / 1024 ** 2)
- logger.info("Filling test file %s with %sMiB of random data", self.file_name, mb)
- fill_bw = node.conn.fio.fill_file(self.file_name, mb, force=self.force_prefill, fio_path=self.fio_path)
- if fill_bw is not None:
- logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
+ logger.info("Filling test file %s on node %s with %sMiB of random data", self.file_name, node.info, mb)
+ is_prefilled, fill_bw = node.conn.fio.fill_file(self.file_name, mb,
+ force=self.force_prefill,
+ fio_path=self.fio_path)
+
+ if not is_prefilled:
+ logger.info("Test file on node %s is already prefilled", node.info)
+ elif fill_bw is not None:
+ logger.info("Initial fio fill bw is %s MiBps for %s", fill_bw, node.info)
def install_utils(self, node: IRPCNode) -> None:
os_info = get_os(node)
@@ -126,19 +137,19 @@
raise StopTestError()
bz_dest = self.join_remote('fio.bz2') # type: str
- node.copy_file(fio_path, bz_dest)
+ node.copy_file(fio_path, bz_dest, compress=False)
node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
- def get_expected_runtime(self, job_config: FioJobConfig) -> int:
+ def get_expected_runtime(self, job_config: TestJobConfig) -> int:
return execution_time(cast(FioJobConfig, job_config))
- def prepare_iteration(self, node: IRPCNode, job_config: FioJobConfig) -> None:
- node.put_to_file(self.remote_task_file, str(job_config).encode("utf8"))
+ def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
+ node.put_to_file(self.remote_task_file, str(job).encode("utf8"))
# TODO: get a link to substorage as a parameter
- def run_iteration(self, node: IRPCNode, iter_config: FioJobConfig, job_root: str) -> JobMetrics:
- f_iter_config = cast(FioJobConfig, iter_config)
- exec_time = execution_time(f_iter_config)
+ def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
+ exec_time = execution_time(cast(FioJobConfig, job))
+
fio_cmd_templ = "cd {exec_folder}; " + \
"{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -152,20 +163,26 @@
if must_be_empty:
logger.error("Unexpected fio output: %r", must_be_empty)
- res = {} # type: JobMetrics
-
# put fio output into storage
fio_out = node.get_file_content(self.remote_output_file)
- self.rstorage.put_extra(job_root, node.info.node_id(), "fio_raw", fio_out)
+
+ path = DataSource(suite_id=self.suite.storage_id,
+ job_id=job.storage_id,
+ node_id=node.node_id,
+ dev='fio',
+ sensor='stdout',
+ tag='json')
+
+ self.storage.put_extra(fio_out, path)
node.conn.fs.unlink(self.remote_output_file)
files = [name for name in node.conn.fs.listdir(self.exec_folder)]
-
- for name, path in get_log_files(f_iter_config):
- log_files = [fname for fname in files if fname.startswith(path)]
+ result = []
+ for name, file_path in get_log_files(cast(FioJobConfig, job)):
+ log_files = [fname for fname in files if fname.startswith(file_path)]
if len(log_files) != 1:
logger.error("Found %s files, match log pattern %s(%s) - %s",
- len(log_files), path, name, ",".join(log_files[10:]))
+ len(log_files), file_path, name, ",".join(log_files[10:]))
raise StopTestError()
fname = os.path.join(self.exec_folder, log_files[0])
@@ -203,14 +220,13 @@
logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
raise StopTestError()
- ts = TimeSeries(name=name,
- raw=raw_result,
- second_axis_size=expected_lat_bins if name == 'lat' else 1,
- data=parsed,
- times=times)
- res[(node.info.node_id(), 'fio', name)] = ts
-
- return res
+ result.append(TimeSeries(name=name,
+ raw=raw_result,
+ second_axis_size=expected_lat_bins if name == 'lat' else 1,
+ data=parsed,
+ times=times,
+ source=path(sensor=name, tag=None)))
+ return result
def format_for_console(self, data: Any) -> str:
raise NotImplementedError()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 6940aaf..03702ae 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,13 +7,12 @@
import os.path
import argparse
import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any, cast
from collections import OrderedDict
-from ...result_classes import IStorable
from ...result_classes import TestJobConfig
-from ...utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b, b2ssize, flatmap
SECTION = 0
@@ -29,23 +28,162 @@
('tp', int),
('name', str),
('val', Any)])
+FioTestSumm = NamedTuple("FioTestSumm",
+ [("oper", str),
+ ("sync_mode", str),
+ ("bsize", int),
+ ("qd", int),
+ ("thcount", int),
+ ("write_perc", Optional[int])])
-TestSumm = NamedTuple("TestSumm",
- [("oper", str),
- ("mode", str),
- ("bsize", int),
- ("iodepth", int),
- ("vm_count", int)])
+
+def is_fio_opt_true(vl: Union[str, int]) -> bool:
+ return str(vl).lower() in ['1', 'true', 't', 'yes', 'y']
class FioJobConfig(TestJobConfig):
- def __init__(self, name: str) -> None:
- TestJobConfig.__init__(self)
- self.vals = OrderedDict() # type: Dict[str, Any]
- self.name = name
- def __eq__(self, other: 'FioJobConfig') -> bool:
- return self.vals == other.vals
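+ # (is_sync, is_direct) -> one-letter sync mode code used in job summaries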
+ ds2mode = {(True, True): 'x',
+ (True, False): 's',
+ (False, True): 'd',
+ (False, False): 'a'}
+
+ sync2long = {'x': "sync direct",
+ 's': "sync",
+ 'd': "direct",
+ 'a': "buffered"}
+
+ op_type2short = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw",
+ "randrw": "rx"}
+
+ def __init__(self, name: str, idx: int) -> None:
+ TestJobConfig.__init__(self, idx)
+ self.name = name
+ self._sync_mode = None # type: Optional[str]
+ self._ctuple = None # type: Optional[FioTestSumm]
+ self._ctuple_no_qd = None # type: Optional[FioTestSumm]
+
+ # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
+
+ @property
+ def write_perc(self) -> Optional[int]:
+ try:
+ return int(self.vals["rwmixwrite"])
+ except (KeyError, TypeError):
+ try:
+ return 100 - int(self.vals["rwmixread"])
+ except (KeyError, TypeError):
+ return None
+
+ @property
+ def qd(self) -> int:
+ return int(self.vals['iodepth'])
+
+ @property
+ def bsize(self) -> int:
+ return ssize2b(self.vals['blocksize']) // 1024
+
+ @property
+ def oper(self) -> str:
+ return self.vals['rw']
+
+ @property
+ def op_type_short(self) -> str:
+ return self.op_type2short[self.vals['rw']]
+
+ @property
+ def thcount(self) -> int:
+ return int(self.vals.get('numjobs', 1))
+
+ @property
+ def sync_mode(self) -> str:
+ if self._sync_mode is None:
+ direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
+ not is_fio_opt_true(self.vals.get('buffered', '0'))
+ sync = is_fio_opt_true(self.vals.get('sync', '0'))
+ self._sync_mode = self.ds2mode[(sync, direct)]
+ return cast(str, self._sync_mode)
+
+ @property
+ def sync_mode_long(self) -> str:
+ return self.sync2long[self.sync_mode]
+
+ # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
+
+ @property
+ def characterized_tuple(self) -> Tuple:
+ if self._ctuple is None:
+ self._ctuple = FioTestSumm(oper=self.oper,
+ sync_mode=self.sync_mode,
+ bsize=self.bsize,
+ qd=self.qd,
+ thcount=self.thcount,
+ write_perc=self.write_perc)
+
+ return cast(Tuple, self._ctuple)
+
+ @property
+ def characterized_tuple_no_qd(self) -> FioTestSumm:
+ if self._ctuple_no_qd is None:
+ self._ctuple_no_qd = FioTestSumm(oper=self.oper,
+ sync_mode=self.sync_mode,
+ bsize=self.bsize,
+ qd=None,
+ thcount=self.thcount,
+ write_perc=self.write_perc)
+
+ return cast(FioTestSumm, self._ctuple_no_qd)
+
+ @property
+ def long_summary(self) -> str:
+ res = "{0.sync_mode_long} {0.oper} {1} QD={0.qd}".format(self, b2ssize(self.bsize * 1024))
+ if self.thcount != 1:
+ res += " threads={}".format(self.thcount)
+ if self.write_perc is not None:
+ res += " write_perc={}%".format(self.write_perc)
+ return res
+
+ @property
+ def long_summary_no_qd(self) -> str:
+ res = "{0.sync_mode_long} {0.oper} {1}".format(self, b2ssize(self.bsize * 1024))
+ if self.thcount != 1:
+ res += " threads={}".format(self.thcount)
+ if self.write_perc is not None:
+ res += " write_perc={}%".format(self.write_perc)
+ return res
+
+ @property
+ def summary(self) -> str:
+ tpl = cast(FioTestSumm, self.characterized_tuple)
+ res = "{0.oper}{0.sync_mode}{0.bsize}_qd{0.qd}".format(tpl)
+
+ if tpl.thcount != 1:
+ res += "th" + str(tpl.thcount)
+ if tpl.write_perc is not None:
+ res += "wr" + str(tpl.write_perc)
+
+ return res
+
+ @property
+ def summary_no_qd(self) -> str:
+ tpl = cast(FioTestSumm, self.characterized_tuple)
+ res = "{0.oper}{0.sync_mode}{0.bsize}".format(tpl)
+
+ if tpl.thcount != 1:
+ res += "th" + str(tpl.thcount)
+ if tpl.write_perc is not None:
+ res += "wr" + str(tpl.write_perc)
+
+ return res
+ # ------------------------------------------------------------------------------------------------------------------
+
+ def __eq__(self, o: object) -> bool:
+ if not isinstance(o, FioJobConfig):
+ return False
+ return self.vals == cast(FioJobConfig, o).vals
def copy(self) -> 'FioJobConfig':
return copy.deepcopy(self)
@@ -75,17 +213,17 @@
return str(self)
def raw(self) -> Dict[str, Any]:
- return {
- 'vals': [[key, val] for key, val in self.vals.items()],
- 'summary': self.summary,
- 'name': self.name
- }
+ res = self.__dict__.copy()
+ del res['_sync_mode']
+ del res['_ctuple']
+ del res['_ctuple_no_qd']
+ res['vals'] = [[key, val] for key, val in self.vals.items()]
+ return res
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
- obj = cls(data['name'])
- obj.summary = data['summary']
- obj.vals.update(data['vals'])
+ obj = cls.__new__(cls)
+ data['vals'] = OrderedDict(data['vals'])
+ data['_sync_mode'] = None
+ data['_ctuple'] = None
+ data['_ctuple_no_qd'] = None
+ obj.__dict__.update(data)
return obj
@@ -203,6 +341,8 @@
lexed_lines = new_lines
+ suite_section_idx = 0
+
for fname, lineno, oline, tp, name, val in lexed_lines:
if tp == SECTION:
if curr_section is not None:
@@ -215,7 +355,8 @@
in_globals = True
else:
in_globals = False
- curr_section = FioJobConfig(name)
+ curr_section = FioJobConfig(name, idx=suite_section_idx)
+ suite_section_idx += 1
curr_section.vals = glob_vals.copy()
sections_count += 1
else:
@@ -332,68 +473,13 @@
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
params['COUNTER'] = str(counter[0])
- params['TEST_SUMM'] = get_test_summary(sec)
+ params['TEST_SUMM'] = sec.summary
sec.name = sec.name.format(**params)
counter[0] += 1
return sec
-def get_test_sync_mode(sec: FioJobConfig) -> str:
- if isinstance(sec, dict):
- vals = sec
- else:
- vals = sec.vals
-
- is_sync = str(vals.get("sync", "0")) == "1"
- is_direct = str(vals.get("direct", "0")) == "1"
-
- if is_sync and is_direct:
- return 'x'
- elif is_sync:
- return 's'
- elif is_direct:
- return 'd'
- else:
- return 'a'
-
-
-def get_test_summary_tuple(sec: FioJobConfig, vm_count: int = None) -> TestSumm:
- if isinstance(sec, dict):
- vals = sec
- else:
- vals = sec.vals
-
- rw = {"randread": "rr",
- "randwrite": "rw",
- "read": "sr",
- "write": "sw",
- "randrw": "rm",
- "rw": "sm",
- "readwrite": "sm"}[vals["rw"]]
-
- sync_mode = get_test_sync_mode(sec)
-
- return TestSumm(rw,
- sync_mode,
- vals['blocksize'],
- vals.get('iodepth', '1'),
- vm_count)
-
-
-def get_test_summary(sec: FioJobConfig, vm_count: int = None, noiodepth: bool = False) -> str:
- tpl = get_test_summary_tuple(sec, vm_count)
-
- res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
- if not noiodepth:
- res += "qd{}".format(tpl.iodepth)
-
- if tpl.vm_count is not None:
- res += "vm{}".format(tpl.vm_count)
-
- return res
-
-
def execution_time(sec: FioJobConfig) -> int:
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
@@ -402,23 +488,18 @@
return fio_config_parse(fio_config_lexer(source, fname))
-FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
-FM_FUNC_RES = TypeVar("FM_FUNC_RES")
-
-
-def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
- inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
- for val in inp_iter:
- for res in func(val):
- yield res
-
-
-def get_log_files(sec: FioJobConfig) -> List[Tuple[str, str]]:
+def get_log_files(sec: FioJobConfig, iops: bool = False) -> List[Tuple[str, str]]:
res = [] # type: List[Tuple[str, str]]
- for key, name in (('write_iops_log', 'iops'), ('write_bw_log', 'bw'), ('write_hist_log', 'lat')):
+
+ keys = [('write_bw_log', 'bw'), ('write_hist_log', 'lat')]
+ if iops:
+ keys.append(('write_iops_log', 'iops'))
+
+ for key, name in keys:
log = sec.vals.get(key)
if log is not None:
res.append((name, log))
+
return res
@@ -427,7 +508,6 @@
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
for sec in map(final_process, it):
- sec.summary = get_test_summary(sec)
yield sec
diff --git a/wally/suits/io/one_step.cfg b/wally/suits/io/one_step.cfg
new file mode 100644
index 0000000..3e08c8b
--- /dev/null
+++ b/wally/suits/io/one_step.cfg
@@ -0,0 +1,9 @@
+[global]
+include defaults_qd.cfg
+ramp_time=0
+runtime={RUNTIME}
+
+[test_{TEST_SUMM}]
+blocksize=60k
+rw=randread
+iodepth=1
\ No newline at end of file
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 98e55f0..5f5cfb5 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -13,6 +13,7 @@
logger = logging.getLogger("agent.fio")
+# TODO: fix this in case the file is a block device
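+# Returns True (already prefilled) only when the file can be stat'ed, a regular file is
+# large enough, and none of the sampled 1 KiB blocks is all zeros.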
def check_file_prefilled(path, used_size_mb):
used_size = used_size_mb * 1024 ** 2
blocks_to_check = 16
@@ -20,9 +21,9 @@
try:
fstats = os.stat(path)
if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
- return True
+ return False
except EnvironmentError:
- return True
+ return False
offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
offsets.append(used_size - 1024)
@@ -32,15 +33,15 @@
for offset in offsets:
fd.seek(offset)
if b"\x00" * 1024 == fd.read(1024):
- return True
+ return False
- return False
+ return True
def rpc_fill_file(fname, size, force=False, fio_path='fio'):
if not force:
- if not check_file_prefilled(fname, size):
- return
+ if check_file_prefilled(fname, size):
+ return False, None
assert size % 4 == 0, "File size must be proportional to 4M"
@@ -50,7 +51,9 @@
subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
run_time = time.time() - run_time
- return None if run_time < 1.0 else int(size / run_time)
+ prefill_bw = None if run_time < 1.0 else int(size / run_time)
+
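+ # (file was actually filled, fill bandwidth in MiB/s or None when the run was too short)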
+ return True, prefill_bw
def rpc_install(name, binary):
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index ca3c613..ae2d960 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,7 +1,7 @@
[global]
include defaults_qd.cfg
ramp_time=0
-runtime=10
+runtime={RUNTIME}
[test_{TEST_SUMM}]
blocksize=60k
@@ -12,3 +12,23 @@
iodepth=16
blocksize=60k
rw=randread
+
+[test_{TEST_SUMM}]
+blocksize=60k
+rw=randwrite
+iodepth=1
+
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randwrite
+
+[test_{TEST_SUMM}]
+iodepth=1
+blocksize=1m
+rw=write
+
+[test_{TEST_SUMM}]
+iodepth=1
+blocksize=1m
+rw=read
diff --git a/wally/suits/io/rrd_qd_scan.cfg b/wally/suits/io/rrd_qd_scan.cfg
new file mode 100644
index 0000000..e0937c9
--- /dev/null
+++ b/wally/suits/io/rrd_qd_scan.cfg
@@ -0,0 +1,9 @@
+[global]
+include defaults_qd.cfg
+ramp_time=0
+runtime={RUNTIME}
+
+[test_{TEST_SUMM}]
+blocksize=4k
+rw=randread
+iodepth={QDS}
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
new file mode 100644
index 0000000..2b0fc74
--- /dev/null
+++ b/wally/suits/io/rrd_raw.cfg
@@ -0,0 +1,21 @@
+[test]
+blocksize=4k
+rw=randread
+iodepth=1
+ramp_time=0
+runtime=120
+buffered=0
+direct=1
+sync=0
+ioengine=libaio
+group_reporting=1
+unified_rw_reporting=1
+norandommap=1
+numjobs=1
+thread=1
+time_based=1
+wait_for_previous=1
+per_job_logs=0
+randrepeat=0
+filename=/dev/sdb
+size=100G
\ No newline at end of file
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index bc6b115..ac9e1c1 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,19 +1,14 @@
-import re
import abc
import time
-import array
-import struct
import logging
import os.path
-import datetime
-from typing import Any, List, Optional, Callable, cast, Iterator, Tuple, Iterable
+from typing import Any, List, Optional, Callable, Tuple, Iterable, cast
from concurrent.futures import ThreadPoolExecutor, wait, Future
-from ..utils import StopTestError, sec_to_str, get_time_interval_printable_info
+from ..utils import StopTestError, get_time_interval_printable_info
from ..node_interfaces import IRPCNode
-from ..storage import Storage
-from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries
+from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries, IResultStorage
logger = logging.getLogger("wally")
@@ -22,127 +17,6 @@
__doc__ = "Contains base classes for performance tests"
-class ResultStorage:
- ts_header_format = "!IIIcc"
-
- def __init__(self, storage: Storage, job_config_cls: type) -> None:
- self.storage = storage
- self.job_config_cls = job_config_cls
-
- def get_suite_root(self, suite_type: str, idx: int) -> str:
- return "results/{}_{}".format(suite_type, idx)
-
- def get_job_root(self, suite_root: str, summary: str, run_id: int) -> str:
- return "{}/{}_{}".format(suite_root, summary, run_id)
-
- # store
- def put_suite_config(self, config: TestSuiteConfig, root: str) -> None:
- self.storage.put(config, root, "config.yml")
-
- def put_job_config(self, config: TestJobConfig, root: str) -> None:
- self.storage.put(config, root, "config.yml")
-
- def get_suite_config(self, suite_root: str) -> TestSuiteConfig:
- return self.storage.load(TestSuiteConfig, suite_root, "config.yml")
-
- def get_job_node_prefix(self, job_root_path: str, node_id: str) -> str:
- return "{}/{}".format(job_root_path, node_id)
-
- def get_ts_path(self, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> str:
- return "{}_{}.{}".format(self.get_job_node_prefix(job_root_path, node_id), dev, sensor_name)
-
- def put_ts(self, ts: TimeSeries, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> None:
- # TODO: check that 'metrics', 'dev' and 'node_id' match required patterns
- root_path = self.get_ts_path(job_root_path, node_id, dev, sensor_name)
-
- if len(ts.data) / ts.second_axis_size != len(ts.times):
- logger.error("Unbalanced time series data. Array size has % elements, while time size has %",
- len(ts.data) / ts.second_axis_size, len(ts.times))
- raise StopTestError()
-
- with self.storage.get_fd(root_path, "cb") as fd:
- header = struct.pack(self.ts_header_format,
- ts.second_axis_size,
- len(ts.data),
- len(ts.times),
- cast(array.array, ts.data).typecode.encode("ascii"),
- cast(array.array, ts.times).typecode.encode("ascii"))
- fd.write(header)
- cast(array.array, ts.data).tofile(fd)
- cast(array.array, ts.times).tofile(fd)
-
- if ts.raw is not None:
- self.storage.put_raw(ts.raw, root_path + ":raw")
-
- def put_extra(self, job_root: str, node_id: str, key: str, data: bytes) -> None:
- self.storage.put_raw(data, job_root, node_id + "_" + key)
-
- def list_suites(self) -> Iterator[Tuple[TestSuiteConfig, str]]:
- """iterates over (suite_name, suite_id, suite_root_path)
- primary this function output should be used as input into list_jobs_in_suite method
- """
- ts_re = re.compile(r"[a-zA-Z]+_\d+$")
- for is_file, name in self.storage.list("results"):
- if not is_file:
- rr = ts_re.match(name)
- if rr:
- path = "results/" + name
- yield self.get_suite_config(path), path
-
- def list_jobs_in_suite(self, suite_root_path: str) -> Iterator[Tuple[TestJobConfig, str, int]]:
- """iterates over (job_summary, job_root_path)
- primary this function output should be used as input into list_ts_in_job method
- """
- ts_re = re.compile(r"(?P<job_summary>[a-zA-Z0-9]+)_(?P<id>\d+)$")
- for is_file, name in self.storage.list(suite_root_path):
- if is_file:
- continue
- rr = ts_re.match(name)
- if rr:
- config_path = "{}/{}/config.yml".format(suite_root_path, name)
- if config_path in self.storage:
- cfg = self.storage.load(self.job_config_cls, config_path)
- yield cfg, "{}/{}".format(suite_root_path, name), int(rr.group("id"))
-
- def list_ts_in_job(self, job_root_path: str) -> Iterator[Tuple[str, str, str]]:
- """iterates over (node_id, device_name, sensor_name)
- primary this function output should be used as input into load_ts method
- """
- # TODO: check that all TS files available
- ts_re = re.compile(r"(?P<node_id>\d+\.\d+\.\d+\.\d+:\d+)_(?P<dev>[^.]+)\.(?P<sensor>[a-z_]+)$")
- already_found = set()
- for is_file, name in self.storage.list(job_root_path):
- if not is_file:
- continue
- rr = ts_re.match(name)
- if rr:
- key = (rr.group("node_id"), rr.group("dev"), rr.group("sensor"))
- if key not in already_found:
- already_found.add(key)
- yield key
-
- def load_ts(self, root_path: str, node_id: str, dev: str, sensor_name: str) -> TimeSeries:
- path = self.get_ts_path(root_path, node_id, dev, sensor_name)
-
- with self.storage.get_fd(path, "rb") as fd:
- header = fd.read(struct.calcsize(self.ts_header_format))
- second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
- struct.unpack(self.ts_header_format, header)
-
- data = array.array(data_typecode.decode("ascii"))
- times = array.array(time_typecode.decode("ascii"))
-
- data.fromfile(fd, data_sz)
- times.fromfile(fd, time_sz)
-
- # calculate number of elements
- return TimeSeries("{}.{}".format(dev, sensor_name),
- raw=None,
- data=data,
- times=times,
- second_axis_size=second_axis_size)
-
-
class PerfTest(metaclass=abc.ABCMeta):
"""Base class for all tests"""
name = None # type: str
@@ -150,20 +24,18 @@
retry_time = 30
job_config_cls = None # type: type
- def __init__(self, storage: Storage, config: TestSuiteConfig, idx: int, on_idle: Callable[[], None] = None) -> None:
- self.config = config
+ def __init__(self, storage: IResultStorage, suite: TestSuiteConfig, on_idle: Callable[[], None] = None) -> None:
+ self.suite = suite
self.stop_requested = False
- self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.config.nodes)
+ self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
self.on_idle = on_idle
self.storage = storage
- self.rstorage = ResultStorage(self.storage, self.job_config_cls)
- self.idx = idx
def request_stop(self) -> None:
self.stop_requested = True
def join_remote(self, path: str) -> str:
- return os.path.join(self.config.remote_dir, path)
+ return os.path.join(self.suite.remote_dir, path)
@abc.abstractmethod
def run(self) -> None:
@@ -185,62 +57,51 @@
def __init__(self, *args, **kwargs) -> None:
PerfTest.__init__(self, *args, **kwargs)
- self.job_configs = [None] # type: List[Optional[TestJobConfig]]
- self.suite_root_path = self.rstorage.get_suite_root(self.config.test_type, self.idx)
+ self.job_configs = None # type: List[TestJobConfig]
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
pass
- def get_not_done_stages(self) -> Iterable[Tuple[int, TestJobConfig]]:
- all_jobs = dict(enumerate(self.job_configs))
- for db_config, path, jid in self.rstorage.list_jobs_in_suite(self.suite_root_path):
- if jid in all_jobs:
- job_config = all_jobs[jid]
- if job_config != db_config:
- logger.error("Test info at path '%s/config' is not equal to expected config for iteration %s.%s." +
+ def get_not_done_jobs(self) -> Iterable[TestJobConfig]:
+ jobs_map = {job.storage_id: job for job in self.job_configs}
+ already_in_storage = set()
+ for db_config in cast(List[TestJobConfig], self.storage.iter_job(self.suite)):
+ if db_config.storage_id in jobs_map:
+ job = jobs_map[db_config.storage_id]
+ if job != db_config:
+ logger.error("Test info at '%s.%s' is not equal to expected config for iteration %s.%s." +
" Maybe configuration was changed before test was restarted. " +
"DB cfg is:\n %s\nExpected cfg is:\n %s\nFix DB or rerun test from beginning",
- path, self.name, job_config.summary,
+ self.suite.storage_id, job.storage_id, self.name, job.summary,
str(db_config).replace("\n", "\n "),
- str(job_config).replace("\n", "\n "))
+ str(job).replace("\n", "\n "))
raise StopTestError()
- logger.info("Test iteration %s.%s found in storage and will be skipped",
- self.name, job_config.summary)
- del all_jobs[jid]
- return all_jobs.items()
+ logger.info("Test iteration %s.%s found in storage and will be skipped", self.name, job.summary)
+ already_in_storage.add(db_config.storage_id)
+
+ return [job for job in self.job_configs if job.storage_id not in already_in_storage]
def run(self) -> None:
- try:
- cfg = self.rstorage.get_suite_config(self.suite_root_path)
- except KeyError:
- cfg = None
+ self.storage.put_or_check_suite(self.suite)
- if cfg is not None and cfg != self.config:
- logger.error("Current suite %s config is not equal to found in storage at %s",
- self.config.test_type, self.suite_root_path)
- raise StopTestError()
-
- not_in_storage = list(self.get_not_done_stages())
-
+ not_in_storage = list(self.get_not_done_jobs())
if not not_in_storage:
logger.info("All test iteration in storage already. Skip test")
return
- self.rstorage.put_suite_config(self.config, self.suite_root_path)
-
logger.debug("Run test %s with profile %r on nodes %s.", self.name,
self.load_profile_name,
",".join(self.sorted_nodes_ids))
logger.debug("Prepare nodes")
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ with ThreadPoolExecutor(len(self.suite.nodes)) as pool:
# config nodes
- list(pool.map(self.config_node, self.config.nodes))
+ list(pool.map(self.config_node, self.suite.nodes))
- run_times = [self.get_expected_runtime(job_config) for _, job_config in not_in_storage]
+ run_times = list(map(self.get_expected_runtime, not_in_storage))
if None not in run_times:
# +5% - is a rough estimation for additional operations
@@ -249,51 +110,52 @@
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
logger.info("Entire test should takes around %s and finished at %s", exec_time_s, end_dt_s)
- for run_id, job_config in not_in_storage:
- job_path = self.rstorage.get_job_root(self.suite_root_path, job_config.summary, run_id)
-
- jfutures = [] # type: List[Future]
- for idx in range(self.max_retry):
- logger.debug("Prepare job %s", job_config.summary)
+ for job in not_in_storage:
+ results = [] # type: List[TimeSeries]
+ for retry_idx in range(self.max_retry):
+ logger.debug("Prepare job %s", job.summary)
# prepare nodes for new iterations
- wait([pool.submit(self.prepare_iteration, node, job_config) for node in self.config.nodes])
+ wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
- expected_job_time = self.get_expected_runtime(job_config)
+ expected_job_time = self.get_expected_runtime(job)
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
logger.info("Job should takes around %s and finished at %s", exec_time_s, end_dt_s)
- try:
- jfutures = []
- for node in self.config.nodes:
- future = pool.submit(self.run_iteration, node, job_config, job_path)
- jfutures.append(future)
- # test completed successfully, stop retrying
- break
- except EnvironmentError:
- if self.max_retry - 1 == idx:
- logger.exception("Fio failed")
- raise StopTestError()
- logger.exception("During fio run")
- logger.info("Sleeping %ss and retrying job", self.retry_time)
- time.sleep(self.retry_time)
+ jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
+ failed = False
+ for future in jfutures:
+ try:
+ results.extend(future.result())
+ except EnvironmentError:
+ failed = True
+ if not failed:
+ break
+
+ if self.max_retry - 1 == retry_idx:
+ logger.exception("Fio failed")
+ raise StopTestError()
+
+ logger.exception("During fio run")
+ logger.info("Sleeping %ss and retrying job", self.retry_time)
+ time.sleep(self.retry_time)
+ results = []
+
+ # per node jobs start and stop times
start_times = [] # type: List[int]
stop_times = [] # type: List[int]
- for future in jfutures:
- for (node_id, dev, sensor_name), ts in future.result().items():
- self.rstorage.put_ts(ts, job_path, node_id=node_id, dev=dev, sensor_name=sensor_name)
-
- if len(ts.times) >= 2:
- start_times.append(ts.times[0])
- stop_times.append(ts.times[-1])
+ for ts in results:
+ self.storage.put_ts(ts)
+ if len(ts.times) >= 2: # type: ignore
+ start_times.append(ts.times[0])
+ stop_times.append(ts.times[-1])
if len(start_times) > 0:
min_start_time = min(start_times)
max_start_time = max(start_times)
min_stop_time = min(stop_times)
- max_stop_time = max(stop_times)
max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
@@ -301,16 +163,19 @@
if min_start_time + self.max_time_diff < max_allowed_time_diff:
logger.warning("Too large difference in %s:%s start time - %s. " +
"Max recommended difference is %s",
- self.name, job_config.summary,
+ self.name, job.summary,
max_start_time - min_start_time, self.max_time_diff)
if min_stop_time + self.max_time_diff < max_allowed_time_diff:
logger.warning("Too large difference in %s:%s stop time - %s. " +
"Max recommended difference is %s",
- self.name, job_config.summary,
+ self.name, job.summary,
max_start_time - min_start_time, self.max_time_diff)
- self.rstorage.put_job_config(job_config, job_path)
+ job.reliable_info_starts_at = max_start_time
+ job.reliable_info_stops_at = min_stop_time
+
+ self.storage.put_job(self.suite, job)
self.storage.sync()
if self.on_idle is not None:
@@ -321,24 +186,25 @@
pass
@abc.abstractmethod
- def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
+ def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
pass
@abc.abstractmethod
- def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
+ def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
pass
class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
def __init__(self, *dt, **mp) -> None:
ThreadedTest.__init__(self, *dt, **mp)
- self.prerun_script = self.config.params['prerun_script']
- self.run_script = self.config.params['run_script']
- self.prerun_tout = self.config.params.get('prerun_tout', 3600)
- self.run_tout = self.config.params.get('run_tout', 3600)
- self.iterations_configs = [None]
+ self.prerun_script = self.suite.params['prerun_script']
+ self.run_script = self.suite.params['run_script']
+ self.prerun_tout = self.suite.params.get('prerun_tout', 3600)
+ self.run_tout = self.suite.params.get('run_tout', 3600)
+ # TODO: fix job_configs field
+ raise NotImplementedError("Fix job configs")
- def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
+ def get_expected_runtime(self, job: TestJobConfig) -> Optional[int]:
return None
def config_node(self, node: IRPCNode) -> None:
@@ -346,19 +212,19 @@
node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))
cmd = self.join_remote(self.prerun_script)
- cmd += ' ' + self.config.params.get('prerun_opts', '')
+ cmd += ' ' + self.suite.params.get('prerun_opts', '')
node.run(cmd, timeout=self.prerun_tout)
- def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
+ def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
pass
- def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
+ def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
# TODO: have to store logs
cmd = self.join_remote(self.run_script)
- cmd += ' ' + self.config.params.get('run_opts', '')
+ cmd += ' ' + self.suite.params.get('run_opts', '')
return self.parse_results(node.run(cmd, timeout=self.run_tout))
@abc.abstractmethod
- def parse_results(self, data: str) -> JobMetrics:
+ def parse_results(self, data: str) -> List[TimeSeries]:
pass
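
With the new interface a test implementation no longer knows about the storage layout: run_iteration() just returns time series and the base class persists them via storage.put_ts(). A rough sketch of a conforming subclass, assuming the TimeSeries constructor keeps the (name, raw, data, times, second_axis_size) signature seen elsewhere in this diff and that the remote script prints "time value" pairs:

    import array

    class MySimpleTest(ThreadedTest):
        # other abstract methods (config_node, prepare_iteration, ...) omitted for brevity
        def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
            out = node.run(self.join_remote("run_load.sh"))  # hypothetical workload script
            times = array.array("Q")
            values = array.array("Q")
            for line in out.split("\n"):
                if line.strip():
                    tm, val = line.split()
                    times.append(int(tm))
                    values.append(int(val))
            # one series per (device, sensor); second_axis_size=1 means scalar samples
            return [TimeSeries("sda.await", raw=out.encode("utf8"),
                               data=values, times=times, second_axis_size=1)]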
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index a0c014f..fea846d 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -43,7 +43,7 @@
def merge_node(self, creds: ConnCreds, roles: Set[str]) -> NodeInfo:
info = NodeInfo(creds, roles)
- nid = info.node_id()
+ nid = info.node_id
if nid in self.nodes_info:
self.nodes_info[nid].roles.update(info.roles)
diff --git a/wally/utils.py b/wally/utils.py
index 078a019..78235a8 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -11,11 +11,8 @@
import threading
import contextlib
import subprocess
-import collections
-from .node_interfaces import IRPCNode
-from typing import (Any, Tuple, Union, List, Iterator, Dict, Iterable, Optional,
- IO, Sequence, NamedTuple, cast, TypeVar)
+from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
try:
import psutil
@@ -347,59 +344,6 @@
return user, passwd, tenant, auth_url, insecure
-OSRelease = NamedTuple("OSRelease",
- [("distro", str),
- ("release", str),
- ("arch", str)])
-
-
-def get_os(node: IRPCNode) -> OSRelease:
- """return os type, release and architecture for node.
- """
- arch = node.run("arch", nolog=True).strip()
-
- try:
- node.run("ls -l /etc/redhat-release", nolog=True)
- return OSRelease('redhat', None, arch)
- except:
- pass
-
- try:
- node.run("ls -l /etc/debian_version", nolog=True)
-
- release = None
- for line in node.run("lsb_release -a", nolog=True).split("\n"):
- if ':' not in line:
- continue
- opt, val = line.split(":", 1)
-
- if opt == 'Codename':
- release = val.strip()
-
- return OSRelease('ubuntu', release, arch)
- except:
- pass
-
- raise RuntimeError("Unknown os")
-
-
-@contextlib.contextmanager
-def empty_ctx(val: Any = None) -> Iterator[Any]:
- yield val
-
-
-def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
- logger.info("Found {0} nodes total".format(len(nodes)))
-
- per_role = collections.defaultdict(int) # type: Dict[str, int]
- for node in nodes:
- for role in node.info.roles:
- per_role[role] += 1
-
- for role, count in sorted(per_role.items()):
- logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
def which(program: str) -> Optional[str]:
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
@@ -413,6 +357,11 @@
return None
+@contextlib.contextmanager
+def empty_ctx(val: Any = None) -> Iterator[Any]:
+ yield val
+
+
def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
for i in range(max_iter):
run_uuid = pet_generate(2, "_")
@@ -442,3 +391,16 @@
now_dt = datetime.datetime.now()
end_dt = now_dt + datetime.timedelta(0, seconds)
return exec_time_s, "{:%H:%M:%S}".format(end_dt)
+
+
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+ inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
+ for val in inp_iter:
+ for res in func(val):
+ yield res
+
+
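
flatmap() simply moves from the fio suite into wally.utils so other modules can reuse it; its behaviour is unchanged, e.g.:

    >>> list(flatmap(lambda x: range(x), [1, 2, 3]))
    [0, 0, 1, 0, 1, 2]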