fixes, fixes
diff --git a/clib/interpolate.cpp b/clib/interpolate.cpp
index a388ec1..c7f2b50 100644
--- a/clib/interpolate.cpp
+++ b/clib/interpolate.cpp
@@ -1,24 +1,33 @@
#include <algorithm>
#include <cstdint>
-#include <cstdio>
+//#include <cstdio>
-extern "C" void interpolate_ts_on_seconds_border(unsigned int input_size,
- unsigned int output_size,
- const uint64_t * times,
- const uint64_t * values,
- unsigned int time_step,
- uint64_t * output)
+uint64_t round(double x) {
+ return (uint64_t)(x + 0.5);
+}
+
+
+extern "C"
+unsigned int interpolate_ts_on_seconds_border(unsigned int input_size,
+ unsigned int output_size,
+ const uint64_t * times,
+ const uint64_t * values,
+ unsigned int time_step,
+ uint64_t * output)
{
- auto output_end = (*times / time_step) * time_step;
- auto output_begin = output_end - time_step;
+ auto output_begin = *times - time_step;
+ auto output_end = *times;
+
+ auto input_begin = *times - time_step;
+ auto input_end = *times;
+
auto output_cell = output;
auto input_cell = values;
- auto input_time = times;
auto input_val = *input_cell;
- auto input_begin = *input_time - time_step;
- auto input_end = *input_time;
+
+ auto input_time = times;
auto rate = ((double)*input_cell) / (input_end - input_begin);
// output array mush fully cover input array
@@ -28,7 +37,7 @@
// add intersection slice to output array
if(intersection > 0) {
- auto slice = (uint64_t)(intersection * rate);
+ auto slice = std::min(input_val, round(intersection * rate));
*output_cell += slice;
input_val -= slice;
}
@@ -41,7 +50,13 @@
++input_time;
if(input_time == times + input_size)
- return;
+ return output_cell - output + 1;
+
+ if (output_end == input_end) {
+ ++output_cell;
+ output_begin = output_end;
+ output_end += time_step;
+ }
input_val = *input_cell;
input_begin = input_end;
@@ -53,6 +68,7 @@
output_end += time_step;
}
}
+ return output_size;
}
@@ -78,3 +94,44 @@
return output_size;
}
+
+
+extern "C" int interpolate_ts_on_seconds_border_fio(unsigned int input_size,
+ unsigned int output_size,
+ const uint64_t * times,
+ unsigned int time_step,
+ uint64_t * output_idx,
+ uint64_t empty_cell_placeholder)
+{
+ auto input_end = times + input_size;
+ auto output_end = output_idx + output_size;
+ float no_step = time_step * 0.1;
+ float more_then_step = time_step * 1.9;
+ float step_min = time_step * 0.9;
+ float step_max = time_step * 1.1;
+
+ auto cinput = times;
+ auto ctime_val = *cinput - time_step;
+
+ for(; output_idx < output_end; ++output_idx) {
+
+ // skip repetition of same time
+ while(*cinput - ctime_val <= no_step and cinput < input_end)
+ ++cinput;
+
+ if (cinput == input_end)
+ break;
+
+ auto dt = *cinput - ctime_val;
+ if (dt <= step_max and dt > step_min) {
+ *output_idx = cinput - times;
+ ctime_val += time_step;
+ } else if (dt >= more_then_step) {
+ *output_idx = empty_cell_placeholder;
+ ctime_val += time_step;
+ } else
+ return -(int)(cinput - times);
+ }
+
+ return output_size - (output_end - output_idx);
+}
\ No newline at end of file
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index 01551ee..843a672 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -37,6 +37,7 @@
include: logging.yaml
default_test_local_folder: "/tmp/wally_{name}_{uuid}"
keep_raw_files: true
+download_rpc_logs: true
vm_configs:
keypair_file_private: wally_vm_key_perf3.pem
diff --git a/configs-examples/perf_lab.yml b/configs-examples/perf_lab.yml
index ddb05be..eae4c5b 100644
--- a/configs-examples/perf_lab.yml
+++ b/configs-examples/perf_lab.yml
@@ -8,12 +8,14 @@
nodes:
root@cz7625: testnode
+ root@cz7626: testnode
+ root@cz7627: testnode
-sleep: 5
+# sleep: 5
tests:
- fio:
load: verify
params:
FILENAME: /dev/rbd0
- FILESIZE: 100G
+ FILESIZE: 700G
diff --git a/requirements.txt b/requirements.txt
index 2162299..38c8b79 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,4 @@
psutil
seaborn
pytest
-
+statsmodels
diff --git a/tests/test_math.py b/tests/test_math.py
index 9298792..9cebffd 100644
--- a/tests/test_math.py
+++ b/tests/test_math.py
@@ -1,4 +1,7 @@
import numpy
+import pytest
+
+
from wally.statistic import rebin_histogram
from wally.result_classes import DataSource, TimeSeries
from wally.data_selectors import c_interpolate_ts_on_seconds_border
@@ -105,6 +108,26 @@
assert ts2_min <= ts_sum <= ts2_max, "NOT {} <= {} <= {}".format(ts2_min, ts_sum, ts2_max)
+def test_interpolate2():
+ ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+ samples = 5
+ ms_coef = 1000
+
+ source_times = numpy.arange(samples, dtype='uint64') * ms_coef + ms_coef + 347
+ source_values = numpy.random.randint(10, 1000, size=samples, dtype='uint64')
+
+ ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
+ source=ds, time_units=TIME_UNITS)
+
+ ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)
+
+ assert ts.time_units == 'ms'
+ assert ts2.time_units == 's'
+ assert ts2.times.dtype == ts.times.dtype
+ assert ts2.data.dtype == ts.data.dtype
+ assert (ts2.data == ts.data).all()
+
+
def test_interpolate_qd():
ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
samples = 200
@@ -120,14 +143,13 @@
ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
source=ds, time_units=TIME_UNITS)
- ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, qd=True)
+ ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='qd')
assert ts.time_units == 'ms'
assert ts2.time_units == 's'
assert ts2.times.dtype == ts.times.dtype
assert ts2.data.dtype == ts.data.dtype
assert ts2.data.size == ts2.times.size
- assert abs(ts2.data.size - ts.data.size) <= 1
coef = unit_conversion_coef(ts2.time_units, ts.time_units)
assert isinstance(coef, int)
@@ -136,3 +158,52 @@
idxs = numpy.searchsorted(ts.times, ts2.times * coef - dtime)
assert (ts2.data == ts.data[idxs]).all()
+
+
+def test_interpolate_fio():
+ ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+ ms_coef = 1000
+ s_offset = 377 * ms_coef
+ gap_start = 5
+ gap_size = 5
+ full_size = 15
+
+ times = list(range(gap_start)) + list(range(gap_start + gap_size, full_size))
+ src_times = numpy.array(times, dtype='uint64') * ms_coef + s_offset
+ src_values = numpy.random.randint(10, 100, size=len(src_times), dtype='uint64')
+
+ ts = TimeSeries("test", raw=None, data=src_values, times=src_times, units=DATA_UNITS,
+ source=ds, time_units=TIME_UNITS)
+
+ ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='fio')
+
+ assert ts.time_units == 'ms'
+ assert ts2.time_units == 's'
+ assert ts2.times.dtype == ts.times.dtype
+ assert ts2.data.dtype == ts.data.dtype
+ assert ts2.times[0] == ts.times[0] // ms_coef
+ assert ts2.times[-1] == ts.times[-1] // ms_coef
+ assert ts2.data.size == ts2.times.size
+
+ expected_times = numpy.arange(ts.times[0] // ms_coef, ts.times[-1] // ms_coef + 1, dtype='uint64')
+ assert ts2.times.size == expected_times.size
+ assert (ts2.times == expected_times).all()
+
+ assert (ts2.data[:gap_start] == ts.data[:gap_start]).all()
+ assert (ts2.data[gap_start:gap_start + gap_size] == 0).all()
+ assert (ts2.data[gap_start + gap_size:] == ts.data[gap_start:]).all()
+
+
+def test_interpolate_fio_negative():
+ ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+ ms_coef = 1000
+ s_offset = 377 * ms_coef
+
+ src_times = (numpy.array([1, 2, 3, 4.5, 5, 6, 7]) * ms_coef + s_offset).astype('uint64')
+ src_values = numpy.random.randint(10, 100, size=len(src_times), dtype='uint64')
+
+ ts = TimeSeries("test", raw=None, data=src_values, times=src_times, units=DATA_UNITS,
+ source=ds, time_units=TIME_UNITS)
+
+ with pytest.raises(ValueError):
+ c_interpolate_ts_on_seconds_border(ts, nc=True, tp='fio')
diff --git a/v2_plans.md b/v2_plans.md
index 2e69de3..a2d5244 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,14 +1,41 @@
TODO today:
-----------
-* Добавить логирование скачивания сенсоров с размером скачанных данных
-* Почистить логи агента
-* Проверить/унифицировать генерацию картинок
-* Добавить тест дикки-фуллера
-* Генерировать суммарный отчет
-* Почистить таблицу потребления ресурсов, добавить в нее тест дикки-фуллера
-* Построить чарт потребления ресурсов
-* Изменить интерполяцию для iotime, что бы не скрывать 100% для журнала
+* Хранить OSD config отдельно в json для ускорения дампа/загрузки
+* Поправить границы heatmap - для QD они должны проходить по нечетным
+ числам. Верхняя должна включать часть предыдущего интервала, etc
+* Маркать девайсы на нодах по ролям при диагностике нод
+* Собирать idle load. Отдельная стадия типа sleep после установки
+ сенсоров, которая сохраняет время начала и конца слипа
+* Проверить и унифицировать все кеши. Отдельно поиск TS, который всегда
+ выдает Union[DataSource, Iterable[DataSource]]. Отдельно одна
+ унифицированная функция загрузки/постобработки, в которой все базовые
+ кеши. Дополнительно слой постобработки(агрегация по ролям, девайсам,
+ CPU) со своими кешами. Хранить original в TS.
+* Попробовать перевести 16m randwr в линейную операцию с
+ предрасчитанными смещениями? offset, offset_increment
+* Текстовый репорт поломан
+* Проверять опции дебага на сефе/уменьшать их для теста
+* Подписи к IOPS vs QD не влезают, если данных много, нужно наклонить
+* Построить resource consumption vs. QD
+* Не показывать девиацию для слишком маленьких значений
+* Добавить io_wait на графики CPU
+* Отсортировать репорты по типу и длинне очереди
+* Репорт криво генерируется
+* Что делать с дырой в данных от сенсоров при перезапуске теста
+* Убивать фоновые процессы агента при стопе работы. Сделать доп фу-цию,
+ в агенте, которая находит и убивает все дочерние процессы при запросе
+ стопа
+* починить текстовый репорт
+* bottleneck table
+* Рассмотреть pandas.dataframe как универсальный посредник для
+ ф-ций визуализации
+* Форматировать тики на графиках с помошью b2ssize/b2ssize_10
+* Сравнивать время на нодах и выдавать ошибку при его рассогласованности
+* scipy.stats.probplot - QQ plot
+* grid явно выставлять для выбранных графиков
+* Посмотреть почему тест дикки-фуллера так фигово работает
+* Генерировать суммарный отчет -
Wally состоит из частей, которые стоит разделить и унифицировать с другими тулами:
----------------------------------------------------------------------------------
@@ -57,7 +84,6 @@
TODO next
---------
-* Тест дикки-фуллера для результатов
* Merge FSStorage and serializer into ObjStorage, separate TSStorage.
* Build WallyStorage on top of it, use only WallyStorage in code
* check that OS key match what is stored on disk
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 66a6ee5..02d5075 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,7 +1,7 @@
import ctypes
import logging
import os.path
-from typing import Tuple, List, Iterable, Iterator, Optional, Union
+from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
from fractions import Fraction
import numpy
@@ -72,6 +72,20 @@
tss = list(find_all_series(rstorage, suite, job, metric))
+ # TODO replace this with universal interpolator
+ # for ts in tss:
+ # from_s = float(unit_conversion_coef('s', ts.time_units))
+ # prev_time = ts.times[0]
+ # res = [ts.data[0]]
+ #
+ # for ln, (tm, val) in enumerate(zip(ts.times[1:], ts.data[1:]), 1):
+ # assert tm > prev_time, "Failed tm > prev_time, src={}, ln={}".format(ts.source, ln)
+ # while tm - prev_time > from_s * 1.2:
+ # res.append(0)
+ # prev_time += from_s
+ # res.append(val)
+ # prev_time = tm
+
if len(tss) == 0:
raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
@@ -106,7 +120,15 @@
raise ValueError(msg)
# TODO: match times on different ts
- agg_ts.data += ts.data
+ if abs(len(agg_ts.data) - len(ts.data)) > 1:
+ # import IPython
+ # IPython.embed()
+ pass
+ assert abs(len(agg_ts.data) - len(ts.data)) <= 1, \
+ "len(agg_ts.data)={}, len(ts.data)={}, need to be almost equals".format(len(agg_ts.data), len(ts.data))
+
+ mlen = min(len(agg_ts.data), len(ts.data))
+ agg_ts.data[:mlen] += ts.data[:mlen]
return agg_ts
@@ -193,13 +215,14 @@
return res_ts
-c_interp_func = None
+c_interp_func_agg = None
c_interp_func_qd = None
+c_interp_func_fio = None
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, qd: bool = False) -> TimeSeries:
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg') -> TimeSeries:
"Interpolate time series to values on seconds borders"
- key = (ts.source.tpl, qd)
+ key = (ts.source.tpl, tp)
if not nc and key in interpolated_cache:
return interpolated_cache[key].copy()
@@ -221,64 +244,86 @@
assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
coef = int(rcoef) # make typechecker happy
- global c_interp_func
+ global c_interp_func_agg
global c_interp_func_qd
+ global c_interp_func_fio
uint64_p = ctypes.POINTER(ctypes.c_uint64)
- if c_interp_func is None:
+ if c_interp_func_agg is None:
dirname = os.path.dirname(os.path.dirname(wally.__file__))
path = os.path.join(dirname, 'clib', 'libwally.so')
cdll = ctypes.CDLL(path)
- c_interp_func = cdll.interpolate_ts_on_seconds_border
- c_interp_func.argtypes = [
- ctypes.c_uint, # input_size
- ctypes.c_uint, # output_size
- uint64_p, # times
- uint64_p, # values
- ctypes.c_uint, # time_scale_coef
- uint64_p, # output
- ]
- c_interp_func.restype = None
-
+ c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
- c_interp_func_qd.argtypes = [
- ctypes.c_uint, # input_size
- ctypes.c_uint, # output_size
- uint64_p, # times
- uint64_p, # values
- ctypes.c_uint, # time_scale_coef
- uint64_p, # output
- ]
- c_interp_func_qd.restype = ctypes.c_uint
+
+ for func in (c_interp_func_agg, c_interp_func_qd):
+ func.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ uint64_p, # values
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output
+ ]
+ func.restype = ctypes.c_uint # output array used size
+
+ c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
+ c_interp_func_fio.restype = ctypes.c_int
+ c_interp_func_fio.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output indexes
+ ctypes.c_uint64, # empty placeholder
+ ]
assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
- # print("output_sz =", output_sz, "coef =", coef)
result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
- if qd:
- func = c_interp_func_qd
- else:
- func = c_interp_func
+ if tp in ('qd', 'agg'):
+ func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
+ sz = func(ts.data.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ ts.data.ctypes.data_as(uint64_p),
+ coef,
+ result.ctypes.data_as(uint64_p))
- sz = func(ts.data.size,
- output_sz,
- ts.times.ctypes.data_as(uint64_p),
- ts.data.ctypes.data_as(uint64_p),
- coef,
- result.ctypes.data_as(uint64_p))
-
- if qd:
result = result[:sz]
output_sz = sz
- else:
- assert sz is None
- rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
+ rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
+ else:
+ assert tp == 'fio'
+ ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
+ no_data = (output_sz + 1)
+ sz_or_err = c_interp_func_fio(ts.times.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ coef,
+ ridx.ctypes.data_as(uint64_p),
+ no_data)
+
+ if sz_or_err <= 0:
+ raise ValueError("Error in input array at index %s. %s", -sz_or_err, ts.source)
+
+ rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)
+
+ empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
+ res = []
+ for idx in ridx[:sz_or_err]:
+ if idx == no_data:
+ res.append(empty)
+ else:
+ res.append(ts.data[idx])
+ result = numpy.array(res, dtype=ts.data.dtype)
+
res_ts = TimeSeries(ts.name, None, result,
times=rtimes,
units=ts.units,
@@ -405,26 +450,38 @@
qd_metrics = {'io_queue'}
+summ_sensors_cache = {} # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]
def summ_sensors(rstorage: ResultStorage,
roles: List[str],
sensor: str,
metric: str,
- time_range: Tuple[int, int]) -> Optional[TimeSeries]:
+ time_range: Tuple[int, int],
+ nc: bool = False) -> Optional[TimeSeries]:
+
+ key = (tuple(roles), sensor, metric, time_range, id(ResultStorage))
+ if not nc and key in summ_sensors_cache:
+ return summ_sensors_cache[key].copy()
res = None # type: Optional[TimeSeries]
for node in find_nodes_by_roles(rstorage, roles):
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
+ data = c_interpolate_ts_on_seconds_border(data, 'qd' if metric in qd_metrics else 'agg')
data = get_ts_for_time_range(data, time_range)
if res is None:
res = data
res.data = res.data.copy()
else:
res.data += data.data
- return res
+
+ if not nc:
+ summ_sensors_cache[key] = res
+ if len(summ_sensors_cache) > 1024:
+ logger.warning("summ_sensors_cache cache too large %s > 1024", len(summ_sensors_cache))
+
+ return res if res is None else res.copy()
def find_sensors_to_2d(rstorage: ResultStorage,
@@ -439,7 +496,7 @@
for dev in devs:
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
+ data = c_interpolate_ts_on_seconds_border(data, 'qd' if metric in qd_metrics else 'agg')
data = get_ts_for_time_range(data, time_range)
res.append(data.data)
res2d = numpy.concatenate(res)
diff --git a/wally/html.py b/wally/html.py
index 553aff5..a1db5b3 100644
--- a/wally/html.py
+++ b/wally/html.py
@@ -22,7 +22,7 @@
return '<img src="{}">'.format(link)
-def table(caption: str, headers: Optional[List[str]], data: List[List[str]]) -> str:
+def table(caption: str, headers: Optional[List[str]], data: List[List[str]], align: List[str] = None) -> str:
doc = xmlbuilder3.XMLBuilder("table",
**{"class": "table table-bordered table-striped table-condensed table-hover",
"style": "width: auto;"})
@@ -35,10 +35,20 @@
for header in headers:
doc.th(header)
+ max_cols = max(len(line) for line in data if not isinstance(line, str))
+
with doc.tbody:
for line in data:
with doc.tr:
- for vl in line:
- doc.td(vl)
+ if isinstance(line, str):
+ with doc.td(colspan=str(max_cols)):
+ doc.center.b(line)
+ else:
+ if align:
+ for vl, col_align in zip(line, align):
+ doc.td(vl, align=col_align)
+ else:
+ for vl in line:
+ doc.td(vl)
return xmlbuilder3.tostr(doc).split("\n", 1)[1]
\ No newline at end of file
diff --git a/wally/main.py b/wally/main.py
index 93922dd..a20ade5 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -292,6 +292,7 @@
storage = make_storage(opts.data_dir, existing=True)
config = storage.load(Config, 'config')
stages.append(LoadStoredNodesStage())
+ stages.append(SaveNodesStage())
elif opts.subparser_name == 'compare':
# x = run_test.load_data_from_path(opts.data_path1)
@@ -320,11 +321,6 @@
IPython.embed()
return 0
- # elif opts.subparser_name == 'jupyter':
- # with tempfile.NamedTemporaryFile() as fd:
- # fd.write(notebook_kern.replace("$STORAGE", opts.storage_dir))
- # subprocess.call("jupyter notebook ", shell=True)
- # return 0
else:
print("Subparser {!r} is not supported".format(opts.subparser_name))
return 1
diff --git a/wally/report.py b/wally/report.py
index da81ab5..8c2c181 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -5,20 +5,19 @@
from io import BytesIO
from functools import wraps
from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional, Union
import numpy
import scipy.stats
-
-# import matplotlib
-# matplotlib.use('GTKAgg')
-
+import matplotlib.style
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib import gridspec
+from statsmodels.tsa.stattools import adfuller
from cephlib.common import float2str
from cephlib.plot import plot_hmap_with_y_histo, hmap_from_2d
+import xmlbuilder3
import wally
@@ -26,7 +25,6 @@
from .stage import Stage, StepOrder
from .test_run_class import TestRun
from .hlstorage import ResultStorage
-from .node_interfaces import NodeInfo
from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
hist_outliers_perc, find_ouliers_ts, approximate_curve)
@@ -34,7 +32,8 @@
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
from .suits.job import JobConfig
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import (get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles,
+ get_ts_for_time_range)
with warnings.catch_warnings():
@@ -49,7 +48,6 @@
DEBUG = False
-LARGE_BLOCKS = 256
# ---------------- PROFILES ------------------------------------------------------------------------------------------
@@ -69,12 +67,17 @@
subinfo_alpha = 0.7
imshow_colormap = None # type: str
+ hmap_cmap = "Blues"
default_format = 'svg'
+io_chart_format = 'svg'
class StyleProfile:
+ default_style = 'seaborn-white'
+ io_chart_style = 'classic'
+
dpi = 80
grid = True
tide_layout = False
@@ -84,6 +87,8 @@
hm_x_slots = 25
min_points_for_dev = 5
+ x_label_rotation = 35
+
dev_range_x = 2.0
dev_perc = 95
@@ -99,11 +104,12 @@
# figure size in inches
figsize = (8, 4)
- figsize_long = (5.5, 3)
+ figsize_long = (8, 4)
+ qd_chart_inches = (16, 9)
subplot_adjust_r = 0.75
- subplot_adjust_r_no_leg = 0.9
- title_font_size = 10
+ subplot_adjust_r_no_legend = 0.9
+ title_font_size = 12
extra_io_spine = True
@@ -125,6 +131,7 @@
qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
iotime_bins = list(range(0, 1030, 50))
block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+ large_blocks = 256
DefColorProfile = ColorProfile()
@@ -174,15 +181,27 @@
# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
-def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig) -> IOSummary:
+iosum_cache = {} # type: Dict[Tuple[str, str]]
+
+
+def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, nc: bool = False) -> IOSummary:
+ key = (suite.storage_id, job.storage_id)
+ if not nc and key in iosum_cache:
+ return iosum_cache[key]
+
lat = get_aggregated(rstorage, suite, job, "lat")
io = get_aggregated(rstorage, suite, job, "bw")
- return IOSummary(job.qd,
- nodes_count=len(suite.nodes_ids),
- block_size=job.bsize,
- lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
- bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+ res = IOSummary(job.qd,
+ nodes_count=len(suite.nodes_ids),
+ block_size=job.bsize,
+ lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
+ bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+
+ if not nc:
+ iosum_cache[key] = res
+
+ return res
def is_sensor_numarray(sensor: str, metric: str) -> bool:
@@ -204,6 +223,38 @@
"""Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
return not is_level_sensor(sensor, metric)
+
+cpu_load_cache = {} # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
+
+
+def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
+ time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
+
+ key = (id(rstorage), tuple(roles), time_range)
+ if not nc and key in cpu_load_cache:
+ return cpu_load_cache[key]
+
+ cpu_ts = {}
+ cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
+ for name in cpu_metrics:
+ cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)
+
+ it = iter(cpu_ts.values())
+ total_over_time = next(it).data.copy() # type: numpy.ndarray
+ for ts in it:
+ if ts is not None:
+ total_over_time += ts.data
+
+ total = cpu_ts['idle'].copy(no_data=True)
+ total.data = total_over_time
+ cpu_ts['total'] = total
+
+ if not nc:
+ cpu_load_cache[key] = cpu_ts
+
+ return cpu_ts
+
+
# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
def get_emb_image(fig: Figure, format: str, **opts) -> bytes:
@@ -226,6 +277,7 @@
if not fpath:
format = path.tag.split(".")[-1]
fig = plt.figure(figsize=StyleProfile.figsize)
+ plt.style.use(StyleProfile.default_style)
func(fig, *args, **kwargs)
fpath = storage.put_plot_file(get_emb_image(fig, format=format, dpi=DefStyleProfile.dpi), path)
logger.debug("Plot %s saved to %r", path, fpath)
@@ -234,7 +286,8 @@
return closure1
-def apply_style(fig: Figure, title: str, style: StyleProfile, eng: bool = True, no_legend: bool = False) -> None:
+def apply_style(fig: Figure, title: str, style: StyleProfile, eng: bool = True,
+ no_legend: bool = False) -> None:
for ax in fig.axes:
ax.grid(style.grid)
@@ -246,7 +299,7 @@
for ax in fig.axes:
ax.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
else:
- fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_leg)
+ fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_legend)
if style.tide_layout:
fig.set_tight_layout(True)
@@ -317,13 +370,36 @@
@provide_plot
+def plot_simple_bars(fig: Figure,
+ title: str,
+ names: List[str],
+ values: List[float],
+ errs: List[float] = None,
+ colors: ColorProfile = DefColorProfile,
+ style: StyleProfile = DefStyleProfile) -> None:
+
+ ax = fig.add_subplot(111)
+ ind = numpy.arange(len(names))
+ width = 0.35
+ ax.barh(ind, values, width, xerr=errs)
+
+ ax.set_yticks(ind + width / 2)
+ ax.set_yticklabels(names)
+ ax.set_xlim(0, max(val + err for val, err in zip(values, errs)) * 1.1)
+
+ apply_style(fig, title, style, no_legend=True)
+ ax.axvline(x=1.0, color='r', linestyle='--', linewidth=1, alpha=0.5)
+ fig.subplots_adjust(left=0.2)
+
+
+@provide_plot
def plot_hmap_from_2d(fig: Figure,
data2d: numpy.ndarray,
title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
fig.set_size_inches(*style.figsize_long)
ioq1d, ranges = hmap_from_2d(data2d)
- ax, _ = plot_hmap_with_y_histo(fig, ioq1d, ranges, bins=bins)
+ ax, _ = plot_hmap_with_y_histo(fig, ioq1d, ranges, bins=bins, cmap=colors.hmap_cmap)
ax.set(ylabel=ylabel, xlabel=xlabel)
apply_style(fig, title, style, no_legend=True)
@@ -468,15 +544,15 @@
if style.legend_for_eng:
legend_location = "center left"
legend_bbox_to_anchor = (1.03, 0.81)
- plt.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
- loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+ ax.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
+ loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
else:
ax.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
ax.set_xlim(min(times), max(times))
ax.set(ylabel=ylabel, xlabel="Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
-
apply_style(fig, title, style, eng=True, no_legend=True)
+ fig.subplots_adjust(right=style.subplot_adjust_r)
@provide_plot
@@ -588,10 +664,10 @@
# -------------- MAGIC VALUES ---------------------
# IOPS bar width
- width = 0.35
+ width = 0.2
# offset from center of bar to deviation/confidence range indicator
- err_x_offset = 0.05
+ err_x_offset = 0.03
# extra space on top and bottom, comparing to maximal tight layout
extra_y_space = 0.05
@@ -606,38 +682,25 @@
legend_location = "center left"
legend_bbox_to_anchor = (1.1, 0.81)
- # plot box size adjust (only plot, not spines and legend)
- plot_box_adjust = {'right': 0.66}
# -------------- END OF MAGIC VALUES ---------------------
+ matplotlib.style.use(style.io_chart_style)
+
block_size = iosums[0].block_size
- lc = len(iosums)
- xt = list(range(1, lc + 1))
+ xpos = numpy.arange(1, len(iosums) + 1, dtype='uint')
- # x coordinate of middle of the bars
- xpos = [i - width / 2 for i in xt]
-
- # import matplotlib.gridspec as gridspec
- # gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
- # p1 = plt.subplot(gs[1])
-
- logger.warning("Check coef usage!")
ax = fig.add_subplot(111)
- # plot IOPS/BW bars
- if block_size >= LARGE_BLOCKS:
- iops_primary = False
- coef = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
- ax.set_ylabel("BW (MiBps)")
- else:
- iops_primary = True
- coef = float(unit_conversion_coef(iosums[0].bw.units, "MiBps")) / block_size
- ax.set_ylabel("IOPS")
+ coef_mb = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
+ coef_iops = float(unit_conversion_coef(iosums[0].bw.units, "KiBps")) / block_size
+
+ iops_primary = block_size < style.large_blocks
+
+ coef = coef_iops if iops_primary else coef_mb
+ ax.set_ylabel("IOPS" if iops_primary else "BW (MiBps)")
vals = [iosum.bw.average * coef for iosum in iosums]
- ax.bar(xpos, vals, width=width, color=colors.box_color, label=legend)
-
# set correct x limits for primary IO spine
min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
@@ -645,16 +708,20 @@
io_lims = (min_io - border, max_io + border)
ax.set_ylim(io_lims[0] * coef, io_lims[-1] * coef)
+ ax.bar(xpos - width / 2, vals, width=width, color=colors.box_color, label=legend)
# plot deviation and confidence error ranges
err1_legend = err2_legend = None
for pos, iosum in zip(xpos, iosums):
- err1_legend = ax.errorbar(pos + width / 2 - err_x_offset,
+ dev_bar_pos = pos - err_x_offset
+ err1_legend = ax.errorbar(dev_bar_pos,
iosum.bw.average * coef,
iosum.bw.deviation * style.dev_range_x * coef,
alpha=colors.subinfo_alpha,
color=colors.suppl_color1) # 'magenta'
- err2_legend = ax.errorbar(pos + width / 2 + err_x_offset,
+
+ conf_bar_pos = pos + err_x_offset
+ err2_legend = ax.errorbar(conf_bar_pos,
iosum.bw.average * coef,
iosum.bw.confidence * coef,
alpha=colors.subinfo_alpha,
@@ -673,13 +740,43 @@
ax2 = ax.twinx()
# plot median and 95 perc latency
- ax2.plot(xt, [iosum.lat.perc_50 for iosum in iosums], label="lat med")
- ax2.plot(xt, [iosum.lat.perc_95 for iosum in iosums], label="lat 95%")
+ lat_coef_ms = float(unit_conversion_coef(iosums[0].lat.units, "ms"))
+ ax2.plot(xpos, [iosum.lat.perc_50 * lat_coef_ms for iosum in iosums], label="lat med")
+ ax2.plot(xpos, [iosum.lat.perc_95 * lat_coef_ms for iosum in iosums], label="lat 95%")
+
+ for grid_line in ax2.get_ygridlines():
+ grid_line.set_linestyle(":")
+
+ # extra y spine for BW/IOPS on left side
+ if style.extra_io_spine:
+ ax3 = ax.twinx()
+ if iops_log_spine:
+ ax3.set_yscale('log')
+
+ ax3.set_ylabel("BW (MiBps)" if iops_primary else "IOPS")
+ secondary_coef = coef_mb if iops_primary else coef_iops
+ ax3.set_ylim(io_lims[0] * secondary_coef, io_lims[1] * secondary_coef)
+ ax3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+ ax3.spines["left"].set_visible(True)
+ ax3.yaxis.set_label_position('left')
+ ax3.yaxis.set_ticks_position('left')
+ else:
+ ax3 = None
+
+ ax2.set_ylabel("Latency (ms)")
+
+ # legend box
+ handles2, labels2 = ax2.get_legend_handles_labels()
+ ax.legend(handles1 + handles2, labels1 + labels2,
+ loc=legend_location,
+ bbox_to_anchor=legend_bbox_to_anchor)
# limit and label x spine
- plt.xlim(extra_x_space, lc + extra_x_space)
- plt.xticks(xt, ["{0} * {1}".format(iosum.qd, iosum.nodes_count) for iosum in iosums])
- ax.set_xlabel("QD * Test node count")
+ ax.set_xlim(extra_x_space, len(iosums) + extra_x_space)
+ ax.set_xticks(xpos)
+ ax.set_xticklabels(["{0} * {1} = {2}".format(iosum.qd, iosum.nodes_count, iosum.qd * iosum.nodes_count)
+ for iosum in iosums])
+ ax.set_xlabel("IO queue depth * test node count = total parallel requests")
# apply log scales for X spines, if set
if iops_log_spine:
@@ -688,36 +785,16 @@
if lat_log_spine:
ax2.set_yscale('log')
- # extra y spine for BW/IOPS on left side
- if style.extra_io_spine:
- ax3 = ax.twinx()
- if iops_log_spine:
- ax3.set_yscale('log')
-
- if iops_primary:
- ax3.set_ylabel("BW (MiBps)")
- ax3.set_ylim(io_lims[0] * coef, io_lims[1] * coef)
- else:
- ax3.set_ylabel("IOPS")
- ax3.set_ylim(io_lims[0] * coef, io_lims[1] * coef)
-
- ax3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
- ax3.spines["left"].set_visible(True)
- ax3.yaxis.set_label_position('left')
- ax3.yaxis.set_ticks_position('left')
-
- ax2.set_ylabel("Latency (ms)")
-
- # legend box
- handles2, labels2 = ax2.get_legend_handles_labels()
- plt.legend(handles1 + handles2, labels1 + labels2,
- loc=legend_location,
- bbox_to_anchor=legend_bbox_to_anchor)
-
# adjust central box size to fit legend
- # plt.subplots_adjust(**plot_box_adjust)
apply_style(fig, title, style, eng=False, no_legend=True)
+ # override some styles
+ fig.set_size_inches(*style.qd_chart_inches)
+ fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
+
+ if style.extra_io_spine:
+ ax3.grid(False)
+
# -------------------- REPORT HELPERS --------------------------------------------------------------------------------
@@ -812,21 +889,25 @@
ts_map[fjob_no_qd].append((suite, fjob))
for tpl, suites_jobs in ts_map.items():
- if len(suites_jobs) > StyleProfile.min_iops_vs_qd_jobs:
+ if len(suites_jobs) >= StyleProfile.min_iops_vs_qd_jobs:
+
iosums = [make_iosum(rstorage, suite, job) for suite, job in suites_jobs]
iosums.sort(key=lambda x: x.qd)
summary, summary_long = str_summary[tpl]
+
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, \
+ HTMLBlock(html.H2(html.center("IOPS, BW, Lat = func(QD). " + summary_long)))
+
ds = DataSource(suite_id=suite.storage_id,
job_id=summary,
node_id=AGG_TAG,
sensor="fio",
dev=AGG_TAG,
metric="io_over_qd",
- tag="svg")
+ tag=io_chart_format)
- title = "IOPS, BW, Lat vs. QD.\n" + summary_long
- fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums) # type: str
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.img(fpath))
+ fpath = io_chart(rstorage, ds, title="", legend="IOPS/BW", iosums=iosums) # type: str
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.center(html.img(fpath)))
# Linearization report
@@ -834,7 +915,6 @@
"""Creates graphs, which show how IOPS and Latency depend on block size"""
-# IOPS/latency distribution
class StatInfo(JobReporter):
"""Statistic info for job results"""
suite_types = {'fio'}
@@ -845,17 +925,22 @@
fjob = cast(FioJobConfig, job)
io_sum = make_iosum(rstorage, suite, fjob)
- summary_data = [
- ["Summary", job.params.long_summary],
- ]
-
- res = html.H2(html.center("Test summary"))
- res += html.table("Test info", None, summary_data)
- stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Mediana", "Mode", "Kurt / Skew", "95%", "99%"]
+ res = html.H2(html.center("Test summary - " + job.params.long_summary))
+ stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Mediana", "Mode", "Kurt / Skew", "95%", "99%",
+ "ADF test"]
bw_target_units = 'Bps'
bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
+ adf_v, *_1, stats, _2 = adfuller(io_sum.bw.data)
+
+ for v in ("1%", "5%", "10%"):
+ if adf_v <= stats[v]:
+ ad_test = v
+ break
+ else:
+ ad_test = "Failed"
+
bw_data = ["Bandwidth",
"{}{} ~ {}{}".format(b2ssize(io_sum.bw.average * bw_coef), bw_target_units,
b2ssize(io_sum.bw.deviation * bw_coef), bw_target_units),
@@ -864,7 +949,8 @@
"-",
"{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
b2ssize(io_sum.bw.perc_5 * bw_coef) + bw_target_units,
- b2ssize(io_sum.bw.perc_1 * bw_coef) + bw_target_units]
+ b2ssize(io_sum.bw.perc_1 * bw_coef) + bw_target_units,
+ ad_test]
iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
iops_data = ["IOPS",
@@ -875,7 +961,8 @@
"-",
"{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
b2ssize_10(io_sum.bw.perc_5 * iops_coef) + "IOPS",
- b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS"]
+ b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS",
+ ad_test]
lat_target_unit = 's'
lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
@@ -887,76 +974,304 @@
"-",
"-",
b2ssize_10(io_sum.lat.perc_95 * lat_coef) + lat_target_unit,
- b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit]
+ b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit,
+ '-']
# sensor usage
stat_data = [iops_data, bw_data, lat_data]
- res += html.table("Load stats info", stat_data_headers, stat_data)
+ res += html.center(html.table("Load stats info", stat_data_headers, stat_data))
+ yield Menu1st.per_job, job.summary, HTMLBlock(res)
- resource_headers = ["Resource", "Usage count", "Proportional to work done"]
- tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "KiBps"))
- tot_ops_coef = tot_io_coef / fjob.bsize
+def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
+ step = min(vec.size, denom.size) // avg_ranges
+ assert step >= 1
+ vals = []
- io_transfered = io_sum.bw.data.sum() * tot_io_coef
- resource_data = [
- ["IO made", b2ssize_10(io_transfered * tot_ops_coef) + "OP", "-"],
- ["Data transfered", b2ssize(io_transfered) + "B", "-"]
- ]
+ whole_sum = denom.sum() / denom.size * step * 0.5
+ for i in range(0, avg_ranges):
+ s1 = denom[i * step: (i + 1) * step].sum()
+ if s1 >= whole_sum:
+ vals.append(vec[i * step: (i + 1) * step].sum() / s1)
- storage = rstorage.storage
- nodes = storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
+ assert len(vals) > 1
+ return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
- ops_done = io_transfered * tot_ops_coef
+
+class Resources(JobReporter):
+ """Statistic info for job results"""
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig, job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+ fjob = cast(FioJobConfig, job)
+ io_sum = make_iosum(rstorage, suite, fjob)
+
+ tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
+ io_transfered = io_sum.bw.data * tot_io_coef
+ ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
+
+ io_made = "Client IOP made"
+ data_tr = "Client data transfered"
+
+ records = {
+ io_made: (b2ssize_10(ops_done.sum()) + "OP", None, None),
+ data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
+ } # type: Dict[str, Tuple[str, float, float]]
+
+ test_send = "Test nodes net send"
+ test_recv = "Test nodes net recv"
+ test_net = "Test nodes net total"
+ test_send_pkt = "Test nodes send pkt"
+ test_recv_pkt = "Test nodes recv pkt"
+ test_net_pkt = "Test nodes total pkt"
+
+ test_write = "Test nodes disk write"
+ test_read = "Test nodes disk read"
+ test_write_iop = "Test nodes write IOP"
+ test_read_iop = "Test nodes read IOP"
+ test_iop = "Test nodes IOP"
+ test_rw = "Test nodes disk IO"
+
+ storage_send = "Storage nodes net send"
+ storage_recv = "Storage nodes net recv"
+ storage_send_pkt = "Storage nodes send pkt"
+ storage_recv_pkt = "Storage nodes recv pkt"
+ storage_net = "Storage nodes net total"
+ storage_net_pkt = "Storage nodes total pkt"
+
+ storage_write = "Storage nodes disk write"
+ storage_read = "Storage nodes disk read"
+ storage_write_iop = "Storage nodes write IOP"
+ storage_read_iop = "Storage nodes read IOP"
+ storage_iop = "Storage nodes IOP"
+ storage_rw = "Storage nodes disk IO"
+
+ storage_cpu = "Storage nodes CPU"
+ storage_cpu_s = "Storage nodes CPU s/IOP"
+ storage_cpu_s_b = "Storage nodes CPU s/B"
all_metrics = [
- ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
- ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ (test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ (test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ (test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
+ (test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
- ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
- ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
- ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
- ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+ (test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
+ (test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
+ (test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+ (test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
- ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
- ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ (storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
- ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
- ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
- ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
- ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ (storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ (storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
]
all_agg = {}
- for descr, sensor, metric, ffunc, roles, units, denom in all_metrics:
- if not nodes:
- continue
-
+ for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
if res_ts is None:
continue
- agg = res_ts.data.sum()
- resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
- all_agg[descr] = agg
+ data = res_ts.data
+ if units == "B":
+ data = data * float(unit_conversion_coef(res_ts.units, "B"))
+
+ records[vname] = (ffunc(data.sum()) + units, *avg_dev_div(data, service_provided_count))
+ all_agg[vname] = data
+
+ # cpu usage
+ nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
+ cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+
+ cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
+ used_s = b2ssize_10(cpus_used_sec.sum()) + 's'
+
+ all_agg[storage_cpu] = cpus_used_sec
+ records[storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))
+ records[storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))
cums = [
- ("Test nodes writes", "Test nodes reads", "Total test ops", b2ssize_10, "OP", ops_done),
- ("Storage nodes writes", "Storage nodes reads", "Total storage ops", b2ssize_10, "OP", ops_done),
- ("Storage nodes disk write", "Storage nodes disk read", "Total storage IO size", b2ssize,
- "B", io_transfered),
- ("Test nodes disk write", "Test nodes disk read", "Total test nodes IO size", b2ssize, "B", io_transfered),
+ (test_iop, test_read_iop, test_write_iop, b2ssize_10, "OP", ops_done),
+ (test_rw, test_read, test_write, b2ssize, "B", io_transfered),
+ (test_net, test_send, test_recv, b2ssize, "B", io_transfered),
+ (test_net_pkt, test_send_pkt, test_recv_pkt, b2ssize_10, "pkt", ops_done),
+
+ (storage_iop, storage_read_iop, storage_write_iop, b2ssize_10, "OP", ops_done),
+ (storage_rw, storage_read, storage_write, b2ssize, "B", io_transfered),
+ (storage_net, storage_send, storage_recv, b2ssize, "B", io_transfered),
+ (storage_net_pkt, storage_send_pkt, storage_recv_pkt, b2ssize_10, "pkt", ops_done),
]
- for name1, name2, descr, ffunc, units, denom in cums:
+ for vname, name1, name2, ffunc, units, service_provided_masked in cums:
if name1 in all_agg and name2 in all_agg:
agg = all_agg[name1] + all_agg[name2]
- resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
+ records[vname] = (ffunc(agg.sum()) + units, *avg_dev_div(agg, service_provided_masked))
- res += html.table("Resources usage", resource_headers, resource_data)
+ table_structure = [
+ "Service provided",
+ (io_made, data_tr),
+ "Test nodes total load",
+ (test_send_pkt, test_send),
+ (test_recv_pkt, test_recv),
+ (test_net_pkt, test_net),
+ (test_write_iop, test_write),
+ (test_read_iop, test_read),
+ (test_iop, test_rw),
+ (test_iop, test_rw),
+ "Storage nodes resource consumed",
+ (storage_send_pkt, storage_send),
+ (storage_recv_pkt, storage_recv),
+ (storage_net_pkt, storage_net),
+ (storage_write_iop, storage_write),
+ (storage_read_iop, storage_read),
+ (storage_iop, storage_rw),
+ (storage_cpu_s, storage_cpu_s_b),
+ ] # type: List[Union[str, Tuple[Optional[str], Optional[str]]]
- yield Menu1st.per_job, job.summary, HTMLBlock(res)
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Resources usage")))
+
+ doc = xmlbuilder3.XMLBuilder("table",
+ **{"class": "table table-bordered table-striped table-condensed table-hover",
+ "style": "width: auto;"})
+
+ with doc.thead:
+ with doc.tr:
+ [doc.th(header) for header in ["Resource", "Usage count", "To service"] * 2]
+
+ cols = 6
+
+ short_name = {
+ name: (name if name in {io_made, data_tr} else " ".join(name.split()[2:]).capitalize())
+ for name in records.keys()
+ }
+
+ short_name[storage_cpu_s] = "CPU (s/IOP)"
+ short_name[storage_cpu_s_b] = "CPU (s/B)"
+
+ with doc.tbody:
+ with doc.tr:
+ doc.td(colspan=str(cols // 2)).center.b("Operations")
+ doc.td(colspan=str(cols // 2)).center.b("Bytes")
+
+ for line in table_structure:
+ with doc.tr:
+ if isinstance(line, str):
+ with doc.td(colspan=str(cols)):
+ doc.center.b(line)
+ else:
+ for name in line:
+ if name is None:
+ doc.td("-", colspan=str(cols // 2))
+ continue
+
+ amount_s, avg, dev = records[name]
+
+ if name in (storage_cpu_s, storage_cpu_s_b) and avg is not None:
+ dev_s = str(int(dev * 100 / avg)) + "%" if avg > 1E-9 else b2ssize_10(dev) + 's'
+ rel_val_s = "{}s ~ {}".format(b2ssize_10(avg), dev_s)
+ else:
+ if avg is None:
+ rel_val_s = '-'
+ else:
+ avg_s = int(avg) if avg > 10 else '{:.1f}'.format(avg)
+ if avg > 1E-5:
+ dev_s = str(int(dev * 100 / avg)) + "%"
+ else:
+ dev_s = int(dev) if dev > 10 else '{:.1f}'.format(dev)
+ rel_val_s = "{} ~ {}".format(avg_s, dev_s)
+
+ doc.td(short_name[name], align="left")
+ doc.td(amount_s, align="right")
+
+ if avg is None or avg < 0.9:
+ doc.td(rel_val_s, align="right")
+ elif avg < 2.0:
+ doc.td(align="right").font(rel_val_s, color='green')
+ elif avg < 5.0:
+ doc.td(align="right").font(rel_val_s, color='orange')
+ else:
+ doc.td(align="right").font(rel_val_s, color='red')
+
+ res = xmlbuilder3.tostr(doc).split("\n", 1)[1]
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.center(res))
+
+ iop_names = [test_write_iop, test_read_iop, test_iop,
+ storage_write_iop, storage_read_iop, storage_iop]
+
+ bytes_names = [test_write, test_read, test_rw,
+ test_send, test_recv, test_net,
+ storage_write, storage_read, storage_rw,
+ storage_send, storage_recv, storage_net]
+
+ net_pkt_names = [test_send_pkt, test_recv_pkt, test_net_pkt,
+ storage_send_pkt, storage_recv_pkt, storage_net_pkt]
+
+ for tp, names in [('iop', iop_names), ("bytes", bytes_names), ('Net packets per IOP', net_pkt_names)]:
+ vals = []
+ devs = []
+ avail_names = []
+ for name in names:
+ if name in records:
+ avail_names.append(name)
+ _, avg, dev = records[name]
+ vals.append(avg)
+ devs.append(dev)
+
+ # synchronously sort values and names, values is a key
+ vals, names, devs = map(list, zip(*sorted(zip(vals, names, devs))))
+
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id=AGG_TAG,
+ sensor='resources',
+ dev=AGG_TAG,
+ metric=tp.replace(' ', "_") + '2service_bar',
+ tag=default_format)
+
+ fname = plot_simple_bars(rstorage, ds,
+ "Resource consuption / service provided, " + tp,
+ [name.replace(" nodes", "") for name in names],
+ vals, devs)
+
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
+class BottleNeck(JobReporter):
+ """Statistic info for job results"""
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig, job: JobConfig, rstorage: ResultStorage) -> \
+ Iterator[Tuple[str, str, HTMLBlock]]:
+
+ nodes = list(find_nodes_by_roles(rstorage, STORAGE_ROLES))
+
+ sensor = 'block-io'
+ metric = 'io_queue'
+ bn_val = 16
+
+ for node in nodes:
+ bn = 0
+ tot = 0
+ for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+ if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
+ data = rstorage.load_sensor(ds)
+ p1 = job.reliable_info_range_s[0] * unit_conversion_coef('s', data.time_units)
+ p2 = job.reliable_info_range_s[1] * unit_conversion_coef('s', data.time_units)
+ idx1, idx2 = numpy.searchsorted(data.times, (p1, p2))
+ bn += (data.data[idx1: idx2] > bn_val).sum()
+ tot += idx2 - idx1
+ print(node, bn, tot)
+
+ yield Menu1st.per_job, job.summary, HTMLBlock("")
# CPU load
@@ -968,22 +1283,15 @@
# plot CPU time
for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
- cpu_ts = {}
- cpu_metrics = "idle guest iowait irq nice sirq steal sys user".split()
- for name in cpu_metrics:
- cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name,
- time_range=job.reliable_info_range_s)
-
- it = iter(cpu_ts.values())
- total_over_time = next(it).data.copy()
- for ts in it:
- total_over_time += ts.data
-
+ cpu_ts = get_cluster_cpu_load(rstorage, roles, job.reliable_info_range_s)
+ tss = [(name, ts.data * 100 / cpu_ts['total'].data)
+ for name, ts in cpu_ts.items()
+ if name in {'user', 'sys', 'irq', 'idle'}]
fname = plot_simple_over_time(rstorage,
cpu_ts['idle'].source(job_id=job.storage_id,
suite_id=suite.storage_id,
metric='allcpu', tag=rt + '.plt.' + default_format),
- tss=[(name, ts.data * 100 / total_over_time) for name, ts in cpu_ts.items()],
+ tss=tss,
average=True,
ylabel="CPU time %",
title="{} nodes CPU usage".format(rt.capitalize()))
@@ -1023,19 +1331,23 @@
for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
('journal', journal_devs, STORAGE_ROLES),
('test', test_nodes_devs, ['testnode'])]:
+
+ yield Menu1st.per_job, job.summary, \
+ HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
+
# QD heatmap
ioq2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
metric='io_queue', time_range=trange)
- fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
- job.storage_id,
- AGG_TAG,
- 'block-io',
- name,
- metric='io_queue',
- tag="hmap." + default_format),
- ioq2d, ylabel="IO QD", title=name.capitalize() + " devs QD",
- bins=StyleProfile.qd_bins,
- xlabel='Time') # type: str
+
+ ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
+
+ fname = plot_hmap_from_2d(rstorage,
+ ds(metric='io_queue'),
+ ioq2d,
+ ylabel="IO QD",
+ title=name.capitalize() + " devs QD",
+ xlabel='Time',
+ bins=StyleProfile.qd_bins) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# Block size heatmap
@@ -1045,14 +1357,11 @@
sw2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
metric='sectors_written', time_range=trange)
data2d = sw2d / wc2d / 1024
- fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
- job.storage_id,
- AGG_TAG,
- 'block-io',
- name,
- metric='wr_block_size',
- tag="hmap." + default_format),
- data2d, ylabel="IO bsize, KiB", title=name.capitalize() + " write block size",
+ fname = plot_hmap_from_2d(rstorage,
+ ds(metric='wr_block_size'),
+ data2d,
+ ylabel="IO bsize, KiB",
+ title=name.capitalize() + " write block size",
xlabel='Time',
bins=StyleProfile.block_size_bins) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
@@ -1060,65 +1369,18 @@
# iotime heatmap
wtime2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
metric='io_time', time_range=trange)
- fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
- job.storage_id,
- AGG_TAG,
- 'block-io',
- name,
- metric='io_time',
- tag="hmap." + default_format),
- wtime2d, ylabel="IO time (ms) per second",
+ fname = plot_hmap_from_2d(rstorage,
+ ds(metric='io_time'),
+ wtime2d,
+ ylabel="IO time (ms) per second",
title=name.capitalize() + " iotime",
xlabel='Time',
bins=StyleProfile.iotime_bins) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
-# IOPS/latency distribution
-class IOHist(JobReporter):
- """IOPS.latency distribution histogram"""
- suite_types = {'fio'}
-
- def get_divs(self,
- suite: SuiteConfig,
- job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
-
- fjob = cast(FioJobConfig, job)
-
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load histograms")))
-
- # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
- #
- # long_summary = cast(FioJobParams, fjob.params).long_summary
- #
- # title = "Latency distribution"
- # units = "ms"
- #
- # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
- # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
- agg_io = get_aggregated(rstorage, suite, fjob, "bw")
-
- if fjob.bsize >= LARGE_BLOCKS:
- title = "BW distribution"
- units = "MiBps"
- agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
- else:
- title = "IOPS distribution"
- agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
- units = "IOPS"
-
- io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
- fpath = plot_hist(rstorage, agg_io.source(tag='hist.' + default_format),
- title, units, io_stat_prop) # type: str
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
-
# IOPS/latency over test time for each job
-class IOTime(JobReporter):
+class LoadToolResults(JobReporter):
"""IOPS/latency during test"""
suite_types = {'fio'}
@@ -1129,8 +1391,10 @@
fjob = cast(FioJobConfig, job)
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
+
agg_io = get_aggregated(rstorage, suite, fjob, "bw")
- if fjob.bsize >= LARGE_BLOCKS:
+ if fjob.bsize >= DefStyleProfile.large_blocks:
title = "Fio measured Bandwidth over time"
units = "MiBps"
agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
@@ -1161,22 +1425,35 @@
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ fjob = cast(FioJobConfig, job)
-class ResourceUsage:
- def __init__(self, io_r_ops: int, io_w_ops: int, io_r_kb: int, io_w_kb: int) -> None:
- self.io_w_ops = io_w_ops
- self.io_r_ops = io_r_ops
- self.io_w_kb = io_w_kb
- self.io_r_kb = io_r_kb
+ # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
+ # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
+ #
+ # long_summary = cast(FioJobParams, fjob.params).long_summary
+ #
+ # title = "Latency distribution"
+ # units = "ms"
+ #
+ # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
+ # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- self.cpu_used_user = None # type: int
- self.cpu_used_sys = None # type: int
- self.cpu_wait_io = None # type: int
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
- self.net_send_packets = None # type: int
- self.net_recv_packets = None # type: int
- self.net_send_kb = None # type: int
- self.net_recv_kb = None # type: int
+ if fjob.bsize >= DefStyleProfile.large_blocks:
+ title = "BW distribution"
+ units = "MiBps"
+ agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+ else:
+ title = "IOPS distribution"
+ agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+ units = "IOPS"
+
+ io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+ fpath = plot_hist(rstorage, agg_io.source(tag='hist.' + default_format),
+ title, units, io_stat_prop) # type: str
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
# Cluster load over test time
@@ -1187,8 +1464,8 @@
storage_sensors = [
('block-io', 'reads_completed', "Read", 'iop'),
('block-io', 'writes_completed', "Write", 'iop'),
- ('block-io', 'sectors_read', "Read", 'KiB'),
- ('block-io', 'sectors_written', "Write", 'KiB'),
+ ('block-io', 'sectors_read', "Read", 'MiB'),
+ ('block-io', 'sectors_written', "Write", 'MiB'),
]
def get_divs(self,
@@ -1198,16 +1475,16 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
for sensor, metric, op, units in self.storage_sensors:
- ts = summ_sensors(rstorage, ['testnode'], sensor, metric, job.reliable_info_range_s)
+ ts = summ_sensors(rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
- node_id="test_nodes",
+ node_id="storage",
sensor=sensor,
dev=AGG_TAG,
metric=metric,
tag="ts." + default_format)
- data = ts.data if units != 'KiB' else ts.data * float(unit_conversion_coef(ts.units, 'KiB'))
+ data = ts.data if units != 'MiB' else ts.data * float(unit_conversion_coef(ts.units, 'MiB'))
ts = TimeSeries(name="",
times=numpy.arange(*job.reliable_info_range_s),
data=data,
@@ -1218,14 +1495,10 @@
histo_bins=ts.histo_bins)
sensor_title = "{} {}".format(op, units)
- fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts) # type: str
+ fpath = plot_v_over_time(rstorage, ds, sensor_title, units, ts=ts) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
-# Ceph cluster summary
-class ResourceConsumption(Reporter):
- """Resources consumption report, only text"""
-
# Node load over test time
class NodeLoad(Reporter):
@@ -1250,12 +1523,12 @@
def run(self, ctx: TestRun) -> None:
rstorage = ResultStorage(ctx.storage)
- job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad(), CPULoadPlot(),
+ job_reporters = [StatInfo(), Resources(), LoadToolResults(), ClusterLoad(), CPULoadPlot(),
QDIOTimeHeatmap()] # type: List[JobReporter]
- reporters = []
-
- # reporters = [IO_QD()] # type: List[Reporter]
- # job_reporters = [ClusterLoad()]
+ # job_reporters = [QDIOTimeHeatmap()] # type: List[JobReporter]
+ # job_reporters = []
+ reporters = [IO_QD()] # type: List[Reporter]
+ # reporters = [] # type: List[Reporter]
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -1278,18 +1551,25 @@
all_jobs = list(rstorage.iter_job(suite))
all_jobs.sort(key=lambda job: job.params)
for job in all_jobs:
- for reporter in job_reporters:
- logger.debug("Start reporter %s on job %s suite %s",
- reporter.__class__.__name__, job.summary, suite.test_type)
- for block, item, html in reporter.get_divs(suite, job, rstorage):
- items[block][item].append(html)
- if DEBUG:
- break
+ if 'rwd16384_qd1' == job.summary:
+ try:
+ for reporter in job_reporters:
+ logger.debug("Start reporter %s on job %s suite %s",
+ reporter.__class__.__name__, job.summary, suite.test_type)
+ for block, item, html in reporter.get_divs(suite, job, rstorage):
+ items[block][item].append(html)
+ if DEBUG:
+ break
+ except Exception:
+ logger.exception("Failed to generate report for %s", job)
for reporter in reporters:
- logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
- for block, item, html in reporter.get_divs(suite, rstorage):
- items[block][item].append(html)
+ try:
+ logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
+ for block, item, html in reporter.get_divs(suite, rstorage):
+ items[block][item].append(html)
+ except Exception as exc:
+ logger.exception("Failed to generate report")
if DEBUG:
break
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 9d2e83e..60a0a55 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -144,10 +144,13 @@
def __repr__(self) -> str:
return str(self)
- def copy(self) -> 'TimeSeries':
+ def copy(self, no_data: bool = False) -> 'TimeSeries':
cp = copy.copy(self)
- cp.times = self.times.copy()
- cp.data = self.data.copy()
+
+ if not no_data:
+ cp.times = self.times.copy()
+ cp.data = self.data.copy()
+
cp.source = self.source()
return cp
diff --git a/wally/run_test.py b/wally/run_test.py
index f8dc036..c4581d3 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,5 +1,6 @@
import time
import json
+import copy
import logging
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast
@@ -143,15 +144,6 @@
logger.debug("Add node %s with roles %s", url, roles)
-class SaveNodesStage(Stage):
- """Save nodes list to file"""
-
- priority = StepOrder.UPDATE_NODES_INFO + 1
-
- def run(self, ctx: TestRun) -> None:
- ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
-
-
class SleepStage(Stage):
"""Save nodes list to file"""
@@ -266,15 +258,38 @@
pass
+class SaveNodesStage(Stage):
+ """Save nodes list to file"""
+ nodes_path = 'all_nodes'
+ params_path = 'all_nodes_params.js'
+ priority = StepOrder.UPDATE_NODES_INFO + 1
+
+ def run(self, ctx: TestRun) -> None:
+ infos = list(ctx.nodes_info.values())
+ params = {node.node_id: node.params for node in infos}
+ ninfos = [copy.copy(node) for node in infos]
+ for node in ninfos:
+ node.params = "in {!r} file".format(self.params_path)
+ ctx.storage.put_list(ninfos, self.nodes_path)
+ ctx.storage.put_raw(json.dumps(params).encode('utf8'), self.params_path)
+
+
class LoadStoredNodesStage(Stage):
priority = StepOrder.DISCOVER
def run(self, ctx: TestRun) -> None:
- if 'all_nodes' in ctx.storage:
+ if SaveNodesStage.nodes_path in ctx.storage:
if ctx.nodes_info:
logger.error("Internal error: Some nodes already stored in " +
"nodes_info before LoadStoredNodesStage stage")
raise StopTestError()
- ctx.nodes_info = {node.node_id: node
- for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
+
+ nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
+
+ if SaveNodesStage.params_path in ctx.storage:
+ params = json.loads(ctx.storage.get_raw(SaveNodesStage.params_path).decode('utf8'))
+ for node_id, node in nodes.items():
+ node.params = params.get(node_id, {})
+
+ ctx.nodes_info = nodes
logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors.py b/wally/sensors.py
index 60830a1..89bc224 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,7 +1,7 @@
import bz2
import array
import logging
-from typing import List, Dict, Tuple
+from typing import Dict
import numpy
@@ -21,37 +21,6 @@
logger = logging.getLogger("wally")
-sensor_units = {
- "system-cpu.idle": "",
- "system-cpu.nice": "",
- "system-cpu.user": "",
- "system-cpu.sys": "",
- "system-cpu.iowait": "",
- "system-cpu.irq": "",
- "system-cpu.sirq": "",
- "system-cpu.steal": "",
- "system-cpu.guest": "",
-
- "system-cpu.procs_blocked": "",
- "system-cpu.procs_queue_x10": "",
-
- "net-io.recv_bytes": "B",
- "net-io.recv_packets": "",
- "net-io.send_bytes": "B",
- "net-io.send_packets": "",
-
- "block-io.io_queue": "",
- "block-io.io_time": "ms",
- "block-io.reads_completed": "",
- "block-io.rtime": "ms",
- "block-io.sectors_read": "B",
- "block-io.sectors_written": "B",
- "block-io.writes_completed": "",
- "block-io.wtime": "ms",
- "block-io.weighted_io_time": "ms"
-}
-
-
# TODO(koder): in case if node has more than one role sensor settings might be incorrect
class StartSensorsStage(Stage):
priority = StepOrder.START_SENSORS
@@ -118,27 +87,29 @@
def collect_sensors_data(ctx: TestRun, stop: bool = False):
rstorage = ResultStorage(ctx.storage)
+ total_sz = 0
+
+ logger.info("Start loading sensors")
for node in ctx.nodes:
node_id = node.node_id
if node_id in ctx.sensors_run_on:
+ func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
- if stop:
- func = node.conn.sensors.stop
- else:
- func = node.conn.sensors.get_updates
+ # hack to calculate total transferred size
+ offset_map, compressed_blob, compressed_collected_at_b = func()
+ data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
- # TODO: units should came along with data
- # TODO: process raw sensors data
+ total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
+ 16 * len(offset_map)
- for path, value, is_array in sensors_rpc_plugin.unpack_rpc_updates(func()):
+ for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
if path == 'collected_at':
ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
- rstorage.append_sensor(numpy.array(value), ds, 'us')
+ rstorage.append_sensor(numpy.array(value), ds, units)
else:
sensor, dev, metric = path.split(".")
ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
if is_array:
- units = sensor_units["{}.{}".format(sensor, metric)]
rstorage.append_sensor(numpy.array(value), ds, units)
else:
if metric == 'historic':
@@ -146,6 +117,7 @@
else:
assert metric in ('perf_dump', 'historic_js')
rstorage.put_sensor_raw(value, ds(tag='js'))
+ logger.info("Download %sB of sensors data", utils.b2ssize(total_sz))
@@ -156,33 +128,3 @@
def run(self, ctx: TestRun) -> None:
collect_sensors_data(ctx, True)
-
-# def delta(func, only_upd=True):
-# prev = {}
-# while True:
-# for dev_name, vals in func():
-# if dev_name not in prev:
-# prev[dev_name] = {}
-# for name, (val, _) in vals.items():
-# prev[dev_name][name] = val
-# else:
-# dev_prev = prev[dev_name]
-# res = {}
-# for stat_name, (val, accum_val) in vals.items():
-# if accum_val:
-# if stat_name in dev_prev:
-# delta = int(val) - int(dev_prev[stat_name])
-# if not only_upd or 0 != delta:
-# res[stat_name] = str(delta)
-# dev_prev[stat_name] = val
-# elif not only_upd or '0' != val:
-# res[stat_name] = val
-#
-# if only_upd and len(res) == 0:
-# continue
-# yield dev_name, res
-# yield None, None
-#
-#
-
-
diff --git a/wally/storage.py b/wally/storage.py
index aa90ac9..e07d9b3 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -6,7 +6,6 @@
import re
import abc
import shutil
-import sqlite3
import logging
from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
@@ -15,7 +14,6 @@
from yaml import CLoader as Loader, CDumper as Dumper # type: ignore
except ImportError:
from yaml import Loader, Dumper # type: ignore
-import numpy
from .common_types import IStorable
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 22a4937..e134e4c 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -11,7 +11,7 @@
runtime=180
# ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
+# check different QD, direct mode. (latency, iops) = func(th_count)
# ---------------------------------------------------------------------
[ceph_{TEST_SUMM}]
blocksize=4k
@@ -19,17 +19,16 @@
iodepth={QD_W}
# ---------------------------------------------------------------------
-# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# check different QD, direct read mode. (latency, iops) = func(th_count)
# also check iops for randread
# ---------------------------------------------------------------------
[ceph_{TEST_SUMM}]
blocksize=4k
rw=randread
-direct=1
iodepth={QD_R}
# ---------------------------------------------------------------------
-# sync write
+# sync write - disabled for now
# ---------------------------------------------------------------------
#[ceph_{TEST_SUMM}]
#blocksize=4k
@@ -43,10 +42,15 @@
# we can't use sequential with numjobs > 1 due to caching and block merging
# ---------------------------------------------------------------------
[ceph_{TEST_SUMM}]
-blocksize=16m
-rw=randwrite
-direct=1
-iodepth={QD_SEQ_W}
+blocksize=1m
+rw=write
+iodepth=1
+# offset_increment={OFFSET_INC}
+
+#[ceph_{TEST_SUMM}]
+#blocksize=16m
+#rw=randwrite
+#iodepth={QD_SEQ_W}
# ---------------------------------------------------------------------
# this is essentially sequential read operations
@@ -55,5 +59,4 @@
[ceph_{TEST_SUMM}]
blocksize=16m
rw=randread
-direct=1
iodepth={QD_SEQ_R}
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 40055d7..b8d1076 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,18 +1,23 @@
[global]
include defaults_qd.cfg
-# QD={% 32, 64, 128, 256 %}
-QD={% 32, 256 %}
-runtime=120
+# QD={% 16, 32, 64, 128 %}
+# QD={% 32, 256 %}
+QD=1
+runtime=10
direct=1
-ramp_time=15
+ramp_time=0
# ---------------------------------------------------------------------
[verify_{TEST_SUMM}]
-blocksize=4k
-rw=randwrite
-direct=1
-iodepth={QD}
+blocksize=1m
+rw=write
+iodepth=1
+
+#[verify_{TEST_SUMM}]
+#blocksize=16m
+#rw=randwrite
+#iodepth={% 1, 4 %}
# [verify_{TEST_SUMM}]
# blocksize=4k
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 9aa4ce6..36c349e 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -108,7 +108,7 @@
expected_run_time = int(sum(run_times) * 1.05)
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
- logger.info("Entire test should takes around %s and finished at %s", exec_time_s, end_dt_s)
+ logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
for job in not_in_storage:
results = [] # type: List[TimeSeries]
@@ -120,7 +120,7 @@
expected_job_time = self.get_expected_runtime(job)
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
- logger.info("Job should takes around %s and finished at %s", exec_time_s, end_dt_s)
+ logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
failed = False