A lot of fixes, improve visualization speed, add C++ interpolation code
diff --git a/clib/Makefile b/clib/Makefile
new file mode 100644
index 0000000..9786d22
--- /dev/null
+++ b/clib/Makefile
@@ -0,0 +1,6 @@
+libwally.so: interpolate.o Makefile
+ g++ interpolate.o -shared -o libwally.so
+
+interpolate.o: interpolate.cpp
+ g++ -c -Wall -std=c++11 -fPIC -O3 interpolate.cpp -o interpolate.o
+
diff --git a/clib/interpolate.cpp b/clib/interpolate.cpp
new file mode 100644
index 0000000..478eb88
--- /dev/null
+++ b/clib/interpolate.cpp
@@ -0,0 +1,120 @@
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+
+extern "C"
+void interpolate_ts_on_seconds_border(
+ unsigned int input_size,
+ unsigned int output_size,
+ uint64_t * times,
+ uint64_t * values,
+ unsigned int time_scale_coef,
+ uint64_t * output)
+{
+ auto first_output_cell = (*times / time_scale_coef) * time_scale_coef;
+ auto input_cell_end = *times - time_scale_coef; // hack to unify loop
+ uint64_t input_cell_begin;
+
+// std::printf("first_output_cell = %ld\n", (long)first_output_cell);
+
+ for(auto curr_times=times, curr_data_ptr=values;
+ curr_times < times + input_size ; ++curr_times, ++curr_data_ptr)
+ {
+ // take next cell from input array and calculate data rate in it
+ auto data_left = *curr_data_ptr;
+ input_cell_begin = input_cell_end;
+ input_cell_end = *curr_times;
+
+ auto rate = data_left / double(input_cell_end - input_cell_begin);
+
+// std::printf("input_cell_begin=%ld input_cell_end=%ld\n", (long)input_cell_begin, (long)input_cell_end);
+// std::printf("rate = %lf data_left=%ld\n", rate, (long)data_left);
+
+ uint32_t first_output_cell_idx;
+ if (input_cell_begin <= first_output_cell)
+ first_output_cell_idx = 0;
+ else
+ // +1 because first_output_cell is actually the end of first cell
+ first_output_cell_idx = (input_cell_begin - first_output_cell) / time_scale_coef + 1;
+
+ uint32_t last_output_cell_idx = (input_cell_end - first_output_cell) / time_scale_coef;
+
+ if ((input_cell_end - first_output_cell) % time_scale_coef != 0)
+ ++last_output_cell_idx;
+
+ last_output_cell_idx = std::min(last_output_cell_idx, output_size - 1);
+
+// std::printf("fidx=%d lidx=%d\n", (int)first_output_cell_idx, (int)last_output_cell_idx);
+
+ for(auto output_idx = first_output_cell_idx; output_idx <= last_output_cell_idx ; ++output_idx)
+ {
+ // current output cell time slot
+ auto out_cell_begin = output_idx * time_scale_coef + first_output_cell - time_scale_coef;
+ auto out_cell_end = out_cell_begin + time_scale_coef;
+ auto slot = std::min(out_cell_end, input_cell_end) - std::max(out_cell_begin, input_cell_begin);
+
+ auto slice = uint64_t(rate * slot);
+
+// std::printf("slot=%ld slice=%lf output_idx=%ld\n", (long)slot, (double)slice, (long)output_idx);
+
+ data_left -= slice;
+ output[output_idx] += slice;
+ }
+ output[last_output_cell_idx] += data_left;
+ }
+}
+
+
+extern "C"
+void interpolate_ts_on_seconds_border_v2(
+ unsigned int input_size,
+ unsigned int output_size,
+ uint64_t * times,
+ uint64_t * values,
+ unsigned int time_step,
+ uint64_t * output)
+{
+ auto output_end = (*times / time_step) * time_step;
+ auto output_begin = output_end - time_step;
+ auto output_cell = output;
+
+ auto input_cell = values;
+ auto input_time = times;
+ auto input_val = *input_cell;
+ auto input_begin = *input_time - time_step;
+ auto input_end = *input_time;
+ auto rate = ((double)*input_cell) / (input_end - input_begin);
+
+ // the output array must fully cover the input array
+ while(output_cell < output + output_size) {
+ // check if cells intersect
+ auto intersection = ((int64_t)std::min(output_end, input_end)) - std::max(output_begin, input_begin);
+
+ // add intersection slice to output array
+ if(intersection > 0) {
+ auto slice = (uint64_t)(intersection * rate);
+ *output_cell += slice;
+ input_val -= slice;
+ }
+
+ // switch to next input or output cell
+ if (output_end >= input_end){
+ *output_cell += input_val;
+
+ ++input_cell;
+ ++input_time;
+
+ if(input_time == times + input_size)
+ return;
+
+ input_val = *input_cell;
+ input_begin = input_end;
+ input_end = *input_time;
+ rate = ((double)*input_cell) / (input_end - input_begin);
+ } else {
+ ++output_cell;
+ output_begin = output_end;
+ output_end += time_step;
+ }
+ }
+}
\ No newline at end of file
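
Not part of the patch: a minimal sketch of how interpolate_ts_on_seconds_border_v2 above is expected to be driven from Python, assuming clib/libwally.so has been built with the Makefile above. The argtypes and the output-size formula mirror the ctypes loader added to wally/data_selectors.py further down.

    # reviewer sketch only, not part of the patch
    import ctypes
    import numpy

    uint64_p = ctypes.POINTER(ctypes.c_uint64)
    lib = ctypes.CDLL("clib/libwally.so")          # assumes make was run in clib/
    func = lib.interpolate_ts_on_seconds_border_v2
    func.argtypes = [ctypes.c_uint, ctypes.c_uint, uint64_p, uint64_p, ctypes.c_uint, uint64_p]
    func.restype = None

    # two samples collected at 1500ms and 2500ms; 10 and 20 units in each ~1s interval
    times = numpy.array([1500, 2500], dtype=numpy.uint64)
    values = numpy.array([10, 20], dtype=numpy.uint64)
    step = 1000                                    # output cell width, same units as times

    out_size = int(times[-1]) // step - int(times[0]) // step + 2
    output = numpy.zeros(out_size, dtype=numpy.uint64)

    func(times.size, out_size,
         times.ctypes.data_as(uint64_p), values.ctypes.data_as(uint64_p),
         step, output.ctypes.data_as(uint64_p))

    print(output)                                  # -> [ 5 15 10]

The 10 units of the first ~1s interval split 5/5 around the 1000ms border and the 20 units of the second split 10/10 around 2000ms, so the per-second cells come out as [5, 15, 10] and the total of 30 is preserved.
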
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index c3614ba..da50150 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -1,22 +1,30 @@
# ------------------------------------ CONFIGS -------------------------------------------------------------------
-fuel:
- url: http://172.16.44.13:8000/
- creds: admin:admin@admin
- ssh_creds: root:r00tme
- openstack_env: test
-
-openstack:
- skip_preparation: false
- openrc: /home/koder/workspace/scale_openrc
- openrc:
- user: USER
- passwd: PASSWD
- tenant: KEY_FILE
- auth_url: URL
- SOME_OTHER_OPTS: OPTIONAL
- vms:
- - "USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE]"
-
+#fuel:
+# url: http://172.16.44.13:8000/
+# creds: admin:admin@admin
+# ssh_creds: root:r00tme
+# openstack_env: test
+#
+#openstack:
+# skip_preparation: false
+# openrc: /home/koder/workspace/scale_openrc
+# openrc:
+# user: USER
+# passwd: PASSWD
+# tenant: KEY_FILE
+# auth_url: URL
+# SOME_OTHER_OPTS: OPTIONAL
+# vms:
+# - "USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE]"
+#
+#ceph:
+# cluster: ceph << Optional
+# config: PATH << Optional
+# keyfile: PATH << Optional
+# key: KEY << not supported for now
+# root_node: NODE_NAME
+#
+#
# nodes: - map of explicit nodes URLS to node roles
# in format
# USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE] or localhost: role1, role2, role3....
diff --git a/configs-examples/local_vm_ceph.yml b/configs-examples/local_vm_ceph.yml
new file mode 100644
index 0000000..2c75e31
--- /dev/null
+++ b/configs-examples/local_vm_ceph.yml
@@ -0,0 +1,19 @@
+include: default.yaml
+run_sensors: true
+results_storage: /var/wally_results
+discover: ceph
+
+ceph:
+ root_node: ceph-client
+
+sleep: 0
+
+nodes:
+ koder@ceph-client: testnode
+
+tests:
+ - fio:
+ load: verify
+ params:
+ FILENAME: /dev/rbd0
+ FILESIZE: 4G
diff --git a/configs-examples/perf_lab.yml b/configs-examples/perf_lab.yml
new file mode 100644
index 0000000..b53dfa4
--- /dev/null
+++ b/configs-examples/perf_lab.yml
@@ -0,0 +1,17 @@
+include: default.yaml
+run_sensors: true
+results_storage: /var/wally_results
+discover: ceph
+
+ceph:
+ root_node: root@cz7625
+
+nodes:
+ root@cz7625: testnode
+
+tests:
+ - fio:
+ load: ceph
+ params:
+ FILENAME: /dev/rbd0
+ FILESIZE: 100G
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 004d52d..2162299 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,12 @@
python-cinderclient
python-keystoneclient
python-glanceclient
+oktest
+iso8601==0.1.10
+scipy
+numpy
+matplotlib
+psutil
+seaborn
+pytest
+
diff --git a/requirements_extra.txt b/requirements_extra.txt
index 6551654..e69de29 100644
--- a/requirements_extra.txt
+++ b/requirements_extra.txt
@@ -1,6 +0,0 @@
-oktest
-iso8601==0.1.10
-scipy
-numpy
-matplotlib
-psutil
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..a8c0e3d
--- /dev/null
+++ b/run.py
@@ -0,0 +1,5 @@
+import sys
+from wally.main import main
+
+sys.exit(main(sys.argv))
+
diff --git a/tests/test_hlstorage.py b/tests/test_hlstorage.py
new file mode 100644
index 0000000..dcc3f6e
--- /dev/null
+++ b/tests/test_hlstorage.py
@@ -0,0 +1,128 @@
+import shutil
+import tempfile
+import contextlib
+from typing import Tuple, Union, Dict, Any
+
+import numpy
+from oktest import ok
+
+
+from wally.result_classes import DataSource, TimeSeries, SuiteConfig
+from wally.suits.job import JobConfig, JobParams
+from wally.storage import make_storage
+from wally.hlstorage import ResultStorage
+
+
+@contextlib.contextmanager
+def in_temp_dir():
+ dname = tempfile.mkdtemp()
+ try:
+ yield dname
+ finally:
+ shutil.rmtree(dname)
+
+
+SUITE_ID = "suite1"
+JOB_ID = "job1"
+NODE_ID = "node1"
+SENSOR = "sensor"
+DEV = "dev"
+METRIC = "metric"
+TAG = "csv"
+DATA_UNITS = "x"
+TIME_UNITS = "us"
+
+
+class TestJobParams(JobParams):
+ def __init__(self) -> None:
+ JobParams.__init__(self)
+
+ @property
+ def summary(self) -> str:
+ return "UT_Job_CFG"
+
+ @property
+ def long_summary(self) -> str:
+ return "UT_Job_Config"
+
+ def copy(self, **updated) -> 'JobParams':
+ return self.__class__()
+
+ @property
+ def char_tpl(self) -> Tuple[Union[str, int, float, bool], ...]:
+ return (1, 2, 3)
+
+
+class TestJobConfig(JobConfig):
+ @property
+ def storage_id(self) -> str:
+ return JOB_ID
+
+ @property
+ def params(self) -> JobParams:
+ return TestJobParams()
+
+ def raw(self) -> Dict[str, Any]:
+ return {}
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'TestJobConfig':
+ return cls()
+
+
+class TestSuiteConfig(SuiteConfig):
+ def __init__(self):
+ SuiteConfig.__init__(self, "UT", {}, "run_uuid", [], "/tmp", 0, False)
+ self.storage_id = SUITE_ID
+
+
+
+def test_sensor_ts():
+ with in_temp_dir() as root:
+ sensor_data = numpy.arange(5)
+ collected_at = numpy.arange(5) + 100
+
+ ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+ cds = DataSource(node_id=NODE_ID, metric='collected_at')
+
+ with make_storage(root, existing=False) as storage:
+ rstorage = ResultStorage(storage)
+
+ rstorage.append_sensor(sensor_data, ds, units=DATA_UNITS, histo_bins=None)
+ rstorage.append_sensor(sensor_data, ds, units=DATA_UNITS, histo_bins=None)
+
+ rstorage.append_sensor(collected_at, cds, units=TIME_UNITS, histo_bins=None)
+ rstorage.append_sensor(collected_at + 5, cds, units=TIME_UNITS, histo_bins=None)
+
+ with make_storage(root, existing=True) as storage2:
+ rstorage2 = ResultStorage(storage2)
+ ts = rstorage2.load_sensor(ds)
+ assert (ts.data == numpy.array([0, 1, 2, 3, 4, 0, 1, 2, 3, 4])).all()
+ assert (ts.times == numpy.arange(10) + 100).all()
+
+
+def test_result_ts():
+ with in_temp_dir() as root:
+ sensor_data = numpy.arange(5, dtype=numpy.uint32)
+ collected_at = numpy.arange(5, dtype=numpy.uint32) + 100
+ ds = DataSource(suite_id=SUITE_ID, job_id=JOB_ID,
+ node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC, tag=TAG)
+
+ ts = TimeSeries("xxxx", None, sensor_data, collected_at, DATA_UNITS, ds, TIME_UNITS)
+
+ suite = TestSuiteConfig()
+ job = TestJobConfig(1)
+
+ with make_storage(root, existing=False) as storage:
+ rstorage = ResultStorage(storage)
+ rstorage.put_or_check_suite(suite)
+ rstorage.put_job(suite, job)
+ rstorage.put_ts(ts)
+
+ with make_storage(root, existing=True) as storage2:
+ rstorage2 = ResultStorage(storage2)
+ suits = list(rstorage2.iter_suite('UT'))
+ suits2 = list(rstorage2.iter_suite())
+ ok(len(suits)) == 1
+ ok(len(suits2)) == 1
+
diff --git a/tests/test_math.py b/tests/test_math.py
index 5a75858..d9c9688 100644
--- a/tests/test_math.py
+++ b/tests/test_math.py
@@ -1,5 +1,7 @@
import numpy
from wally.statistic import rebin_histogram
+from wally.result_classes import DataSource, TimeSeries
+from wally.data_selectors import interpolate_ts_on_seconds_border, c_interpolate_ts_on_seconds_border
def array_eq(x: numpy.array, y: numpy.array, max_diff: float = 1E-3) -> bool:
@@ -30,3 +32,63 @@
assert array_eq(new_edges, numpy.array([20, 30, 40]))
assert new_histo.sum() == curr_histo.sum()
assert list(new_histo) == [30, 10, 60]
+
+
+SUITE_ID = "suite1"
+JOB_ID = "job1"
+NODE_ID = "node1"
+SENSOR = "sensor"
+DEV = "dev"
+METRIC = "metric"
+TAG = "csv"
+DATA_UNITS = "x"
+TIME_UNITS = "ms"
+
+
+def test_interpolate():
+ ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+ samples = 200
+ ms_coef = 1000
+ s_offset = 377 * ms_coef
+ ms_offset = 300 + s_offset
+
+ for i in range(16):
+ source_times = numpy.random.randint(100, size=samples, dtype='uint64') + \
+ ms_coef * numpy.arange(samples, dtype='uint64') + s_offset + ms_offset
+ source_values = numpy.random.randint(30, 60, size=samples, dtype='uint64')
+
+ ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
+ source=ds, time_units=TIME_UNITS)
+
+ # ts2 = interpolate_ts_on_seconds_border(ts)
+ ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)
+
+ # print()
+ # print(ts.times)
+ # print(ts.data, ts.data.sum())
+ # print(ts2.times)
+ # print(ts2.data, ts2.data.sum())
+
+ assert ts.time_units == 'ms'
+ assert ts2.time_units == 's'
+ assert ts2.times.dtype == ts.times.dtype
+ assert ts2.data.dtype == ts.data.dtype
+
+ assert ts.data.sum() == ts2.data.sum()
+
+ borders = 5
+ block_size = samples // 10
+ for begin_idx in numpy.random.randint(borders, samples - borders, size=20):
+ begin_idx = int(begin_idx)
+ end_idx = min(begin_idx + block_size, ts.times.size - 1)
+
+ first_cell_begin_time = ts.times[begin_idx - 1]
+ last_cell_end_time = ts.times[end_idx]
+ ts_sum = ts.data[begin_idx:end_idx].sum()
+
+ ts2_begin_idx = numpy.searchsorted(ts2.times, first_cell_begin_time // ms_coef)
+ ts2_end_idx = numpy.searchsorted(ts2.times, last_cell_end_time // ms_coef) + 1
+ ts2_max = ts2.data[ts2_begin_idx: ts2_end_idx].sum()
+ ts2_min = ts2.data[ts2_begin_idx + 1: ts2_end_idx - 1].sum()
+
+ assert ts2_min <= ts_sum <= ts2_max, "NOT {} <= {} <= {}".format(ts2_min, ts_sum, ts2_max)
diff --git a/v2_plans.md b/v2_plans.md
index 70a0caf..90bb288 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -46,7 +46,7 @@
TODO next
---------
-* Remove DBStorage, merge FSStorage and serializer into
+* Merge FSStorage and serializer into
ObjStorage, separate TSStorage.
* Build WallyStorage on top of it, use only WallyStorage in code
* check that OS key match what is stored on disk
@@ -58,7 +58,6 @@
* Cumulative statistic table for all jobs
* Add column for job params, which show how many cluster resource consumed
* show extra outliers with arrows
-* More X = func(QD) plots. Eg. - kurt/skew, etc.
* Hide cluster load if no nodes available
* Show latency skew and curtosis
* Sort engineering report by result tuple
diff --git a/wally/ceph.py b/wally/ceph.py
index 36d6b15..3527db3 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,7 +1,7 @@
""" Collect data about ceph nodes"""
-import json
+import os
import logging
-from typing import Dict, cast, List, Set, Optional
+from typing import Dict, cast, List, Set
from .node_interfaces import NodeInfo, IRPCNode
@@ -21,16 +21,16 @@
logger = logging.getLogger("wally")
-def get_osds_info(node: IRPCNode, ceph_extra_args: str = "") -> Dict[IP, List[OSDInfo]]:
+def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
"""Get set of osd's ip"""
res = {} # type: Dict[IP, List[OSDInfo]]
return {IP(ip): osd_info_list
- for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args)}
+ for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount).items()}
def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
"""Return mon ip set"""
- return {IP(ip) for ip in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
+ return {IP(ip) for ip, _ in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
class DiscoverCephStage(Stage):
@@ -40,7 +40,8 @@
def run(self, ctx: TestRun) -> None:
"""Return list of ceph's nodes NodeInfo"""
- if 'ceph' not in ctx.config.discovery:
+ if 'ceph' not in ctx.config.discover:
+ print(ctx.config.discover)
logger.debug("Skip ceph discovery due to config setting")
return
@@ -48,11 +49,11 @@
logger.debug("Skip ceph discovery, use previously discovered nodes")
return
- if 'metadata' in ctx.config.discovery:
+ if 'metadata' in ctx.config.discover:
logger.exception("Ceph metadata discovery is not implemented")
raise StopTestError()
- ignore_errors = 'ignore_errors' in ctx.config.discovery
+ ignore_errors = 'ignore_errors' in ctx.config.discover
ceph = ctx.config.ceph
root_node_uri = cast(str, ceph.root_node)
cluster = ceph.get("cluster", "ceph")
@@ -81,18 +82,31 @@
ceph_params = {"cluster": cluster, "conf": conf, "key": key}
+ ip_remap = {
+ '10.8.0.4': '172.16.164.71',
+ '10.8.0.3': '172.16.164.72',
+ '10.8.0.2': '172.16.164.73',
+ '10.8.0.5': '172.16.164.74',
+ '10.8.0.6': '172.16.164.75',
+ '10.8.0.7': '172.16.164.76',
+ '10.8.0.8': '172.16.164.77',
+ '10.8.0.9': '172.16.164.78',
+ }
+
with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
log_level=ctx.config.rpc_log_level) as node:
- ssh_key = node.get_file_content("~/.ssh/id_rsa")
+ # ssh_key = node.get_file_content("~/.ssh/id_rsa", expanduser=True)
try:
ips = set()
- for ip, osds_info in get_osds_info(node, ceph_extra_args).items():
+ for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
+ ip = ip_remap[ip]
ips.add(ip)
- creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-osd'})
- info.params.setdefault('ceph-osds', []).extend(osds_info)
+ info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
info.params['ceph'] = ceph_params
@@ -107,7 +121,9 @@
try:
counter = 0
for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
- creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ ip = ip_remap[ip]
+ # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+ creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-mon'})
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
info.params['ceph'] = ceph_params
@@ -118,3 +134,33 @@
raise StopTestError()
else:
logger.warning("MON discovery failed %s", exc)
+
+
+def raw_dev_name(path):
+ if path.startswith("/dev/"):
+ path = path[5:]
+ while path and path[-1].isdigit():
+ path = path[:-1]
+ return path
+
+
+class FillCephInfoStage(Stage):
+ config_block = 'ceph'
+ priority = StepOrder.UPDATE_NODES_INFO
+
+ def run(self, ctx: TestRun) -> None:
+ for node in ctx.nodes:
+ if 'ceph_storage_devs' not in node.info.params:
+ if 'ceph-osd' in node.info.roles:
+ jdevs = set()
+ sdevs = set()
+ for osd_info in node.info.params['ceph-osds']:
+ for key, sset in [('journal', jdevs), ('storage', sdevs)]:
+ path = osd_info.get(key)
+ if path:
+ dpath = node.conn.fs.get_dev_for_file(path)
+ if isinstance(dpath, bytes):
+ dpath = dpath.decode('utf8')
+ sset.add(raw_dev_name(dpath))
+ node.info.params['ceph_storage_devs'] = list(sdevs)
+ node.info.params['ceph_journal_devs'] = list(jdevs)
diff --git a/wally/config.py b/wally/config.py
index add4df5..95ae511 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -25,7 +25,7 @@
self.debug_agents = False # type: bool
# None, disabled, enabled, metadata, ignore_errors
- self.discovery = None # type: Optional[str]
+ self.discover = None # type: Optional[str]
self.logging = None # type: 'Config'
self.ceph = None # type: 'Config'
@@ -33,7 +33,7 @@
self.fuel = None # type: 'Config'
self.test = None # type: 'Config'
self.sensors = None # type: 'Config'
- self.discovery = None # type: Set[str]
+ self.discover = None # type: Set[str]
self._dct.clear()
self._dct.update(dct)
diff --git a/wally/console_report.py b/wally/console_report.py
index bfa8881..3733de1 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -19,8 +19,8 @@
for suite in rstorage.iter_suite(FioTest.name):
table = texttable.Texttable(max_width=200)
- table.header(["Description", "IOPS ~ Dev", 'Skew/Kurt', 'lat med', 'lat 95'])
- table.set_cols_align(('l', 'r', 'r', 'r', 'r'))
+ table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
+ table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
bw_ts, = list(rstorage.iter_ts(suite, job, metric='bw'))
@@ -31,9 +31,9 @@
lat_ts, = list(rstorage.iter_ts(suite, job, metric='lat'))
bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000 # convert us to ms
lat_props = calc_histo_stat_props(lat_ts, bins_edges)
-
table.add_row([job.params.summary,
"{} ~ {}".format(float2str(avg_iops), float2str(iops_dev)),
+ float2str(props.average / 1024), # Ki -> Mi
"{}/{}".format(float2str(props.skew), float2str(props.kurt)),
float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
new file mode 100644
index 0000000..53b822b
--- /dev/null
+++ b/wally/data_selectors.py
@@ -0,0 +1,411 @@
+import ctypes
+import logging
+import os.path
+from typing import Tuple, List, Iterable, Iterator, Optional, Union
+from fractions import Fraction
+
+import numpy
+
+from cephlib.numeric import auto_edges2
+
+import wally
+from .hlstorage import ResultStorage
+from .node_interfaces import NodeInfo
+from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
+from .suits.io.fio import FioJobConfig
+from .suits.io.fio_hist import expected_lat_bins
+from .utils import unit_conversion_coef
+
+
+logger = logging.getLogger("wally")
+
+# Separately for each test: heatmaps & histos aggregated across the whole time range:
+# * fio latency heatmap for all instances
+# * data dev iops across all osd
+# * data dev bw across all osd
+# * data dev qd across all osd
+# * journal dev iops across all osd
+# * journal dev bw across all osd
+# * journal dev qd across all osd
+# * net dev pps across all hosts
+# * net dev bps across all hosts
+
+# Main APIs:
+# get sensors by pattern
+# align values to second borders
+# cut ranges for a particular test
+# transform into 2d histos (either make histos or rebin them) and clip outliers at the same time
+
+
+AGG_TAG = 'ALL'
+
+
+def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
+ nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
+ node_roles_s = set(node_roles)
+ return [node for node in nodes if node.roles.intersection(node_roles_s)]
+
+
+def find_all_sensors(rstorage: ResultStorage,
+ node_roles: Iterable[str],
+ sensor: str,
+ metric: str) -> Iterator[TimeSeries]:
+ all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
+ all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)
+
+ for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
+ ts = rstorage.load_sensor(ds)
+
+ # for sensors ts.times is an array of pairs - (collection_started_at, collection_finished_at);
+ # to make this array consistent with the load data times, the second item of each pair is dropped
+ ts.times = ts.times[::2]
+ yield ts
+
+
+def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
+ "Iterated over selected metric for all nodes for given Suite/job"
+ return rstorage.iter_ts(suite, job, metric=metric)
+
+
+def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
+ "Sum selected metric for all nodes for given Suite/job"
+
+ tss = list(find_all_series(rstorage, suite, job, metric))
+
+ if len(tss) == 0:
+ raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
+
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id=AGG_TAG,
+ sensor='fio',
+ dev=AGG_TAG,
+ metric=metric,
+ tag='csv')
+
+ agg_ts = TimeSeries(metric,
+ raw=None,
+ source=ds,
+ data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
+ times=tss[0].times.copy(),
+ units=tss[0].units,
+ histo_bins=tss[0].histo_bins,
+ time_units=tss[0].time_units)
+
+ for ts in tss:
+ if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
+ msg = "Sensor {}.{} on node %s has shape={}. Can only process sensors with shape=[X, {}].".format(
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
+ logger.error(msg)
+ raise ValueError(msg)
+
+ if metric != 'lat' and len(ts.data.shape) != 1:
+ msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
+ logger.error(msg)
+ raise ValueError(msg)
+
+ # TODO: match times on different ts
+ agg_ts.data += ts.data
+
+ return agg_ts
+
+
+interpolated_cache = {}
+
+
+def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
+ "Interpolate time series to values on seconds borders"
+
+ if not nc and ts.source.tpl in interpolated_cache:
+ return interpolated_cache[ts.source.tpl]
+
+ assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
+ .format(len(ts.times), len(ts.data), ts.source)
+
+ rcoef = 1 / unit_conversion_coef(ts.time_units, 's') # type: Union[int, Fraction]
+
+ if isinstance(rcoef, Fraction):
+ assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
+ rcoef = rcoef.numerator
+
+ assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
+ coef = int(rcoef) # make typechecker happy
+
+ # round to seconds border
+ begin = int(ts.times[0] / coef + 1) * coef
+ end = int(ts.times[-1] / coef) * coef
+
+ # current real data time chunk begin time
+ edge_it = iter(ts.times)
+
+ # current real data value
+ val_it = iter(ts.data)
+
+ # result array, cumulative value per second
+ result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
+ idx = 0
+ curr_summ = 0
+
+ # end of current time slot
+ results_cell_ends = begin + coef
+
+ # hack to unify looping
+ real_data_end = next(edge_it)
+ while results_cell_ends <= end:
+ real_data_start = real_data_end
+ real_data_end = next(edge_it)
+ real_val_left = next(val_it)
+
+ # real data "speed" for interval [real_data_start, real_data_end]
+ real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
+
+ while real_data_end >= results_cell_ends and results_cell_ends <= end:
+ # part of current real value, which is fit into current result cell
+ curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
+
+ # calculate rest of real data for next result cell
+ real_val_left -= curr_real_chunk
+ result[idx] = curr_summ + curr_real_chunk
+ idx += 1
+ curr_summ = 0
+
+ # adjust real data start time
+ real_data_start = results_cell_ends
+ results_cell_ends += coef
+
+ # don't lose any real data
+ curr_summ += real_val_left
+
+ assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))
+
+ res_ts = TimeSeries(ts.name, None, result,
+ times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
+ units=ts.units,
+ time_units='s',
+ source=ts.source(),
+ histo_bins=ts.histo_bins)
+
+ if not nc:
+ interpolated_cache[ts.source.tpl] = res_ts
+
+ return res_ts
+
+
+c_interp_func = None
+cdll = None
+
+
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
+ "Interpolate time series to values on seconds borders"
+
+ if not nc and ts.source.tpl in interpolated_cache:
+ return interpolated_cache[ts.source.tpl]
+
+ assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
+ .format(len(ts.times), len(ts.data), ts.source)
+
+ rcoef = 1 / unit_conversion_coef(ts.time_units, 's') # type: Union[int, Fraction]
+
+ if isinstance(rcoef, Fraction):
+ assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
+ rcoef = rcoef.numerator
+
+ assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
+ coef = int(rcoef) # make typechecker happy
+
+ global cdll
+ global c_interp_func
+ uint64_p = ctypes.POINTER(ctypes.c_uint64)
+
+ if c_interp_func is None:
+ dirname = os.path.dirname(os.path.dirname(wally.__file__))
+ path = os.path.join(dirname, 'clib', 'libwally.so')
+ cdll = ctypes.CDLL(path)
+ c_interp_func = cdll.interpolate_ts_on_seconds_border_v2
+ c_interp_func.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ uint64_p, # values
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output
+ ]
+ c_interp_func.restype = None
+
+ assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
+ assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
+
+ output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
+ # print("output_sz =", output_sz, "coef =", coef)
+ result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
+
+ c_interp_func(ts.data.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ ts.data.ctypes.data_as(uint64_p),
+ coef,
+ result.ctypes.data_as(uint64_p))
+
+ res_ts = TimeSeries(ts.name, None, result,
+ times=int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype),
+ units=ts.units,
+ time_units='s',
+ source=ts.source(),
+ histo_bins=ts.histo_bins)
+
+ if not nc:
+ interpolated_cache[ts.source.tpl] = res_ts
+ return res_ts
+
+
+def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
+ """Return sensor values for given node for given period. Return per second estimated values array
+ Raise an error if required range is not full covered by data in storage"""
+
+ assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
+ assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
+ .format(len(ts.times), len(ts.data), ts.source)
+
+ if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
+ raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+ "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
+ ts.source.node_id, ts.source.sensor, ts.source.dev,
+ ts.source.metric))
+ idx1, idx2 = numpy.searchsorted(ts.times, time_range)
+ return TimeSeries(ts.name, None,
+ ts.data[idx1:idx2],
+ times=ts.times[idx1:idx2],
+ units=ts.units,
+ time_units=ts.time_units,
+ source=ts.source,
+ histo_bins=ts.histo_bins)
+
+
+def make_2d_histo(tss: List[TimeSeries],
+ outliers_range: Tuple[float, float] = (0.02, 0.98),
+ bins_count: int = 20,
+ log_bins: bool = False) -> TimeSeries:
+
+ # validate input data
+ for ts in tss:
+ assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
+ .format(len(ts.times), len(ts.data), ts.source)
+ assert ts.time_units == 's', "All time units must be seconds"
+ assert ts.units == tss[0].units, "All arrays should have the same data units"
+ assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
+ assert len(ts.data.shape) == 1, "All arrays should be 1d"
+
+ whole_arr = numpy.concatenate([ts.data for ts in tss])
+ whole_arr.shape = [len(tss), -1]
+
+ if outliers_range is not None:
+ min_vl, begin, end, max_vl = numpy.percentile(whole_arr,
+ [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
+ bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
+ fixed_bins_edges = bins_edges.copy()
+ fixed_bins_edges[0] = begin
+ fixed_bins_edges[-1] = end
+ else:
+ begin, end = numpy.percentile(whole_arr, [0, 100])
+ bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
+ fixed_bins_edges = bins_edges
+
+ res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
+ res_data.shape = (len(tss), -1)
+ res = TimeSeries(name=tss[0].name,
+ raw=None,
+ data=res_data,
+ times=tss[0].times,
+ units=tss[0].units,
+ source=tss[0].source,
+ time_units=tss[0].time_units,
+ histo_bins=bins_edges)
+ return res
+
+
+def aggregate_histograms(tss: List[TimeSeries],
+ outliers_range: Tuple[float, float] = (0.02, 0.98),
+ bins_count: int = 20,
+ log_bins: bool = False) -> TimeSeries:
+
+ # validate input data
+ for ts in tss:
+ assert len(ts.times) == len(ts.data), "Need to use stripped time"
+ assert ts.time_units == 's', "All time units must be seconds"
+ assert ts.units == tss[0].units, "All arrays should have the same data units"
+ assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
+ assert len(ts.data.shape) == 2, "All arrays should be 2d"
+ assert ts.histo_bins is not None, "All arrays must have histo_bins"
+
+ whole_arr = numpy.concatenate([ts.data for ts in tss])
+ whole_arr.shape = [len(tss), -1]
+
+ min_val = whole_arr.min()
+ max_val = whole_arr.max()
+
+ if outliers_range is not None:
+ begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
+ else:
+ begin = min_val
+ end = max_val
+
+ bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
+
+ if outliers_range is not None:
+ fixed_bins_edges = bins_edges.copy()
+ fixed_bins_edges[0] = begin
+ fixed_bins_edges[-1] = end
+ else:
+ fixed_bins_edges = bins_edges
+
+ res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
+ res_data.shape = (len(tss), -1)
+ return TimeSeries(name=tss[0].name,
+ raw=None,
+ data=res_data,
+ times=tss[0].times,
+ units=tss[0].units,
+ source=tss[0].source,
+ time_units=tss[0].time_units,
+ histo_bins=fixed_bins_edges)
+
+
+def summ_sensors(rstorage: ResultStorage,
+ roles: List[str],
+ sensor: str,
+ metric: str,
+ time_range: Tuple[int, int]) -> Optional[TimeSeries]:
+
+ res = None # type: Optional[TimeSeries]
+ for node in find_nodes_by_roles(rstorage, roles):
+ for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+ data = rstorage.load_sensor(ds)
+ data = c_interpolate_ts_on_seconds_border(data)
+ data = get_ts_for_time_range(data, time_range)
+ if res is None:
+ res = data
+ res.data = res.data.copy()
+ else:
+ res.data += data.data
+ return res
+
+
+def find_sensors_to_2d(rstorage: ResultStorage,
+ roles: List[str],
+ sensor: str,
+ devs: List[str],
+ metric: str,
+ time_range: Tuple[int, int]) -> numpy.ndarray:
+
+ res = [] # type: List[TimeSeries]
+ for node in find_nodes_by_roles(rstorage, roles):
+ for dev in devs:
+ for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
+ data = rstorage.load_sensor(ds)
+ data = c_interpolate_ts_on_seconds_border(data)
+ data = get_ts_for_time_range(data, time_range)
+ res.append(data.data)
+ res2d = numpy.concatenate(res)
+ res2d.shape = ((len(res), -1))
+ return res2d
diff --git a/wally/fuel.py b/wally/fuel.py
index a565805..3a11403 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -37,9 +37,9 @@
pass
def run(self, ctx: TestRun) -> None:
- full_discovery = 'fuel' in ctx.config.discovery
- metadata_only = (not full_discovery) and ('metadata' in ctx.config.discovery)
- ignore_errors = 'ignore_errors' in ctx.config.discovery
+ full_discovery = 'fuel' in ctx.config.discover
+ metadata_only = (not full_discovery) and ('metadata' in ctx.config.discover)
+ ignore_errors = 'ignore_errors' in ctx.config.discover
if not (metadata_only or full_discovery):
logger.debug("Skip ceph discovery due to config setting")
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index 411b515..733eab0 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,12 +1,12 @@
import os
import pprint
import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, List
import numpy
from .suits.job import JobConfig
-from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
+from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage, ArrayData
from .storage import Storage
from .utils import StopTestError
from .suits.all_suits import all_suits
@@ -75,87 +75,140 @@
def __init__(self, storage: Storage) -> None:
self.storage = storage
- self.cache = {} # type: Dict[str, Tuple[int, int, Any, List[str]]]
+ self.cache = {} # type: Dict[str, Tuple[int, int, ArrayData]]
def sync(self) -> None:
self.storage.sync()
# ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------
- def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, Tuple[str, ...]]:
+ def read_headers(self, fd) -> Tuple[str, List[str], List[str], Optional[numpy.ndarray]]:
+ header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
+ dtype, has_header2, header2_dtype, *ext_header = header
+
+ if has_header2 == 'true':
+ ln = fd.readline().decode(self.csv_file_encoding).strip()
+ header2 = numpy.fromstring(ln, sep=',', dtype=header2_dtype)
+ else:
+ assert has_header2 == 'false', \
+ "In file {} has_header2 is not true/false, but {!r}".format(fd.name, has_header2)
+ header2 = None
+ return dtype, ext_header, header, header2
+
+ def load_array(self, path: str) -> ArrayData:
+ """
+ Load array from file; should not be called directly
+ :param path: file path
+ :return: ArrayData
+ """
with self.storage.get_fd(path, "rb") as fd:
+ fd.seek(0, os.SEEK_SET)
+
stats = os.fstat(fd.fileno())
if path in self.cache:
- size, atime, obj, header = self.cache[path]
+ size, atime, arr_info = self.cache[path]
if size == stats.st_size and atime == stats.st_atime_ns:
- return obj, header
+ return arr_info
- header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
+ data_dtype, header, _, header2 = self.read_headers(fd)
+ assert data_dtype == 'uint64', path
+ dt = fd.read().decode(self.csv_file_encoding).strip()
- if skip_shape:
- header = header[1:]
- dt = fd.read().decode("utf-8").strip()
+ if len(dt) != 0:
+ arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=data_dtype)
+ lines = dt.count("\n") + 1
+ assert len(set(ln.count(',') for ln in dt.split("\n"))) == 1, \
+ "Data lines in {!r} have different element count".format(path)
+ arr.shape = [lines] if lines == arr.size else [lines, -1]
+ else:
+ arr = None
- arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
- if len(dt) != 0:
- lines = dt.count("\n") + 1
- columns = dt.split("\n", 1)[0].count(",") + 1
- assert lines * columns == len(arr)
- if columns == 1:
- arr.shape = (lines,)
- else:
- arr.shape = (lines, columns)
+ arr_data = ArrayData(header, header2, arr)
+ self.cache[path] = (stats.st_size, stats.st_atime_ns, arr_data)
+ return arr_data
- self.cache[path] = (stats.st_size, stats.st_atime_ns, arr, header[1:])
- return arr, header[1:]
+ def put_array(self, path: str, data: numpy.array, header: List[str], header2: numpy.ndarray = None,
+ append_on_exists: bool = False) -> None:
- def put_array(self, path:str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
- header = [data.dtype.name] + header
+ header = [data.dtype.name] + \
+ (['false', ''] if header2 is None else ['true', header2.dtype.name]) + \
+ header
exists = append_on_exists and path in self.storage
- if len(data.shape) == 1:
- # make array vertical to simplify reading
- vw = data.view().reshape((data.shape[0], 1))
- else:
- vw = data
+ vw = data.view().reshape((data.shape[0], 1)) if len(data.shape) == 1 else data
+ mode = "cb" if not exists else "rb+"
- with self.storage.get_fd(path, "cb" if not exists else "rb+") as fd:
+ with self.storage.get_fd(path, mode) as fd:
if exists:
- curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
- assert header == curr_header, \
- "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
- .format(path, header, curr_header)
+ data_dtype, _, full_header, curr_header2 = self.read_headers(fd)
+
+ assert data_dtype == data.dtype.name, \
+ "Path {!r}. Passed data type ({!r}) and current data type ({!r}) doesn't match"\
+ .format(path, data.dtype.name, data_dtype)
+
+ assert header == full_header, \
+ "Path {!r}. Passed header ({!r}) and current header ({!r}) doesn't match"\
+ .format(path, header, full_header)
+
+ assert (header2 is None and curr_header2 is None) or numpy.array_equal(header2, curr_header2), \
+ "Path {!r}. Passed header2 != current header2: {!r}\n{!r}"\
+ .format(path, header2, curr_header2)
+
fd.seek(0, os.SEEK_END)
else:
fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
+ if header2 is not None:
+ fd.write((",".join(map(str, header2)) + "\n").encode(self.csv_file_encoding))
numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
- arr, header = self.load_array(path, skip_shape=True)
- units, time_units = header
+ """
+ Load a time series generated by fio or another tool; should not be called directly,
+ use iter_ts instead.
+ :param ds: data source path
+ :param path: path in data storage
+ :return: TimeSeries
+ """
+ (units, time_units), header2, data = self.load_array(path)
+ times = data[:,0]
+ ts_data = data[:,1:]
- data = arr[:,1:]
- if data.shape[1] == 1:
- data = data.reshape((-1,))
+ if ts_data.shape[1] == 1:
+ ts_data.shape = (ts_data.shape[0],)
return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
raw=None,
- data=data,
- times=arr[:,0],
+ data=ts_data,
+ times=times,
source=ds,
units=units,
- time_units=time_units)
+ time_units=time_units,
+ histo_bins=header2)
def load_sensor(self, ds: DataSource) -> TimeSeries:
- collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
- assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
- data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
+ # sensors have no shape header
+ path = DB_paths.sensor_time.format(**ds.__dict__)
+ collect_header, must_be_none, collected_at = self.load_array(path)
+
+ # cut 'collection end' time
+ collected_at = collected_at[::2]
+
+ # there must be no histogram for collected_at
+ assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
+ assert collect_header == [ds.node_id, 'collected_at', 'us'],\
+ "Unexpected collect_at header {!r} at {!r}".format(collect_header, path)
+ assert len(collected_at.shape) == 1, "Collected_at must be 1D at {!r}".format(path)
+
+ data_path = DB_paths.sensor_data.format(**ds.__dict__)
+ data_header, must_be_none, data = self.load_array(data_path)
+
+ # there must be no histogram for any sensors
+ assert must_be_none is None, "Extra header2 {!r} in sensor data file {!r}".format(must_be_none, data_path)
data_units = data_header[2]
- assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
-
- assert len(data.shape) == 1
- assert len(collected_at.shape) == 1
+ assert data_header == [ds.node_id, ds.metric_fqdn, data_units], \
+ "Unexpected data header {!r} at {!r}".format(data_header, data_path)
+ assert len(data.shape) == 1, "Sensor data must be 1D at {!r}".format(data_path)
return TimeSeries(ds.metric_fqdn,
raw=None,
@@ -190,12 +243,12 @@
self.storage.put(job, path)
def put_ts(self, ts: TimeSeries) -> None:
- assert ts.data.dtype == ts.times.dtype
- assert ts.data.dtype.kind == 'u'
- assert ts.source.tag == self.ts_arr_tag
-
+ assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
+ assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
+ assert ts.source.tag == self.ts_arr_tag, "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag,
+ self.ts_arr_tag)
csv_path = DB_paths.ts.format(**ts.source.__dict__)
- header = [ts.data.dtype.name, ts.units, ts.time_units]
+ header = [ts.units, ts.time_units]
tv = ts.times.view().reshape((-1, 1))
if len(ts.data.shape) == 1:
@@ -204,7 +257,10 @@
dv = ts.data
result = numpy.concatenate((tv, dv), axis=1)
- self.put_array(csv_path, result, header)
+ if ts.histo_bins is not None:
+ self.put_array(csv_path, result, header, header2=ts.histo_bins)
+ else:
+ self.put_array(csv_path, result, header)
if ts.raw:
raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
@@ -225,14 +281,20 @@
def put_report(self, report: str, name: str) -> str:
return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
- def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
+ def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
if ds.metric == 'collected_at':
path = DB_paths.sensor_time
metrics_fqn = 'collected_at'
else:
path = DB_paths.sensor_data
metrics_fqn = ds.metric_fqdn
- self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
+
+ if ds.metric == 'lat':
+ assert len(data.shape) == 2, "Latency should be histo array"
+ assert histo_bins is not None, "Latency should have histo bins"
+
+ path = path.format(**ds.__dict__)
+ self.put_array(path, data, [ds.node_id, metrics_fqn, units], header2=histo_bins, append_on_exists=True)
# ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
@@ -284,11 +346,12 @@
yield self.load_ts(ds, path)
def iter_sensors(self, node_id: str = None, sensor: str = None, dev: str = None, metric: str = None) -> \
- Iterator[Tuple[str, Dict[str, str]]]:
-
- path = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+ Iterator[Tuple[str, DataSource]]:
+ vls = dict(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+ path = fill_path(DB_paths.sensor_data_r, **vls)
for is_file, path, groups in self.iter_paths(path):
- assert is_file
- yield path, groups
+ cvls = vls.copy()
+ cvls.update(groups)
+ yield path, DataSource(**cvls)
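
Not part of the patch: an illustrative reconstruction (made-up values) of the on-disk CSV layout the reworked put_array/read_headers above produce for a time series stored with histogram bins. The first line carries the data dtype, the has_header2 flag, the header2 dtype and the original header (here data units and time units); the second line, present only when has_header2 is true, holds the histogram bin edges; every following row is a time value followed by the per-bin counts, written by numpy.savetxt with fmt="%lu":

    uint64,true,float64,us,ms
    1.0,2.5,5.0,10.0
    1000,3,7,0,1
    2000,1,4,2,0

A plain 1D sensor stream written through append_sensor keeps has_header2 == false, e.g. a first line like "uint64,false,,node1,collected_at,us" followed by one value per row.
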
diff --git a/wally/main.py b/wally/main.py
index b931929..93922dd 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -4,10 +4,8 @@
import pprint
import getpass
import logging
-import tempfile
import argparse
import functools
-import subprocess
import contextlib
from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
from yaml import load as _yaml_load
@@ -43,7 +41,7 @@
# stages
-from .ceph import DiscoverCephStage
+from .ceph import DiscoverCephStage, FillCephInfoStage
from .openstack import DiscoverOSStage
from .fuel import DiscoverFuelStage
from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
@@ -212,6 +210,7 @@
def get_run_stages() -> List[Stage]:
return [DiscoverCephStage(),
+ FillCephInfoStage(),
DiscoverOSStage(),
DiscoverFuelStage(),
ExplicitNodesStage(),
@@ -245,7 +244,7 @@
config.build_description = opts.build_description
config.build_type = opts.build_type
config.settings_dir = get_config_path(config, opts.settings_dir)
- config.discovery = set(config.get('discovery', '').split(","))
+ config.discover = set(name for name in config.get('discover', '').split(",") if name)
storage = make_storage(config.storage_url)
storage.put(config, 'config')
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index 3992048..b10f83d 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -109,19 +109,20 @@
pass
@abc.abstractmethod
- def copy_file(self, local_path: str, remote_path: str = None, compress: bool = False) -> str:
+ def copy_file(self, local_path: str, remote_path: str = None,
+ expanduser: bool = False, compress: bool = False) -> str:
pass
@abc.abstractmethod
- def get_file_content(self, path: str, compress: bool = False) -> bytes:
+ def get_file_content(self, path: str, expanduser: bool = False, compress: bool = False) -> bytes:
pass
@abc.abstractmethod
- def put_to_file(self, path: Optional[str], content: bytes, compress: bool = False) -> str:
+ def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
pass
@abc.abstractmethod
- def stat_file(self, path:str) -> Any:
+ def stat_file(self, path: str, expanduser: bool = False) -> Any:
pass
@abc.abstractmethod
diff --git a/wally/openstack.py b/wally/openstack.py
index b046656..88189f5 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -105,7 +105,7 @@
pass
def run(self, ctx: TestRun) -> None:
- if 'openstack' not in ctx.config.discovery:
+ if 'openstack' not in ctx.config.discover:
logger.debug("Skip openstack discovery due to settings")
return
@@ -125,7 +125,7 @@
user, password = os_nodes_auth.split(":")
key_file = None
- if 'metadata' not in ctx.config.discovery:
+ if 'metadata' not in ctx.config.discover:
services = ctx.os_connection.nova.services.list() # type: List[Any]
host_services_mapping = {} # type: Dict[str, List[str]]
diff --git a/wally/report.py b/wally/report.py
index 75b8028..b8a7713 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,14 +1,23 @@
import os
import abc
import logging
+import warnings
from io import BytesIO
from functools import wraps
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional
from collections import defaultdict
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set
import numpy
import scipy.stats
+
+import matplotlib
+# matplotlib.use('GTKAgg')
+
import matplotlib.pyplot as plt
+from matplotlib import gridspec
+
+from cephlib.common import float2str
+from cephlib.plot import plot_hmap_with_y_histo, hmap_from_2d
import wally
@@ -17,15 +26,19 @@
from .test_run_class import TestRun
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
-from .utils import b2ssize, b2ssize_10, STORAGE_ROLES
+from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
- hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve)
-from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig,
- IResultStorage)
-from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+ hist_outliers_perc, find_ouliers_ts, approximate_curve)
+from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig)
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
from .suits.job import JobConfig
+from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+
+
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ import seaborn
logger = logging.getLogger("wally")
@@ -65,6 +78,7 @@
hist_boxes = 10
hist_lat_boxes = 25
hm_hist_bins_count = 25
+ hm_x_slots = 25
min_points_for_dev = 5
dev_range_x = 2.0
@@ -86,11 +100,12 @@
extra_io_spine = True
legend_for_eng = True
- heatmap_interpolation = '1d'
+ # heatmap_interpolation = '1d'
+ heatmap_interpolation = None
heatmap_interpolation_points = 300
outliers_q_nd = 3.0
outliers_hide_q_nd = 4.0
- outliers_lat = (0.01, 0.995)
+ outliers_lat = (0.01, 0.9)
violin_instead_of_box = True
violin_point_count = 30000
@@ -105,6 +120,10 @@
'lat': ("ms", 1, "latency")
}
+ qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
+ iotime_bins = list(range(0, 1030, 50))
+ block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+
# ---------------- STRUCTS -------------------------------------------------------------------------------------------
@@ -151,81 +170,14 @@
def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig) -> IOSummary:
lat = get_aggregated(rstorage, suite, job, "lat")
- bins_edges = numpy.array(get_lat_vals(lat.data.shape[1]), dtype='float32') / 1000
io = get_aggregated(rstorage, suite, job, "bw")
return IOSummary(job.qd,
nodes_count=len(suite.nodes_ids),
block_size=job.bsize,
- lat=calc_histo_stat_props(lat, bins_edges, rebins_count=StyleProfile.hist_boxes),
+ lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
-#
-# def iter_io_results(rstorage: ResultStorage,
-# qds: List[int] = None,
-# op_types: List[str] = None,
-# sync_types: List[str] = None,
-# block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
-#
-# for suite in rstorage.iter_suite(FioTest.name):
-# for job in rstorage.iter_job(suite):
-# fjob = cast(FioJobConfig, job)
-# assert int(fjob.vals['numjobs']) == 1
-#
-# if sync_types is not None and fjob.sync_mode in sync_types:
-# continue
-#
-# if block_sizes is not None and fjob.bsize not in block_sizes:
-# continue
-#
-# if op_types is not None and fjob.op_type not in op_types:
-# continue
-#
-# if qds is not None and fjob.qd not in qds:
-# continue
-#
-# yield suite, fjob
-
-
-AGG_TAG = 'ALL'
-
-
-def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
- tss = list(rstorage.iter_ts(suite, job, metric=metric))
- ds = DataSource(suite_id=suite.storage_id,
- job_id=job.storage_id,
- node_id=AGG_TAG,
- sensor='fio',
- dev=AGG_TAG,
- metric=metric,
- tag='csv')
-
- agg_ts = TimeSeries(metric,
- raw=None,
- source=ds,
- data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
- times=tss[0].times.copy(),
- units=tss[0].units)
-
- for ts in tss:
- if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
- logger.error("Sensor %s.%s on node %s has" +
- "shape=%s. Can only process sensors with shape=[X, %s].",
- ts.source.dev, ts.source.sensor, ts.source.node_id,
- ts.data.shape, expected_lat_bins)
- raise ValueError()
-
- if metric != 'lat' and len(ts.data.shape) != 1:
- logger.error("Sensor %s.%s on node %s has " +
- "shape=%s. Can only process 1D sensors.",
- ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
- raise ValueError()
-
- # TODO: match times on different ts
- agg_ts.data += ts.data
-
- return agg_ts
-
def is_sensor_numarray(sensor: str, metric: str) -> bool:
"""Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
@@ -246,86 +198,6 @@
"""Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
return not is_level_sensor(sensor, metric)
-
-def get_sensor_for_time_range(storage: IResultStorage,
- node_id: str,
- sensor: str,
- dev: str,
- metric: str,
- time_range: Tuple[int, int]) -> numpy.array:
- """Return sensor values for given node for given period. Return per second estimated values array
-
- Raise an error if required range is not full covered by data in storage.
- First it finds range of results from sensor, which fully covers requested range.
- ...."""
-
- ds = DataSource(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
- sensor_data = storage.load_sensor(ds)
- assert sensor_data.time_units == 'us'
-
- # collected_at is array of pairs (collection_started_at, collection_finished_at)
- # extract start time from each pair
- collection_start_at = sensor_data.times[::2] # type: numpy.array
-
- MICRO = 1000000
-
- # convert seconds to us
- begin = time_range[0] * MICRO
- end = time_range[1] * MICRO
-
- if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
- raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
- "sensor = {}_{}.{}.{}").format(time_range,
- sensor_data.times[0] // MICRO,
- sensor_data.times[-1] // MICRO,
- node_id, sensor, dev, metric))
-
- pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
-
- # current real data time chunk begin time
- edge_it = iter(collection_start_at[pos1 - 1: pos2 + 1])
-
- # current real data value
- val_it = iter(sensor_data.data[pos1 - 1: pos2 + 1])
-
- # result array, cumulative value per second
- result = numpy.zeros(int(end - begin) // MICRO)
- idx = 0
- curr_summ = 0
-
- # end of current time slot
- results_cell_ends = begin + MICRO
-
- # hack to unify looping
- real_data_end = next(edge_it)
- while results_cell_ends <= end:
- real_data_start = real_data_end
- real_data_end = next(edge_it)
- real_val_left = next(val_it)
-
- # real data "speed" for interval [real_data_start, real_data_end]
- real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
-
- while real_data_end >= results_cell_ends and results_cell_ends <= end:
- # part of current real value, which is fit into current result cell
- curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
-
- # calculate rest of real data for next result cell
- real_val_left -= curr_real_chunk
- result[idx] = curr_summ + curr_real_chunk
- idx += 1
- curr_summ = 0
-
- # adjust real data start time
- real_data_start = results_cell_ends
- results_cell_ends += MICRO
-
- # don't lost any real data
- curr_summ += real_val_left
-
- return result
-
-
# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
def get_emb_data_svg(plt: Any, format: str = 'svg') -> bytes:
@@ -411,38 +283,76 @@
@provide_plot
-def plot_v_over_time(title: str, units: str,
+def plot_simple_over_time(tss: List[Tuple[str, numpy.ndarray]],
+ title: str,
+ ylabel: str,
+ xlabel: str = "time, s",
+ average: bool = False,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+ fig, ax = plt.subplots(figsize=(12, 6))
+ for name, arr in tss:
+ if average:
+ avg_vals = moving_average(arr, style.avg_range)
+ if style.approx_average:
+ time_points = numpy.arange(len(avg_vals))
+ avg_vals = approximate_curve(time_points, avg_vals, time_points, style.curve_approx_level)
+ arr = avg_vals
+ ax.plot(arr, label=name)
+ ax.set_title(title)
+ ax.set_ylabel(ylabel)
+ ax.set_xlabel(xlabel)
+ apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_hmap_from_2d(data2d: numpy.ndarray,
+ title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ ioq1d, ranges = hmap_from_2d(data2d)
+ ax, _ = plot_hmap_with_y_histo(ioq1d, ranges, bins=bins)
+ ax.set_ylabel(ylabel)
+ ax.set_xlabel(xlabel)
+ ax.set_title(title)
+
+
+@provide_plot
+def plot_v_over_time(title: str,
+ units: str,
ts: TimeSeries,
plot_avg_dev: bool = True,
+ plot_points: bool = True,
colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
min_time = min(ts.times)
- # /1000 is us to ms conversion
- time_points = numpy.array([(val_time - min_time) / 1000 for val_time in ts.times])
+ # convert time units to seconds for the X axis
+ coef = float(unit_conversion_coef(ts.time_units, 's'))
+ time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
outliers_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_q_nd)
outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_hide_q_nd)
normal_idxs = numpy.logical_not(outliers_idxs)
outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
- hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
+ # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
data = ts.data[normal_idxs]
data_times = time_points[normal_idxs]
outliers = ts.data[outliers_idxs]
outliers_times = time_points[outliers_idxs]
- alpha = colors.noise_alpha if plot_avg_dev else 1.0
- plt.plot(data_times, data, style.point_shape,
- color=colors.primary_color, alpha=alpha, label="Data")
- plt.plot(outliers_times, outliers, style.err_point_shape,
- color=colors.err_color, label="Outliers")
+ if plot_points:
+ alpha = colors.noise_alpha if plot_avg_dev else 1.0
+ plt.plot(data_times, data, style.point_shape,
+ color=colors.primary_color, alpha=alpha, label="Data")
+ plt.plot(outliers_times, outliers, style.err_point_shape,
+ color=colors.err_color, label="Outliers")
has_negative_dev = False
plus_minus = "\xb1"
if plot_avg_dev and len(data) < style.avg_range * 2:
- logger.warning("Array %r to small to plot average over %s points", title, style.avg_range)
+ logger.warning("Array %r to small to plot average over %s points", title, style.avg_range)
elif plot_avg_dev:
avg_vals = moving_average(data, style.avg_range)
dev_vals = moving_dev(data, style.avg_range)
@@ -467,7 +377,12 @@
plt.xlim(-5, max(time_points) + 5)
plt.xlabel("Time, seconds from test begin")
- plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
+
+ if plot_avg_dev:
+ plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
+ else:
+ plt.ylabel(units)
+
plt.title(title)
if has_negative_dev:
@@ -477,7 +392,9 @@
@provide_plot
-def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+def plot_lat_over_time(title: str, ts: TimeSeries,
+ ylabel: str,
+ samples: int = 5,
colors: Any = ColorProfile,
style: Any = StyleProfile) -> None:
@@ -499,16 +416,16 @@
# cut outliers
idx1, idx2 = hist_outliers_perc(agg_hist, style.outliers_lat)
agg_hist = agg_hist[idx1:idx2]
- curr_bins_vals = bins_vals[idx1:idx2]
+ curr_bins_vals = ts.histo_bins[idx1:idx2]
correct_coef = style.violin_point_count / sum(agg_hist)
if correct_coef > 1:
correct_coef = 1
else:
- curr_bins_vals = bins_vals
+ curr_bins_vals = ts.histo_bins
correct_coef = 1
- vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+ vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
cidx = 0
non_zero, = agg_hist.nonzero()
@@ -541,36 +458,64 @@
plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
plt.xlim(min(times), max(times))
+ plt.ylabel(ylabel)
plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
- plt.ylabel("Latency, ms")
plt.title(title)
apply_style(style, eng=True, no_legend=True)
@provide_plot
-def plot_heatmap(title: str,
- ts: TimeSeries,
- bins_vals: List[int],
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+def plot_histo_heatmap(title: str,
+ ts: TimeSeries,
+ ylabel: str,
+ xlabel: str = "time, s",
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+ # only histogram-based ts can be plotted
assert len(ts.data.shape) == 2
- assert ts.data.shape[1] == len(bins_vals)
+ # Find global outliers. As the load is expected to be stable during a single job,
+ # the outliers range can be detected globally
total_hist = ts.data.sum(axis=0)
+ idx1, idx2 = hist_outliers_perc(total_hist,
+ bounds_perc=style.outliers_lat,
+ min_bins_left=style.hm_hist_bins_count)
- # idx1, idx2 = hist_outliers_perc(total_hist, style.outliers_lat)
- idx1, idx2 = ts_hist_outliers_perc(ts.data, bounds_perc=style.outliers_lat)
+ # merge outliers into the closest non-outlier cells
+ orig_data = ts.data[:, idx1:idx2].copy()
+ if idx1 > 0:
+ orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
- # don't cut too many bins
- min_bins_left = style.hm_hist_bins_count
- if idx2 - idx1 < min_bins_left:
- missed = min_bins_left - (idx2 - idx1) // 2
- idx2 = min(len(total_hist), idx2 + missed)
- idx1 = max(0, idx1 - missed)
+ if idx2 < ts.data.shape[1]:
+ orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
- data = ts.data[:, idx1:idx2]
- bins_vals = bins_vals[idx1:idx2]
+ bins_vals = ts.histo_bins[idx1:idx2]
+
+ # rebin over X axis
+ # aggregate rows of ts.data so that the plot has no more than style.hm_x_slots bins along the X axis
+ agg_idx = float(len(orig_data)) / style.hm_x_slots
+ if agg_idx >= 2:
+ data = numpy.zeros([style.hm_x_slots, orig_data.shape[1]], dtype=numpy.float32) # type: numpy.ndarray
+ next_break = agg_idx
+ count = 0
+ data_idx = 0
+ for idx, arr in enumerate(orig_data):
+ if idx >= next_break:
+ data[data_idx] /= count
+ data_idx += 1
+ next_break += agg_idx
+ count = 0
+ data[data_idx] += arr
+ count += 1
+
+ if count > 1:
+ data[-1] /= count
+ else:
+ data = orig_data
+
+ # rebin over Y axis
+ # =================
# don't using rebin_histogram here, as we need apply same bins for many arrays
step = (bins_vals[-1] - bins_vals[0]) / style.hm_hist_bins_count
@@ -586,35 +531,38 @@
cmap.append(curr_bins)
ncmap = numpy.array(cmap)
- xmin = 0
- xmax = (ts.times[-1] - ts.times[0]) / 1000 + 1
- ymin = new_bins_edges[0]
- ymax = new_bins_edges[-1]
+ # plot data
+ # =========
- fig, ax = plt.subplots(figsize=style.figsize)
+ fig = plt.figure(figsize=(12, 6))
+ boxes = 3
+ gs = gridspec.GridSpec(1, boxes)
+ ax = fig.add_subplot(gs[0, :boxes - 1])
- if style.heatmap_interpolation == '1d':
- interpolation = 'none'
- res = []
- for column in ncmap:
- new_x = numpy.linspace(0, len(column), style.heatmap_interpolation_points)
- old_x = numpy.arange(len(column)) + 0.5
- new_vals = numpy.interp(new_x, old_x, column)
- res.append(new_vals)
- ncmap = numpy.array(res)
- else:
- interpolation = style.heatmap_interpolation
+ labels = list(map(float2str, (new_bins_edges[:-1] + new_bins_edges[1:]) / 2)) + \
+ [float2str(new_bins_edges[-1]) + "+"]
+ seaborn.heatmap(ncmap[:,::-1].T, xticklabels=False, cmap="Blues", ax=ax)
+ ax.set_yticklabels(labels, rotation='horizontal')
+ ax.set_xticklabels([])
- ax.imshow(ncmap[:,::-1].T,
- interpolation=interpolation,
- extent=(xmin, xmax, ymin, ymax),
- cmap=colors.imshow_colormap)
+ # plot overall histogram
+ # =======================
- ax.set_aspect((xmax - xmin) / (ymax - ymin) * (6 / 9))
- ax.set_ylabel("Latency, ms")
- ax.set_xlabel("Test time, s")
+ ax2 = fig.add_subplot(gs[0, boxes - 1])
+ ax2.set_yticklabels([])
+ ax2.set_xticklabels([])
- plt.title(title)
+ histo = ncmap.sum(axis=0).reshape((-1,))
+ ax2.set_ylim(top=histo.size, bottom=0)
+ ax2.barh(numpy.arange(histo.size) + 0.5, width=histo)
+
+ # Set labels
+ # ==========
+
+ ax.set_title(title)
+ ax.set_ylabel(ylabel)
+ ax.set_xlabel(xlabel)
+
@provide_plot
@@ -873,28 +821,6 @@
"""Creates graphs, which show how IOPS and Latency depend on block size"""
-def summ_sensors(rstorage: ResultStorage,
- nodes: List[str],
- sensor: str,
- metric: str,
- time_range: Tuple[int, int]) -> Optional[numpy.array]:
-
- res = None # type: Optional[numpy.array]
- for node_id in nodes:
- for _, groups in rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
- data = get_sensor_for_time_range(rstorage,
- node_id=node_id,
- sensor=sensor,
- dev=groups['dev'],
- metric=metric,
- time_range=time_range)
- if res is None:
- res = data
- else:
- res += data
- return res
-
-
# IOPS/latency distribution
class StatInfo(JobReporter):
"""Statistic info for job results"""
@@ -957,49 +883,44 @@
["Data transfered", b2ssize(io_transfered) + "B", "-"]
]
-
storage = rstorage.storage
nodes = storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
- storage_nodes = [node.node_id for node in nodes if node.roles.intersection(STORAGE_ROLES)]
- test_nodes = [node.node_id for node in nodes if "testnode" in node.roles]
-
- trange = (job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000)
+ trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
ops_done = io_transfered / fjob.bsize / KB
all_metrics = [
- ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, test_nodes, "B", io_transfered),
- ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, test_nodes, "B", io_transfered),
+ ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
- ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, test_nodes, "B", io_transfered),
- ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, test_nodes, "B", io_transfered),
- ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, test_nodes, "OP", ops_done),
- ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, test_nodes, "OP", ops_done),
+ ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
+ ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
+ ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+ ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
- ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, storage_nodes, "B", io_transfered),
- ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, storage_nodes, "B", io_transfered),
+ ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
- ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, storage_nodes, "B", io_transfered),
- ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, storage_nodes, "B", io_transfered),
- ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, storage_nodes, "OP", ops_done),
- ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, storage_nodes, "OP", ops_done),
+ ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
]
all_agg = {}
- for descr, sensor, metric, ffunc, nodes, units, denom in all_metrics:
+ for descr, sensor, metric, ffunc, roles, units, denom in all_metrics:
- if not nodes:
+ if not roles:
continue
- res_arr = summ_sensors(rstorage, nodes=nodes, sensor=sensor, metric=metric, time_range=trange)
- if res_arr is None:
+ res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=trange)
+ if res_ts is None:
continue
- agg = res_arr.sum()
+ agg = res_ts.data.sum()
resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
all_agg[descr] = agg
-
cums = [
("Test nodes writes", "Test nodes reads", "Total test ops", b2ssize_10, "OP", ops_done),
("Storage nodes writes", "Storage nodes reads", "Total storage ops", b2ssize_10, "OP", ops_done),
@@ -1018,6 +939,120 @@
yield Menu1st.per_job, job.summary, HTMLBlock(res)
+# CPU load
+class CPULoadPlot(JobReporter):
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+ trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+
+ # plot CPU time
+ for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
+ cpu_ts = {}
+ cpu_metrics = "idle guest iowait irq nice sirq steal sys user".split()
+ for name in cpu_metrics:
+ cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=trange)
+
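+ # total CPU time over all metrics; used as the denominator to convert each metric to percents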
+ it = iter(cpu_ts.values())
+ total_over_time = next(it).data.copy()
+ for ts in it:
+ total_over_time += ts.data
+
+ fname = plot_simple_over_time(rstorage, cpu_ts['idle'].source(metric='allcpu', tag=rt + '.plt.svg'),
+ tss=[(name, ts.data * 100 / total_over_time) for name, ts in cpu_ts.items()],
+ average=True,
+ ylabel="CPU time %",
+ title="{} nodes CPU usage".format(rt.capitalize()))
+
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
+# IO time and QD
+class QDIOTimeHeatmap(JobReporter):
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+ # TODO: remove this hardcode; track which devices are actually used on the test and storage nodes
+ # use saved storage info in nodes
+
+ journal_devs = None
+ storage_devs = None
+ test_nodes_devs = ['rbd0']
+
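+ # all storage nodes are expected to share the same journal and storage device sets; the asserts below check this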
+ for node in find_nodes_by_roles(rstorage, STORAGE_ROLES):
+ cjd = set(node.params['ceph_journal_devs'])
+ if journal_devs is None:
+ journal_devs = cjd
+ else:
+ assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+
+ csd = set(node.params['ceph_storage_devs'])
+ if storage_devs is None:
+ storage_devs = csd
+ else:
+ assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+
+ storage_nodes_devs = list(journal_devs) + list(storage_devs)
+ trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+
+ for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
+ ('journal', journal_devs, STORAGE_ROLES),
+ ('test', test_nodes_devs, ['testnode'])]:
+ # QD heatmap
+ ioq2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='io_queue', time_range=trange)
+ fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+ job.storage_id,
+ AGG_TAG,
+ 'block-io',
+ name,
+ metric='io_queue',
+ tag="hmap.svg"),
+ ioq2d, ylabel="IO QD", title=name.capitalize() + " devs QD",
+ bins=StyleProfile.qd_bins,
+ xlabel='Time') # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+ # Block size heatmap
+ wc2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='writes_completed', time_range=trange)
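+ # avoid division by zero in intervals with no completed writes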
+ wc2d[wc2d < 1E-3] = 1
+ sw2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='sectors_written', time_range=trange)
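+ # per-interval average write block size (sectors_written / writes_completed), scaled to KiB for the heatmap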
+ data2d = sw2d / wc2d / 1024
+ fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+ job.storage_id,
+ AGG_TAG,
+ 'block-io',
+ name,
+ metric='wr_block_size',
+ tag="hmap.svg"),
+ data2d, ylabel="IO bsize, KiB", title=name.capitalize() + " write block size",
+ xlabel='Time',
+ bins=StyleProfile.block_size_bins) # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+ # iotime heatmap
+ wtime2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='io_time', time_range=trange)
+ fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+ job.storage_id,
+ AGG_TAG,
+ 'block-io',
+ name,
+ metric='io_time',
+ tag="hmap.svg"),
+ wtime2d, ylabel="IO time (ms) per second",
+ title=name.capitalize() + " iotime",
+ xlabel='Time',
+ bins=StyleProfile.iotime_bins) # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
# IOPS/latency distribution
class IOHist(JobReporter):
"""IOPS.latency distribution histogram"""
@@ -1032,20 +1067,17 @@
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load histograms")))
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, rebins_count=StyleProfile.hist_lat_boxes)
-
- # import IPython
- # IPython.embed()
-
- long_summary = cast(FioJobParams, fjob.params).long_summary
-
- title = "Latency distribution"
- units = "ms"
-
- fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
+ # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
+ #
+ # long_summary = cast(FioJobParams, fjob.params).long_summary
+ #
+ # title = "Latency distribution"
+ # units = "ms"
+ #
+ # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
+ # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
agg_io = get_aggregated(rstorage, suite, fjob, "bw")
@@ -1075,30 +1107,35 @@
fjob = cast(FioJobConfig, job)
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load over time")))
-
agg_io = get_aggregated(rstorage, suite, fjob, "bw")
if fjob.bsize >= LARGE_BLOCKS:
- title = "Bandwidth"
+ title = "Fio measured Bandwidth over time"
units = "MiBps"
agg_io.data //= MiB2KiB
else:
- title = "IOPS"
+ title = "Fio measured IOPS over time"
agg_io.data //= fjob.bsize
units = "IOPS"
fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io) # type: str
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000
- title = "Latency"
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat").copy()
+ TARGET_UNITS = 'ms'
+ coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
+ agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+ agg_lat.units = TARGET_UNITS
- fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges) # type: str
+ fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), "Latency",
+ agg_lat, ylabel="Latency, " + agg_lat.units) # type: str
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- title = "Latency heatmap"
- fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.png'), title, agg_lat, bins_edges) # type: str
+ fpath = plot_histo_heatmap(rstorage,
+ agg_lat.source(tag='hmap.svg'),
+ "Latency heatmap",
+ agg_lat,
+ ylabel="Latency, " + agg_lat.units,
+ xlabel='Test time') # type: str
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
@@ -1128,40 +1165,21 @@
storage_sensors = [
('block-io', 'reads_completed', "Read ops", 'iops'),
('block-io', 'writes_completed', "Write ops", 'iops'),
- ('block-io', 'sectors_read', "Read kb", 'kb'),
- ('block-io', 'sectors_written', "Write kb", 'kb'),
+ ('block-io', 'sectors_read', "Read kb", 'KB'),
+ ('block-io', 'sectors_written', "Write kb", 'KB'),
]
def get_divs(self,
suite: SuiteConfig,
job: JobConfig,
rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- # split nodes on test and other
- storage = rstorage.storage
- nodes = storage.load_list(NodeInfo, "all_nodes") # type: List[NodeInfo]
-
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
- test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
- cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
# convert ms to s
time_range = (job.reliable_info_range[0] // MS2S, job.reliable_info_range[1] // MS2S)
- len = time_range[1] - time_range[0]
- for sensor, metric, sensor_title, units in self.storage_sensors:
- sum_testnode = numpy.zeros((len,))
- sum_other = numpy.zeros((len,))
- for path, groups in rstorage.iter_sensors(sensor=sensor, metric=metric):
- # todo: should return sensor units
- data = get_sensor_for_time_range(rstorage,
- groups['node_id'],
- sensor,
- groups['dev'],
- metric, time_range)
- if groups['node_id'] in test_nodes:
- sum_testnode += data
- else:
- sum_other += data
+ for sensor, metric, sensor_title, units in self.storage_sensors:
+ ts = summ_sensors(rstorage, ['testnode'], sensor, metric, time_range)
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
node_id="test_nodes",
@@ -1170,14 +1188,16 @@
metric=metric,
tag="ts.svg")
- # s to ms
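+ # sectors counters are rescaled to KB for plotting; other metrics keep their native units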
+ data = ts.data if units != 'KB' else ts.data * float(unit_conversion_coef(ts.units, 'KB'))
+
ts = TimeSeries(name="",
- times=numpy.arange(*time_range) * MS2S,
- data=sum_testnode,
+ times=numpy.arange(*time_range),
+ data=data,
raw=None,
- units=units,
- time_units="us",
- source=ds)
+ units=units if ts.units is None else ts.units,
+ time_units=ts.time_units,
+ source=ds,
+ histo_bins=ts.histo_bins)
fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
@@ -1211,11 +1231,12 @@
def run(self, ctx: TestRun) -> None:
rstorage = ResultStorage(ctx.storage)
- job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad()] # type: List[JobReporter]
- reporters = [IO_QD()] # type: List[Reporter]
+ job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad(), CPULoadPlot(),
+ QDIOTimeHeatmap()] # type: List[JobReporter]
+ reporters = []
+ # reporters = [IO_QD()] # type: List[Reporter]
# job_reporters = [ClusterLoad()]
- # reporters = []
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -1232,25 +1253,30 @@
# StyleProfile.__dict__.update(ctx.config.reporting.style.raw())
items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, List[HTMLBlock]]]
-
+ DEBUG = False
# TODO: filter reporters
for suite in rstorage.iter_suite(FioTest.name):
all_jobs = list(rstorage.iter_job(suite))
all_jobs.sort(key=lambda job: job.params)
for job in all_jobs:
for reporter in job_reporters:
+ logger.debug("Start reporter %s on job %s suite %s",
+ reporter.__class__.__name__, job.summary, suite.test_type)
for block, item, html in reporter.get_divs(suite, job, rstorage):
items[block][item].append(html)
if DEBUG:
break
for reporter in reporters:
+ logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
for block, item, html in reporter.get_divs(suite, rstorage):
items[block][item].append(html)
if DEBUG:
break
+ logger.debug("Generating result html")
+
for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
menu_block.append(
'<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 43ae721..1fbd094 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,6 @@
import abc
-from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator
+import copy
+from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator, NamedTuple
import numpy
@@ -106,12 +107,13 @@
def __init__(self,
name: str,
raw: Optional[bytes],
- data: numpy.array,
- times: numpy.array,
+ data: numpy.ndarray,
+ times: numpy.ndarray,
units: str,
source: DataSource,
time_units: str = 'us',
- raw_tag: str = 'txt') -> None:
+ raw_tag: str = 'txt',
+ histo_bins: numpy.ndarray = None) -> None:
# Sensor name. Typically DEV_NAME.METRIC
self.name = name
@@ -130,6 +132,7 @@
self.raw = raw
self.raw_tag = raw_tag
self.source = source
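+ # bin edges for histogram-based (2D) time series such as fio latency; None for plain 1D series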
+ self.histo_bins = histo_bins
def __str__(self) -> str:
res = "TS({}):\n".format(self.name)
@@ -141,6 +144,9 @@
def __repr__(self) -> str:
return str(self)
+ def copy(self) -> 'TimeSeries':
+ return copy.copy(self)
+
# (node_name, source_dev, metric_name) => metric_results
JobMetrics = Dict[Tuple[str, str, str], TimeSeries]
@@ -259,6 +265,12 @@
self.processed = None # type: JobStatMetrics
+ArrayData = NamedTuple("ArrayData",
+ [('header', List[str]),
+ ('histo_bins', Optional[numpy.ndarray]),
+ ('data', Optional[numpy.ndarray])])
+
+
class IResultStorage(metaclass=abc.ABCMeta):
@abc.abstractmethod
@@ -266,10 +278,18 @@
pass
@abc.abstractmethod
+ def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
+ pass
+
+ @abc.abstractmethod
def load_sensor(self, ds: DataSource) -> TimeSeries:
pass
@abc.abstractmethod
+ def iter_sensors(self, ds: DataSource) -> Iterator[TimeSeries]:
+ pass
+
+ @abc.abstractmethod
def put_or_check_suite(self, suite: SuiteConfig) -> None:
pass
diff --git a/wally/run_test.py b/wally/run_test.py
index d5f7d19..f8dc036 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -146,7 +146,7 @@
class SaveNodesStage(Stage):
"""Save nodes list to file"""
- priority = StepOrder.CONNECT
+ priority = StepOrder.UPDATE_NODES_INFO + 1
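+ # run after node info has been updated, so the saved list contains the final roles and params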
def run(self, ctx: TestRun) -> None:
ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
diff --git a/wally/sensors.py b/wally/sensors.py
index c19c350..1faa03b 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -4,14 +4,14 @@
import numpy
+from cephlib import sensors_rpc_plugin
+
from . import utils
from .test_run_class import TestRun
from .result_classes import DataSource
from .stage import Stage, StepOrder
from .hlstorage import ResultStorage
-from cephlib import sensors_rpc_plugin
-
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
SENSORS_PLUGIN_CODE = open(plugin_fname, "rb").read() # type: bytes
@@ -21,16 +21,24 @@
sensor_units = {
- "system-cpu.idle_time": "ms",
- "system-cpu.nice_processes": "",
+ "system-cpu.idle": "",
+ "system-cpu.nice": "",
+ "system-cpu.user": "",
+ "system-cpu.sys": "",
+ "system-cpu.iowait": "",
+ "system-cpu.irq": "",
+ "system-cpu.sirq": "",
+ "system-cpu.steal": "",
+ "system-cpu.guest": "",
+
"system-cpu.procs_blocked": "",
"system-cpu.procs_queue_x10": "",
- "system-cpu.system_processes": "",
- "system-cpu.user_processes": "",
+
"net-io.recv_bytes": "B",
"net-io.recv_packets": "",
"net-io.send_bytes": "B",
"net-io.send_packets": "",
+
"block-io.io_queue": "",
"block-io.io_time": "ms",
"block-io.reads_completed": "",
@@ -69,9 +77,9 @@
for name, val in ctx.config.sensors.roles_mapping.raw().items():
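+ # the 'ceph' sensor is configured with a dict (extended with osd info below); other sensors get a device-name regexp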
if isinstance(val, str):
- val = {vl.strip(): ".*" for vl in val.split(",")}
+ val = {vl.strip(): (".*" if vl.strip() != 'ceph' else {}) for vl in val.split(",")}
elif isinstance(val, list):
- val = {vl: ".*" for vl in val}
+ val = {vl: (".*" if vl != 'ceph' else {}) for vl in val}
per_role_config[name] = val
if 'all' in per_role_config:
@@ -96,7 +104,7 @@
# ceph requires additional settings
if 'ceph' in node_cfg:
node_cfg['ceph'].update(node.info.params['ceph'])
- node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']] # type: ignore
+ node_cfg['ceph']['osds'] = [osd['id'] for osd in node.info.params['ceph-osds']] # type: ignore
logger.debug("Setting up sensors RPC plugin for node %s", nid)
node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
@@ -128,6 +136,7 @@
logger.warning("Raw sensors data at path %r and, maybe, others are skipped", path)
raw_skipped = True
continue
+
if path == 'collected_at':
ds = DataSource(node_id=node_id, metric='collected_at')
units = 'us'
@@ -135,6 +144,7 @@
sensor, dev, metric = path.split(".")
ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor)
units = sensor_units["{}.{}".format(sensor, metric)]
+
rstorage.append_sensor(numpy.array(value), ds, units)
diff --git a/wally/stage.py b/wally/stage.py
index 95542f3..14e80ae 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -9,10 +9,11 @@
DISCOVER = 0
SPAWN = 10
CONNECT = 20
- START_SENSORS = 30
- TEST = 40
- COLLECT_SENSORS = 50
- REPORT = 60
+ UPDATE_NODES_INFO = 30
+ START_SENSORS = 40
+ TEST = 50
+ COLLECT_SENSORS = 60
+ REPORT = 70
class Stage(metaclass=abc.ABCMeta):
diff --git a/wally/statistic.py b/wally/statistic.py
index a256587..8543e0f 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -123,10 +123,12 @@
def calc_histo_stat_props(ts: TimeSeries,
- bins_edges: numpy.array,
+ bins_edges: numpy.array = None,
rebins_count: int = None,
tail: float = 0.005) -> HistoStatProps:
- log_bins = False
+ if bins_edges is None:
+ bins_edges = ts.histo_bins
+
res = HistoStatProps(ts.data)
# summ across all series
@@ -286,27 +288,37 @@
def hist_outliers_perc(bin_populations: numpy.array,
- bounds_perc: Tuple[float, float] = (0.01, 0.99)) -> Tuple[int, int]:
+ bounds_perc: Tuple[float, float] = (0.01, 0.99),
+ min_bins_left: int = None) -> Tuple[int, int]:
assert len(bin_populations.shape) == 1
total_count = bin_populations.sum()
lower_perc = total_count * bounds_perc[0]
upper_perc = total_count * bounds_perc[1]
- return numpy.searchsorted(numpy.cumsum(bin_populations), [lower_perc, upper_perc])
+ idx1, idx2 = numpy.searchsorted(numpy.cumsum(bin_populations), [lower_perc, upper_perc])
+
+ # don't cut too many bins: at least min_bins_left bins must be left
+ if min_bins_left is not None and idx2 - idx1 < min_bins_left:
+ missed = (min_bins_left - (idx2 - idx1) + 1) // 2  # expand each side by half of the deficit
+ idx2 = min(len(bin_populations), idx2 + missed)
+ idx1 = max(0, idx1 - missed)
+
+ return idx1, idx2
def ts_hist_outliers_perc(bin_populations: numpy.array,
window_size: int = 10,
- bounds_perc: Tuple[float, float] = (0.01, 0.99)) -> Tuple[int, int]:
+ bounds_perc: Tuple[float, float] = (0.01, 0.99),
+ min_bins_left: int = None) -> Tuple[int, int]:
assert len(bin_populations.shape) == 2
points = list(range(0, len(bin_populations), window_size))
if len(bin_populations) % window_size != 0:
points.append(points[-1] + window_size)
- ranges = []
+ ranges = [] # type: List[Tuple[int, int]]
for begin, end in zip(points[:-1], points[1:]):
window_hist = bin_populations[begin:end].sum(axis=0)
- ranges.append(hist_outliers_perc(window_hist, bounds_perc=bounds_perc))
+ ranges.append(hist_outliers_perc(window_hist, bounds_perc=bounds_perc, min_bins_left=min_bins_left))
return min(i[0] for i in ranges), max(i[1] for i in ranges)
diff --git a/wally/storage.py b/wally/storage.py
index 0459305..aa90ac9 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -235,6 +235,7 @@
def __init__(self, sstorage: ISimpleStorage, serializer: ISerializer) -> None:
self.sstorage = sstorage
self.serializer = serializer
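+ # in-memory cache of deserialized objects; load()/load_list() use it to avoid re-parsing the same path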
+ self.cache = {}
def sub_storage(self, *path: str) -> 'Storage':
fpath = "/".join(path)
@@ -293,13 +294,17 @@
def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
path_s = "/".join(path)
- raw_val = cast(List[Dict[str, Any]], self.get(path_s))
- assert isinstance(raw_val, list)
- return [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
+ if path_s not in self.cache:
+ raw_val = cast(List[Dict[str, Any]], self.get(path_s))
+ assert isinstance(raw_val, list)
+ self.cache[path_s] = [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
+ return self.cache[path_s]
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
path_s = "/".join(path)
- return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
+ if path_s not in self.cache:
+ self.cache[path_s] = cast(ObjClass, obj_class.fromraw(self.get(path_s)))
+ return self.cache[path_s]
def sync(self) -> None:
self.sstorage.sync()
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index a44c749..22a4937 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,12 +1,13 @@
[global]
include defaults_qd.cfg
-QD_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
-QD_W={% 1, 5, 10, 15, 25, 40 %}
-QD_SEQ_R={% 1, 3, 10 %}
-QD_SEQ_W={% 1, 2, 4 %}
+# QD_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
+QD_R={% 1, 5, 10, 20, 40, 60, 100, 150, 200, 500 %}
+QD_W={% 1, 5, 10, 20, 40, 60, 100, 150, 200 %}
+QD_SEQ_R=1
+QD_SEQ_W=1
-ramp_time=30
+ramp_time=15
runtime=180
# ---------------------------------------------------------------------
@@ -30,12 +31,12 @@
# ---------------------------------------------------------------------
# sync write
# ---------------------------------------------------------------------
-[ceph_{TEST_SUMM}]
-blocksize=4k
-rw=randwrite
-direct=1
-sync=1
-numjobs=1
+#[ceph_{TEST_SUMM}]
+#blocksize=4k
+#rw=randwrite
+#direct=1
+#sync=1
+#numjobs=1
# ---------------------------------------------------------------------
# this is essentially sequential write operations
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 16da091..1895b6c 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -13,7 +13,7 @@
from ..job import JobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
-from .fio_hist import expected_lat_bins
+from .fio_hist import get_lat_vals, expected_lat_bins
logger = logging.getLogger("wally")
@@ -223,9 +223,10 @@
vals = [int(i.strip()) for i in rest]
if len(vals) != expected_lat_bins:
- logger.error("Expect {} bins in latency histogram, but found {} at time {}"
- .format(expected_lat_bins, len(vals), time_ms_s))
- raise StopTestError()
+ msg = "Expect {} bins in latency histogram, but found {} at time {}" \
+ .format(expected_lat_bins, len(vals), time_ms_s)
+ logger.error(msg)
+ raise StopTestError(msg)
parsed.append(vals)
else:
@@ -237,13 +238,16 @@
if not self.suite.keep_raw_files:
raw_result = None
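+ # only the 'lat' series is histogram-based: attach the fio latency bin edges to it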
+ histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
+
result.append(TimeSeries(name=name,
raw=raw_result,
data=numpy.array(parsed, dtype='uint64'),
units=units,
times=numpy.array(times, dtype='uint64'),
time_units='ms',
- source=path(metric=name, tag='csv')))
+ source=path(metric=name, tag='csv'),
+ histo_bins=histo_bins))
return result
def format_for_console(self, data: Any) -> str:
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 58a94b0..0d85f94 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,8 +1,9 @@
[global]
include defaults_qd.cfg
-QD={% 1, 2, 4 %}
-runtime=30
+QD={% 32, 64, 128, 256 %}
+runtime=600
direct=1
+ramp_time=30
# ---------------------------------------------------------------------
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index fea846d..c4b5bc5 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -41,12 +41,13 @@
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
- def merge_node(self, creds: ConnCreds, roles: Set[str]) -> NodeInfo:
- info = NodeInfo(creds, roles)
+ def merge_node(self, creds: ConnCreds, roles: Set[str], **params) -> NodeInfo:
+ info = NodeInfo(creds, roles, params)
nid = info.node_id
if nid in self.nodes_info:
self.nodes_info[nid].roles.update(info.roles)
+ self.nodes_info[nid].params.update(info.params)
return self.nodes_info[nid]
else:
self.nodes_info[nid] = info
diff --git a/wally/texttable.py b/wally/texttable.py
index 1917663..504b04d 100644
--- a/wally/texttable.py
+++ b/wally/texttable.py
@@ -306,6 +306,7 @@
cells = []
for i, x in enumerate(array):
cells.append(self._str(i, x))
+
self._rows.append(cells)
def add_rows(self, rows, header=True):
@@ -366,6 +367,9 @@
i - index of the cell datatype in self._dtype
x - cell data to format
"""
+ if isinstance(x, str):
+ return x
+
try:
f = float(x)
except:
diff --git a/wally/utils.py b/wally/utils.py
index 5904aa7..151ed5f 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -462,7 +462,7 @@
if len(units) > 1 and units[0] in _coefs:
return _coefs[units[0]], units[1:]
else:
- return Fraction(1), units
+ return 1, units
def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
@@ -471,10 +471,18 @@
assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit)
- if isinstance(f1, int) and isinstance(f2, int) and f1 % f2 != 0:
- return Fraction(f1, f2)
+ if isinstance(f1, int) and isinstance(f2, int):
+ if f1 % f2 != 0:
+ return Fraction(f1, f2)
+ else:
+ return f1 // f2
- return f1 // f2
+ res = f1 / f2
+
+ if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1:
+ return cast(Fraction, res).numerator
+
+ return res
def shape2str(shape: Iterable[int]) -> str:
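A minimal illustration of the simplification the new tail of unit_conversion_coef performs (the _collapse helper below is hypothetical and only demonstrates the intended behaviour):

    from fractions import Fraction

    def _collapse(res):
        # whole-number Fractions are reduced to plain ints, everything else is returned unchanged
        if isinstance(res, Fraction) and res.denominator == 1:
            return res.numerator
        return res

    assert _collapse(Fraction(1000, 10)) == 100
    assert isinstance(_collapse(Fraction(1000, 10)), int)
    assert _collapse(Fraction(1, 1000)) == Fraction(1, 1000)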