many updates and fixes
diff --git a/clib/interpolate.cpp b/clib/interpolate.cpp
index c7f2b50..d7186a7 100644
--- a/clib/interpolate.cpp
+++ b/clib/interpolate.cpp
@@ -101,36 +101,40 @@
const uint64_t * times,
unsigned int time_step,
uint64_t * output_idx,
- uint64_t empty_cell_placeholder)
+ uint64_t empty_cell_placeholder,
+ bool allow_broken_step)
{
auto input_end = times + input_size;
auto output_end = output_idx + output_size;
- float no_step = time_step * 0.1;
+
+ float no_step = time_step * (allow_broken_step ? 0.3 : 0.1);
float more_then_step = time_step * 1.9;
float step_min = time_step * 0.9;
- float step_max = time_step * 1.1;
+ float step_max = time_step * (allow_broken_step ? 1.9 : 1.1);
- auto cinput = times;
- auto ctime_val = *cinput - time_step;
+ auto curr_input_tm = times;
+ long int curr_output_tm = *curr_input_tm - time_step;
for(; output_idx < output_end; ++output_idx) {
// skip repetition of same time
- while(*cinput - ctime_val <= no_step and cinput < input_end)
- ++cinput;
+ while(((long int)*curr_input_tm - curr_output_tm) <= no_step and curr_input_tm < input_end)
+ ++curr_input_tm;
- if (cinput == input_end)
+ if (curr_input_tm == input_end)
break;
- auto dt = *cinput - ctime_val;
- if (dt <= step_max and dt > step_min) {
- *output_idx = cinput - times;
- ctime_val += time_step;
- } else if (dt >= more_then_step) {
+ long int dt = *curr_input_tm - curr_output_tm;
+// std::printf("dt=%ld curr_input_tm=%lu curr_output_tm=%ld\n", dt, *curr_input_tm, curr_output_tm);
+
+ if (dt <= step_max and (dt > step_min or allow_broken_step)) {
+ *output_idx = curr_input_tm - times;
+ } else if (dt >= more_then_step or (allow_broken_step and dt >= step_max)) {
*output_idx = empty_cell_placeholder;
- ctime_val += time_step;
} else
- return -(int)(cinput - times);
+ return -(int)(curr_input_tm - times);
+
+ curr_output_tm += time_step;
}
return output_size - (output_end - output_idx);
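
Note (added for clarity): the routine above fills an array of per-second output cells with either the index of the matching input sample or empty_cell_placeholder, and returns a negative input index when the step between samples is inconsistent; allow_broken_step only loosens the accepted bounds. A minimal pure-Python sketch of that contract (not the actual binding, names are illustrative):

    def interpolate_idx(times, output_size, time_step, placeholder, allow_broken_step=False):
        # mirrors the thresholds used in the C code above
        no_step = time_step * (0.3 if allow_broken_step else 0.1)
        more_than_step = time_step * 1.9
        step_min = time_step * 0.9
        step_max = time_step * (1.9 if allow_broken_step else 1.1)

        out = []
        pos = 0
        curr_output_tm = times[0] - time_step
        while len(out) < output_size:
            # skip repetitions of (almost) the same timestamp
            while pos < len(times) and times[pos] - curr_output_tm <= no_step:
                pos += 1
            if pos == len(times):
                break
            dt = times[pos] - curr_output_tm
            if dt <= step_max and (dt > step_min or allow_broken_step):
                out.append(pos)              # sample index for this cell
            elif dt >= more_than_step or (allow_broken_step and dt >= step_max):
                out.append(placeholder)      # gap in the input -> empty cell
            else:
                return -pos                  # inconsistent step in the input
            curr_output_tm += time_step
        return out
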
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index 843a672..4ded43f 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -33,6 +33,7 @@
var_dir_root: /tmp/perf_tests
settings_dir: ~/.wally
connect_timeout: 30
+max_time_diff_ms: 5000
rpc_log_level: DEBUG
include: logging.yaml
default_test_local_folder: "/tmp/wally_{name}_{uuid}"
diff --git a/configs-examples/perf_lab.yml b/configs-examples/perf_lab.yml
index eae4c5b..a9060fc 100644
--- a/configs-examples/perf_lab.yml
+++ b/configs-examples/perf_lab.yml
@@ -11,7 +11,7 @@
root@cz7626: testnode
root@cz7627: testnode
-# sleep: 5
+sleep: 30
tests:
- fio:
diff --git a/v2_plans.md b/v2_plans.md
index a2d5244..bcb4d54 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,41 +1,46 @@
TODO today:
-----------
-* Store the OSD config separately as json to speed up dump/load
-* Fix heatmap boundaries - for QD they should fall on odd
-  numbers. The upper one should include part of the previous interval, etc
+* Interpolate and cache sensors only for the requested intervals
+* Resource consumption vs QD: add a heading, put it into
+  Engineering, and move it into the Engineering section as well
+* PerformanceSummary
+* for reads the heatmap should use 'storage read block size', not write
+* do not count iowait in cpu_consumption
+  (in fact only sys & user & nice & irq & sirq should be counted there)
+* IOPS vs QD is generated incorrectly
+* Resource consumption per service provided - chart labels
+* Check png generation, add a report option selecting which images
+  to render
+* Storage nodes cpu non-idle heatmap
+* Check that ceph fs flush time is larger than the test time
+* Collect node RAM size and CPU core counts and use them for
+  resource calculations
+* scipy.stats.shapiro for the normality test
+* Plot resource consumption vs. QD
+* Check stopping via ctrl+c
+* bottleneck table
+* Consider pandas.dataframe as a universal intermediary for
+  visualization functions
+* Format chart ticks using b2ssize/b2ssize_10
+* scipy.stats.probplot - QQ plot
+* Generate a summary report -
* Mark devices on nodes by role during node diagnostics
-* Collect idle load. A separate sleep-like stage after installing
-  the sensors that records the sleep start and end times
+* Unify setting names - e.g. hmap_XXX for the heatmap.
+* Maybe hexbins instead of heatmap?
* Check and unify all caches. A separate TS lookup that always
  returns Union[DataSource, Iterable[DataSource]]. Separately, one
  unified load/post-processing function that holds all the basic
  caches. On top of that, a post-processing layer (aggregation by role,
  device, CPU) with its own caches. Store the original in the TS.
-* Try turning 16m randwr into a linear operation with
-  precomputed offsets? offset, offset_increment
-* The text report is broken
-* Check debug options on ceph / reduce them for the test
-* IOPS vs QD labels do not fit when there is a lot of data, they need to be rotated
-* Plot resource consumption vs. QD
-* Do not show deviation for values that are too small
-* Add io_wait to the CPU charts
-* Sort reports by type and queue depth
-* The report is generated incorrectly
-* What to do with the gap in sensor data when a test is restarted
-* Kill the agent's background processes when work is stopped. Add an extra function
-  to the agent that finds and kills all child processes when a stop
-  is requested
-* fix the text report
-* bottleneck table
-* Consider pandas.dataframe as a universal intermediary for
-  visualization functions
-* Format chart ticks using b2ssize/b2ssize_10
-* Compare time across nodes and report an error when it diverges
-* scipy.stats.probplot - QQ plot
-* set grid explicitly for selected charts
+* Collect the report from the cluster
+
+Problems
+--------
+
* Look into why the Dickey-Fuller test works so poorly
-* Generate a summary report -
+* What to do with IDLE load?
+* What to do with the gap in sensor data when a test is restarted
Wally consists of parts that should be split out and unified with other tools:
----------------------------------------------------------------------------------
@@ -216,7 +221,7 @@
2.0:
- * Build comparisons - for now by folders from the CLI, text-based
+ * Build comparisons - by folders from the CLI, text-based
* Move the averaging interval into the config
* fix SW & HW info, add qemu settings and the like
* Before starting a test, check whether its results already exist and skip it
@@ -244,33 +249,7 @@
* Refactor start/monitor/stop of a process over SSH, and background start with a check, into a separate function
* run prefill in the background and check it periodically
* fix all hangs in all threads - dump stacks on hang and add a timeout
- * On kill, terminate all remote processes. Store hosts and pids in the context and in a file
* fadvise_hint=0
* In the sensor report, change everything to % of the sum over the test nodes
* look into what is going on with the network cards
* Intellectual granular sensors
-
-Statistical processing:
-    async calculation
-    calculation of the number of measurements
-    calculation of mixed IOPS
-
-
-Check that volumes work
-What is going on with port fw
-python 2.6
-Why are the timings incorrect
-Copy the original config into the folder
-implement volume reuse
-
-assumption_check.py
-    almost everything is broken
-
-charts.py
-    1) generate images with fixed names
-
-report.py
-    polish it
-
-rest_api.py
-    rewrite using prest
diff --git a/wally/console_report.py b/wally/console_report.py
index 3733de1..ac54965 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -1,3 +1,6 @@
+import logging
+
+
import numpy
from cephlib.common import float2str
@@ -9,6 +12,11 @@
from .suits.io.fio import FioTest
from .statistic import calc_norm_stat_props, calc_histo_stat_props
from .suits.io.fio_hist import get_lat_vals
+from .data_selectors import get_aggregated
+
+
+logger = logging.getLogger("wally")
+
class ConsoleReportStage(Stage):
@@ -19,22 +27,26 @@
for suite in rstorage.iter_suite(FioTest.name):
table = texttable.Texttable(max_width=200)
- table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
- table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
+ tbl = rstorage.get_txt_report(suite)
+ if tbl is None:
+ table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
+ table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
- for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
- bw_ts, = list(rstorage.iter_ts(suite, job, metric='bw'))
- props = calc_norm_stat_props(bw_ts)
- avg_iops = props.average // job.params.params['bsize']
- iops_dev = props.deviation // job.params.params['bsize']
+ for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
+ bw_ts = get_aggregated(rstorage, suite, job, metric='bw')
+ props = calc_norm_stat_props(bw_ts)
+ avg_iops = props.average // job.params.params['bsize']
+ iops_dev = props.deviation // job.params.params['bsize']
- lat_ts, = list(rstorage.iter_ts(suite, job, metric='lat'))
- bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- lat_props = calc_histo_stat_props(lat_ts, bins_edges)
- table.add_row([job.params.summary,
- "{} ~ {}".format(float2str(avg_iops), float2str(iops_dev)),
- float2str(props.average / 1024), # Ki -> Mi
- "{}/{}".format(float2str(props.skew), float2str(props.kurt)),
- float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
+ lat_ts = get_aggregated(rstorage, suite, job, metric='lat')
+ bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000 # convert us to ms
+ lat_props = calc_histo_stat_props(lat_ts, bins_edges)
+ table.add_row([job.params.summary,
+ "{} ~ {}".format(float2str(avg_iops), float2str(iops_dev)),
+ float2str(props.average / 1024), # Ki -> Mi
+ "{}/{}".format(float2str(props.skew), float2str(props.kurt)),
+ float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
- print(table.draw())
+ tbl = table.draw()
+ rstorage.put_txt_report(suite, tbl)
+ print(tbl)
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 02d5075..a5ac400 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -72,20 +72,6 @@
tss = list(find_all_series(rstorage, suite, job, metric))
- # TODO replace this with universal interpolator
- # for ts in tss:
- # from_s = float(unit_conversion_coef('s', ts.time_units))
- # prev_time = ts.times[0]
- # res = [ts.data[0]]
- #
- # for ln, (tm, val) in enumerate(zip(ts.times[1:], ts.data[1:]), 1):
- # assert tm > prev_time, "Failed tm > prev_time, src={}, ln={}".format(ts.source, ln)
- # while tm - prev_time > from_s * 1.2:
- # res.append(0)
- # prev_time += from_s
- # res.append(val)
- # prev_time = tm
-
if len(tss) == 0:
raise NameError("Can't find any TS for {},{},{}".format(suite, job, metric))
@@ -97,16 +83,16 @@
metric=metric,
tag='csv')
- agg_ts = TimeSeries(metric,
- raw=None,
- source=ds,
- data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
- times=tss[0].times.copy(),
- units=tss[0].units,
- histo_bins=tss[0].histo_bins,
- time_units=tss[0].time_units)
+ tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
+ res = None
+ trange = job.reliable_info_range_s
- for ts in tss:
+ for ts in tss_inp:
+ if ts.time_units != 's':
+ msg = "time_units must be 's' for fio sensor"
+ logger.error(msg)
+ raise ValueError(msg)
+
if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
@@ -119,16 +105,30 @@
logger.error(msg)
raise ValueError(msg)
- # TODO: match times on different ts
- if abs(len(agg_ts.data) - len(ts.data)) > 1:
- # import IPython
- # IPython.embed()
- pass
- assert abs(len(agg_ts.data) - len(ts.data)) <= 1, \
- "len(agg_ts.data)={}, len(ts.data)={}, need to be almost equals".format(len(agg_ts.data), len(ts.data))
+ assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
+     "Reliable range [{}, {}] not in TS range [{}, {}]".format(trange[0], trange[1], ts.times[0], ts.times[-1])
- mlen = min(len(agg_ts.data), len(ts.data))
- agg_ts.data[:mlen] += ts.data[:mlen]
+ idx1, idx2 = numpy.searchsorted(ts.times, trange)
+ idx2 += 1
+
+ assert (idx2 - idx1) == (trange[1] - trange[0] + 1), \
+ "Broken time array at {} for {}".format(trange, ts.source)
+
+ dt = ts.data[idx1: idx2]
+ if res is None:
+ res = dt
+ else:
+ assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
+ res += dt
+
+ agg_ts = TimeSeries(metric,
+ raw=None,
+ source=ds,
+ data=res,
+ times=tss_inp[0].times.copy(),
+ units=tss_inp[0].units,
+ histo_bins=tss_inp[0].histo_bins,
+ time_units=tss_inp[0].time_units)
return agg_ts
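
The slicing above relies on numpy.searchsorted plus an inclusive right edge; a small hedged example (assuming one sample per second, as the code asserts) of why the selected window length must equal trange[1] - trange[0] + 1:

    import numpy

    times = numpy.arange(100, 200, dtype=numpy.uint64)   # one sample per second
    trange = (110, 120)                                   # job.reliable_info_range_s

    idx1, idx2 = numpy.searchsorted(times, trange)
    idx2 += 1                                             # make the right edge inclusive

    window = times[idx1:idx2]
    assert len(window) == trange[1] - trange[0] + 1
    assert window[0] == trange[0] and window[-1] == trange[1]
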
@@ -136,103 +136,27 @@
interpolated_cache = {}
-def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
- "Interpolate time series to values on seconds borders"
- logging.warning("This implementation of interpolate_ts_on_seconds_border is deplricated and should be updated")
-
- if not nc and ts.source.tpl in interpolated_cache:
- return interpolated_cache[ts.source.tpl]
-
- assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
- .format(len(ts.times), len(ts.data), ts.source)
-
- rcoef = 1 / unit_conversion_coef(ts.time_units, 's') # type: Union[int, Fraction]
-
- if isinstance(rcoef, Fraction):
- assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
- rcoef = rcoef.numerator
-
- assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
- coef = int(rcoef) # make typechecker happy
-
- # round to seconds border
- begin = int(ts.times[0] / coef + 1) * coef
- end = int(ts.times[-1] / coef) * coef
-
- # current real data time chunk begin time
- edge_it = iter(ts.times)
-
- # current real data value
- val_it = iter(ts.data)
-
- # result array, cumulative value per second
- result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
- idx = 0
- curr_summ = 0
-
- # end of current time slot
- results_cell_ends = begin + coef
-
- # hack to unify looping
- real_data_end = next(edge_it)
- while results_cell_ends <= end:
- real_data_start = real_data_end
- real_data_end = next(edge_it)
- real_val_left = next(val_it)
-
- # real data "speed" for interval [real_data_start, real_data_end]
- real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
-
- while real_data_end >= results_cell_ends and results_cell_ends <= end:
- # part of current real value, which is fit into current result cell
- curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
-
- # calculate rest of real data for next result cell
- real_val_left -= curr_real_chunk
- result[idx] = curr_summ + curr_real_chunk
- idx += 1
- curr_summ = 0
-
- # adjust real data start time
- real_data_start = results_cell_ends
- results_cell_ends += coef
-
- # don't lost any real data
- curr_summ += real_val_left
-
- assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))
-
- res_ts = TimeSeries(ts.name, None, result,
- times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
- units=ts.units,
- time_units='s',
- source=ts.source(),
- histo_bins=ts.histo_bins)
-
- if not nc:
- interpolated_cache[ts.source.tpl] = res_ts
-
- return res_ts
-
-
c_interp_func_agg = None
c_interp_func_qd = None
c_interp_func_fio = None
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg') -> TimeSeries:
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg',
+ allow_broken_step: bool = False) -> TimeSeries:
"Interpolate time series to values on seconds borders"
key = (ts.source.tpl, tp)
if not nc and key in interpolated_cache:
return interpolated_cache[key].copy()
- # both data and times must be 1d compact arrays
- assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
- assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
+ if tp in ('qd', 'agg'):
+ # both data and times must be 1d compact arrays
+ assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
+ assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
+
assert len(ts.times.strides) == 1, "ts.times.strides must be 1D, not " + repr(ts.times.strides)
assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"
- assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
+ assert len(ts.times) == len(ts.data), "len(times)={} != len(data)={} for {!s}"\
.format(len(ts.times), len(ts.data), ts.source)
rcoef = 1 / unit_conversion_coef(ts.time_units, 's') # type: Union[int, Fraction]
@@ -278,6 +202,7 @@
ctypes.c_uint, # time_scale_coef
uint64_p, # output indexes
ctypes.c_uint64, # empty placeholder
+ ctypes.c_bool # allow broken steps
]
assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
@@ -287,6 +212,7 @@
result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
if tp in ('qd', 'agg'):
+ assert not allow_broken_step, "Broken steps aren't supported for non-fio arrays"
func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
sz = func(ts.data.size,
output_sz,
@@ -308,10 +234,10 @@
ts.times.ctypes.data_as(uint64_p),
coef,
ridx.ctypes.data_as(uint64_p),
- no_data)
-
+ no_data,
+ allow_broken_step)
if sz_or_err <= 0:
- raise ValueError("Error in input array at index %s. %s", -sz_or_err, ts.source)
+ raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))
rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)
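
For reference, a hedged sketch of the ctypes binding implied by the argtypes and the call above; the library path and symbol name are assumptions, only the argument layout and the int return value follow the diff:

    import ctypes

    uint64_p = ctypes.POINTER(ctypes.c_uint64)

    lib = ctypes.CDLL("./libwally_clib.so")        # hypothetical library name
    interp_fio = lib.interpolate_fio_idx           # hypothetical symbol name
    interp_fio.restype = ctypes.c_int              # >0: cells filled, <=0: negated error index
    interp_fio.argtypes = [
        ctypes.c_uint,     # input size
        ctypes.c_uint,     # output size
        uint64_p,          # input data
        uint64_p,          # input times
        ctypes.c_uint,     # time_scale_coef
        uint64_p,          # output indexes
        ctypes.c_uint64,   # empty placeholder
        ctypes.c_bool,     # allow broken steps
    ]
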
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index 666c753..9f66030 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,7 +1,7 @@
import os
import pprint
import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional, List
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, List, Any
import numpy
@@ -43,6 +43,9 @@
report_root = 'report/'
plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
+ txt_report = report_root + '{suite_id}_report.txt'
+
+ job_extra = 'meta/{suite_id}.{job_id}/{tag}'
job_cfg = job_cfg_r.replace("\\.", '.')
suite_cfg = suite_cfg_r.replace("\\.", '.')
@@ -170,7 +173,7 @@
:return: TimeSeries
"""
(units, time_units), header2, data = self.load_array(path)
- times = data[:,0]
+ times = data[:,0].copy()
ts_data = data[:,1:]
if ts_data.shape[1] == 1:
@@ -201,7 +204,8 @@
# there must be no histogram for collected_at
assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
- assert collect_header == [ds.node_id, 'collected_at', 'us'],\
+ node, tp, units = collect_header
+ assert node == ds.node_id and tp == 'collected_at' and units in ('ms', 'us'),\
"Unexpected collect_at header {!r} at {!r}".format(collect_header, path)
assert len(collected_at.shape) == 1, "Collected_at must be 1D at {!r}".format(path)
@@ -222,7 +226,7 @@
times=collected_at,
source=ds,
units=data_units,
- time_units='us')
+ time_units=units)
# ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
@@ -365,4 +369,19 @@
cvls.update(groups)
yield path, DataSource(**cvls)
+ def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
+ path = DB_paths.txt_report.format(suite_id=suite.storage_id)
+ if path in self.storage:
+ return self.storage.get_raw(path).decode('utf8')
+ def put_txt_report(self, suite: SuiteConfig, report: str) -> None:
+ path = DB_paths.txt_report.format(suite_id=suite.storage_id)
+ self.storage.put_raw(report.encode('utf8'), path)
+
+ def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
+ path = DB_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+ self.storage.put(data, path)
+
+ def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
+ path = DB_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+ return self.storage.get(path, None)
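
A hedged usage sketch for the ResultStorage helpers added above; rstorage, suite and job are assumed to exist (e.g. inside a report stage), and the metadata key/value are illustrative:

    def render_or_load_report(rstorage, suite, job):
        # mirrors the caching pattern used in console_report.py above
        cached = rstorage.get_txt_report(suite)
        if cached is not None:
            return cached
        report = "...render the texttable here..."
        rstorage.put_txt_report(suite, report)
        # arbitrary per-job metadata can be attached the same way
        rstorage.put_job_info(suite, job, "io_summary", {"qd": job.qd})
        return report
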
diff --git a/wally/plot.py b/wally/plot.py
new file mode 100644
index 0000000..6729584
--- /dev/null
+++ b/wally/plot.py
@@ -0,0 +1,529 @@
+import logging
+from io import BytesIO
+from functools import wraps
+from typing import Tuple, cast, List, Callable, Optional, Any
+
+import numpy
+import scipy.stats
+import matplotlib.axis
+import matplotlib.style
+from matplotlib.ticker import FuncFormatter
+from matplotlib.figure import Figure
+import matplotlib.pyplot as plt
+
+# to make seaborn styles available
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ import seaborn
+
+from cephlib.plot import process_heatmap_data, hmap_from_2d, do_plot_hmap_with_histo
+
+from .hlstorage import ResultStorage
+from .utils import unit_conversion_coef
+from .statistic import moving_average, moving_dev, hist_outliers_perc, find_ouliers_ts, approximate_curve
+from .result_classes import StatProps, DataSource, TimeSeries, NormStatProps
+from .report_profiles import StyleProfile, ColorProfile
+from .resources import IOSummary
+
+
+logger = logging.getLogger("wally")
+
+
+# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
+
+def get_emb_image(fig: Figure, file_format: str, **opts) -> bytes:
+ bio = BytesIO()
+ if file_format == 'svg':
+ fig.savefig(bio, format='svg', **opts)
+ img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+ return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
+ else:
+ fig.savefig(bio, format=file_format, **opts)
+ return bio.getvalue()
+
+
+class PlotParams:
+ def __init__(self, fig: Figure, ax: Any, title: str,
+ style: StyleProfile, colors: ColorProfile) -> None:
+ self.fig = fig
+ self.ax = ax
+ self.style = style
+ self.colors = colors
+ self.title = title
+
+
+def provide_plot(noaxis: bool = False,
+ eng: bool = False,
+ no_legend: bool = False,
+ long_plot: bool = True,
+ grid: Any = None,
+ style_name: str = 'default',
+ noadjust: bool = False) -> Callable[..., Callable[..., str]]:
+ def closure1(func: Callable[..., None]) -> Callable[..., str]:
+ @wraps(func)
+ def closure2(storage: ResultStorage,
+ style: StyleProfile,
+ colors: ColorProfile,
+ path: DataSource,
+ title: Optional[str],
+ *args, **kwargs) -> str:
+ fpath = storage.check_plot_file(path)
+ if not fpath:
+
+ assert style_name in ('default', 'ioqd')
+ mlstyle = style.default_style if style_name == 'default' else style.io_chart_style
+ with matplotlib.style.context(mlstyle):
+ file_format = path.tag.split(".")[-1]
+ fig = plt.figure(figsize=style.figsize_long if long_plot else style.figsize)
+
+ if not noaxis:
+ xlabel = kwargs.pop('xlabel', None)
+ ylabel = kwargs.pop('ylabel', None)
+ ax = fig.add_subplot(111)
+
+ if xlabel is not None:
+ ax.set_xlabel(xlabel)
+
+ if ylabel is not None:
+ ax.set_ylabel(ylabel)
+
+ if grid:
+ ax.grid(axis=grid)
+ else:
+ ax = None
+
+ if title:
+ fig.suptitle(title, fontsize=style.title_font_size)
+
+ pp = PlotParams(fig, ax, title, style, colors)
+ func(pp, *args, **kwargs)
+ apply_style(pp, eng=eng, no_legend=no_legend, noadjust=noadjust)
+
+ fpath = storage.put_plot_file(get_emb_image(fig, file_format=file_format, dpi=style.dpi), path)
+ logger.debug("Plot %s saved to %r", path, fpath)
+ plt.close(fig)
+ return fpath
+ return closure2
+ return closure1
+
+
+def apply_style(pp: PlotParams, eng: bool = True, no_legend: bool = False, noadjust: bool = False) -> None:
+
+ if (pp.style.legend_for_eng or not eng) and not no_legend:
+ if not noadjust:
+ pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.03, 0.81)
+
+ for ax in pp.fig.axes:
+ ax.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+ elif not noadjust:
+ pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_legend)
+
+ if pp.style.tide_layout:
+ pp.fig.set_tight_layout(True)
+
+
+# -------------- PLOT FUNCTIONS --------------------------------------------------------------------------------------
+
+
+@provide_plot(eng=True)
+def plot_hist(pp: PlotParams, units: str, prop: StatProps) -> None:
+
+ normed_bins = prop.bins_populations / prop.bins_populations.sum()
+ bar_width = prop.bins_edges[1] - prop.bins_edges[0]
+ pp.ax.bar(prop.bins_edges, normed_bins, color=pp.colors.box_color, width=bar_width, label="Real data")
+
+ pp.ax.set(xlabel=units, ylabel="Value probability")
+
+ if isinstance(prop, NormStatProps):
+ nprop = cast(NormStatProps, prop)
+ stats = scipy.stats.norm(nprop.average, nprop.deviation)
+
+ new_edges, step = numpy.linspace(prop.bins_edges[0], prop.bins_edges[-1],
+ len(prop.bins_edges) * 10, retstep=True)
+
+ ypoints = stats.cdf(new_edges) * 11
+ ypoints = [nextpt - prevpt for (nextpt, prevpt) in zip(ypoints[1:], ypoints[:-1])]
+ xpoints = (new_edges[1:] + new_edges[:-1]) / 2
+
+ pp.ax.plot(xpoints, ypoints, color=pp.colors.primary_color, label="Expected from\nnormal\ndistribution")
+
+ pp.ax.set_xlim(left=prop.bins_edges[0])
+ if prop.log_bins:
+ pp.ax.set_xscale('log')
+
+
+@provide_plot(grid='y')
+def plot_simple_over_time(pp: PlotParams, tss: List[Tuple[str, numpy.ndarray]], average: bool = False) -> None:
+ max_len = 0
+ for name, arr in tss:
+ if average:
+ avg_vals = moving_average(arr, pp.style.avg_range)
+ if pp.style.approx_average_no_points:
+ time_points = numpy.arange(len(avg_vals))
+ avg_vals = approximate_curve(cast(List[int], time_points),
+ avg_vals,
+ cast(List[int], time_points),
+ pp.style.curve_approx_level)
+ arr = avg_vals
+ pp.ax.plot(arr, label=name)
+ max_len = max(max_len, len(arr))
+ pp.ax.set_xlim(-5, max_len + 5)
+
+
+@provide_plot(no_legend=True, grid='x', noadjust=True)
+def plot_simple_bars(pp: PlotParams,
+ names: List[str],
+ values: List[float],
+ errs: List[float] = None,
+ x_formatter: Callable[[float, float], str] = None,
+ one_point_zero_line: bool = True) -> None:
+
+ ind = numpy.arange(len(names))
+ width = 0.35
+ pp.ax.barh(ind, values, width, xerr=errs)
+
+ pp.ax.set_yticks(ind)
+ pp.ax.set_yticklabels(names)
+ pp.ax.set_xlim(0, max(val + err for val, err in zip(values, errs)) * 1.1)
+
+ if one_point_zero_line:
+ pp.ax.axvline(x=1.0, color='r', linestyle='--', linewidth=1, alpha=0.5)
+
+ if x_formatter:
+ pp.ax.xaxis.set_major_formatter(FuncFormatter(x_formatter))
+
+ pp.fig.subplots_adjust(left=0.2)
+
+
+@provide_plot(no_legend=True, long_plot=True, noaxis=True)
+def plot_hmap_from_2d(pp: PlotParams, data2d: numpy.ndarray, xlabel: str, ylabel: str,
+ bins: numpy.ndarray = None) -> None:
+ ioq1d, ranges = hmap_from_2d(data2d)
+ heatmap, bins = process_heatmap_data(ioq1d, bin_ranges=ranges, bins=bins)
+ bins_populations, _ = numpy.histogram(ioq1d, bins)
+
+ ax, _ = do_plot_hmap_with_histo(pp.fig,
+ heatmap,
+ bins_populations,
+ bins,
+ cmap=pp.colors.hmap_cmap,
+ cbar=pp.style.heatmap_colorbar,
+ histo_grid=pp.style.histo_grid)
+ ax.set(ylabel=ylabel, xlabel=xlabel)
+
+
+@provide_plot(eng=True, grid='y')
+def plot_v_over_time(pp: PlotParams, units: str, ts: TimeSeries,
+ plot_avg_dev: bool = True, plot_points: bool = True) -> None:
+
+ min_time = min(ts.times)
+
+ # convert time to ms
+ coef = float(unit_conversion_coef(ts.time_units, 's'))
+ time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
+
+ outliers_idxs = find_ouliers_ts(ts.data, cut_range=pp.style.outliers_q_nd)
+ outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=pp.style.outliers_hide_q_nd)
+ normal_idxs = numpy.logical_not(outliers_idxs)
+ outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
+ # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
+
+ data = ts.data[normal_idxs]
+ data_times = time_points[normal_idxs]
+ outliers = ts.data[outliers_idxs]
+ outliers_times = time_points[outliers_idxs]
+
+ if plot_points:
+ alpha = pp.colors.noise_alpha if plot_avg_dev else 1.0
+ pp.ax.plot(data_times, data, pp.style.point_shape, color=pp.colors.primary_color, alpha=alpha, label="Data")
+ pp.ax.plot(outliers_times, outliers, pp.style.err_point_shape, color=pp.colors.err_color, label="Outliers")
+
+ has_negative_dev = False
+ plus_minus = "\xb1"
+
+ if plot_avg_dev and len(data) < pp.style.avg_range * 2:
+            logger.warning("Array %r is too small to plot average over %s points", pp.title, pp.style.avg_range)
+ elif plot_avg_dev:
+ avg_vals = moving_average(data, pp.style.avg_range)
+ dev_vals = moving_dev(data, pp.style.avg_range)
+ avg_times = moving_average(data_times, pp.style.avg_range)
+
+ if (plot_points and pp.style.approx_average) or (not plot_points and pp.style.approx_average_no_points):
+ avg_vals = approximate_curve(avg_times, avg_vals, avg_times, pp.style.curve_approx_level)
+ dev_vals = approximate_curve(avg_times, dev_vals, avg_times, pp.style.curve_approx_level)
+
+ pp.ax.plot(avg_times, avg_vals, c=pp.colors.suppl_color1, label="Average")
+
+ low_vals_dev = avg_vals - dev_vals * pp.style.dev_range_x
+ hight_vals_dev = avg_vals + dev_vals * pp.style.dev_range_x
+ if (pp.style.dev_range_x - int(pp.style.dev_range_x)) < 0.01:
+ pp.ax.plot(avg_times, low_vals_dev, c=pp.colors.suppl_color2,
+ label="{}{}*stdev".format(plus_minus, int(pp.style.dev_range_x)))
+ else:
+ pp.ax.plot(avg_times, low_vals_dev, c=pp.colors.suppl_color2,
+ label="{}{}*stdev".format(plus_minus, pp.style.dev_range_x))
+ pp.ax.plot(avg_times, hight_vals_dev, c=pp.colors.suppl_color2)
+ has_negative_dev = low_vals_dev.min() < 0
+
+ pp.ax.set_xlim(-5, max(time_points) + 5)
+ pp.ax.set_xlabel("Time, seconds from test begin")
+
+ if plot_avg_dev:
+ pp.ax.set_ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, pp.style.avg_range))
+ else:
+ pp.ax.set_ylabel(units)
+
+ if has_negative_dev:
+ pp.ax.set_ylim(bottom=0)
+
+
+@provide_plot(eng=True, no_legend=True, grid='y', noadjust=True)
+def plot_lat_over_time(pp: PlotParams, ts: TimeSeries) -> None:
+ times = ts.times - min(ts.times)
+ step = len(times) / pp.style.lat_samples
+ points = [times[int(i * step + 0.5)] for i in range(pp.style.lat_samples)]
+ points.append(times[-1])
+ bounds = list(zip(points[:-1], points[1:]))
+ agg_data = []
+ positions = []
+ labels = []
+
+ for begin, end in bounds:
+ agg_hist = ts.data[begin:end].sum(axis=0)
+
+ if pp.style.violin_instead_of_box:
+ # cut outliers
+ idx1, idx2 = hist_outliers_perc(agg_hist, pp.style.outliers_lat)
+ agg_hist = agg_hist[idx1:idx2]
+ curr_bins_vals = ts.histo_bins[idx1:idx2]
+
+ correct_coef = pp.style.violin_point_count / sum(agg_hist)
+ if correct_coef > 1:
+ correct_coef = 1
+ else:
+ curr_bins_vals = ts.histo_bins
+ correct_coef = 1
+
+ vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
+ cidx = 0
+
+ non_zero, = agg_hist.nonzero()
+ for pos in non_zero:
+ count = int(agg_hist[pos] * correct_coef + 0.5)
+
+ if count != 0:
+ vals[cidx: cidx + count] = curr_bins_vals[pos]
+ cidx += count
+
+ agg_data.append(vals[:cidx])
+ positions.append((end + begin) / 2)
+ labels.append(str((end + begin) // 2))
+
+ if pp.style.violin_instead_of_box:
+ patches = pp.ax.violinplot(agg_data, positions=positions, showmeans=True, showmedians=True, widths=step / 2)
+ patches['cmeans'].set_color("blue")
+ patches['cmedians'].set_color("green")
+ if pp.style.legend_for_eng:
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.03, 0.81)
+ pp.ax.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
+ loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+ else:
+ pp.ax.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
+
+ pp.ax.set_xlim(min(times), max(times))
+ pp.ax.set_xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
+ pp.fig.subplots_adjust(right=pp.style.subplot_adjust_r)
+
+
+@provide_plot(eng=True, no_legend=True, noaxis=True, long_plot=True)
+def plot_histo_heatmap(pp: PlotParams, ts: TimeSeries, ylabel: str, xlabel: str = "time, s") -> None:
+
+ # only histogram-based ts can be plotted
+ assert len(ts.data.shape) == 2
+
+ # Find global outliers. As load is expected to be stable during one job
+ # outliers range can be detected globally
+ total_hist = ts.data.sum(axis=0)
+ idx1, idx2 = hist_outliers_perc(total_hist,
+ bounds_perc=pp.style.outliers_lat,
+ min_bins_left=pp.style.hm_hist_bins_count)
+
+ # merge outliers with most close non-outliers cell
+ orig_data = ts.data[:, idx1:idx2].copy()
+ if idx1 > 0:
+ orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
+
+ if idx2 < ts.data.shape[1]:
+ orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
+
+ bins_vals = ts.histo_bins[idx1:idx2]
+
+ # rebin over X axis
+ # aggregate some lines in ts.data to plot ~style.hm_x_slots x bins
+ agg_idx = float(len(orig_data)) / pp.style.hm_x_slots
+ if agg_idx >= 2:
+ idxs = list(map(int, numpy.round(numpy.arange(0, len(orig_data) + 1, agg_idx))))
+ assert len(idxs) > 1
+ data = numpy.empty([len(idxs) - 1, orig_data.shape[1]], dtype=numpy.float32) # type: List[numpy.ndarray]
+ for idx, (sidx, eidx) in enumerate(zip(idxs[:-1], idxs[1:])):
+ data[idx] = orig_data[sidx:eidx,:].sum(axis=0) / (eidx - sidx)
+ else:
+ data = orig_data
+
+ # rebin over Y axis
+ # =================
+
+ # don't using rebin_histogram here, as we need apply same bins for many arrays
+ step = (bins_vals[-1] - bins_vals[0]) / pp.style.hm_hist_bins_count
+ new_bins_edges = numpy.arange(pp.style.hm_hist_bins_count) * step + bins_vals[0]
+ bin_mapping = numpy.clip(numpy.searchsorted(new_bins_edges, bins_vals) - 1, 0, len(new_bins_edges) - 1)
+
+ # map origin bins ranges to heatmap bins, iterate over rows
+ cmap = []
+ for line in data:
+ curr_bins = [0] * pp.style.hm_hist_bins_count
+ for idx, count in zip(bin_mapping, line):
+ curr_bins[idx] += count
+ cmap.append(curr_bins)
+ ncmap = numpy.array(cmap)
+
+ histo = ncmap.sum(axis=0).reshape((-1,))
+ ax, _ = do_plot_hmap_with_histo(pp.fig, ncmap, histo, new_bins_edges,
+ cmap=pp.colors.hmap_cmap,
+ cbar=pp.style.heatmap_colorbar, avg_labels=True)
+ ax.set(ylabel=ylabel, xlabel=xlabel)
+
+
+@provide_plot(eng=False, no_legend=True, grid='y', style_name='ioqd', noadjust=True)
+def io_chart(pp: PlotParams,
+ legend: str,
+ iosums: List[IOSummary],
+ iops_log_spine: bool = False,
+ lat_log_spine: bool = False) -> None:
+
+ # -------------- MAGIC VALUES ---------------------
+ # IOPS bar width
+ width = 0.2
+
+ # offset from center of bar to deviation/confidence range indicator
+ err_x_offset = 0.03
+
+ # extra space on top and bottom, comparing to maximal tight layout
+ extra_y_space = 0.05
+
+ # additional spine for BW/IOPS on left side of plot
+ extra_io_spine_x_offset = -0.1
+
+ # extra space on left and right sides
+ extra_x_space = 0.5
+
+ # legend location settings
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.1, 0.81)
+
+ # -------------- END OF MAGIC VALUES ---------------------
+
+ block_size = iosums[0].block_size
+ xpos = numpy.arange(1, len(iosums) + 1, dtype='uint')
+
+ coef_mb = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
+ coef_iops = float(unit_conversion_coef(iosums[0].bw.units, "KiBps")) / block_size
+
+ iops_primary = block_size < pp.style.large_blocks
+
+ coef = coef_iops if iops_primary else coef_mb
+ pp.ax.set_ylabel("IOPS" if iops_primary else "BW (MiBps)")
+
+ vals = [iosum.bw.average * coef for iosum in iosums]
+
+ # set correct x limits for primary IO spine
+ min_io = min(iosum.bw.average - iosum.bw.deviation * pp.style.dev_range_x for iosum in iosums)
+ max_io = max(iosum.bw.average + iosum.bw.deviation * pp.style.dev_range_x for iosum in iosums)
+ border = (max_io - min_io) * extra_y_space
+ io_lims = (min_io - border, max_io + border)
+
+ pp.ax.set_ylim(io_lims[0] * coef, io_lims[-1] * coef)
+ pp.ax.bar(xpos - width / 2, vals, width=width, color=pp.colors.box_color, label=legend)
+
+ # plot deviation and confidence error ranges
+ err1_legend = err2_legend = None
+ for pos, iosum in zip(xpos, iosums):
+ dev_bar_pos = pos - err_x_offset
+ err1_legend = pp.ax.errorbar(dev_bar_pos,
+ iosum.bw.average * coef,
+ iosum.bw.deviation * pp.style.dev_range_x * coef,
+ alpha=pp.colors.subinfo_alpha,
+ color=pp.colors.suppl_color1) # 'magenta'
+
+ conf_bar_pos = pos + err_x_offset
+ err2_legend = pp.ax.errorbar(conf_bar_pos,
+ iosum.bw.average * coef,
+ iosum.bw.confidence * coef,
+ alpha=pp.colors.subinfo_alpha,
+ color=pp.colors.suppl_color2) # 'teal'
+
+ handles1, labels1 = pp.ax.get_legend_handles_labels()
+
+ handles1 += [err1_legend, err2_legend]
+ labels1 += ["{}% dev".format(pp.style.dev_perc),
+ "{}% conf".format(int(100 * iosums[0].bw.confidence_level))]
+
+ # extra y spine for latency on right side
+ ax2 = pp.ax.twinx()
+
+ # plot median and 95 perc latency
+ lat_coef_ms = float(unit_conversion_coef(iosums[0].lat.units, "ms"))
+ ax2.plot(xpos, [iosum.lat.perc_50 * lat_coef_ms for iosum in iosums], label="lat med")
+ ax2.plot(xpos, [iosum.lat.perc_95 * lat_coef_ms for iosum in iosums], label="lat 95%")
+
+ for grid_line in ax2.get_ygridlines():
+ grid_line.set_linestyle(":")
+
+ # extra y spine for BW/IOPS on left side
+ if pp.style.extra_io_spine:
+ ax3 = pp.ax.twinx()
+ if iops_log_spine:
+ ax3.set_yscale('log')
+
+ ax3.set_ylabel("BW (MiBps)" if iops_primary else "IOPS")
+ secondary_coef = coef_mb if iops_primary else coef_iops
+ ax3.set_ylim(io_lims[0] * secondary_coef, io_lims[1] * secondary_coef)
+ ax3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+ ax3.spines["left"].set_visible(True)
+ ax3.yaxis.set_label_position('left')
+ ax3.yaxis.set_ticks_position('left')
+ else:
+ ax3 = None
+
+ ax2.set_ylabel("Latency (ms)")
+
+ # legend box
+ handles2, labels2 = ax2.get_legend_handles_labels()
+ pp.ax.legend(handles1 + handles2, labels1 + labels2, loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+ # limit and label x spine
+ pp.ax.set_xlim(extra_x_space, len(iosums) + extra_x_space)
+ pp.ax.set_xticks(xpos)
+ pp.ax.set_xticklabels(["{0}*{1}={2}".format(iosum.qd, iosum.nodes_count, iosum.qd * iosum.nodes_count)
+ for iosum in iosums],
+ rotation=30 if len(iosums) > 9 else 0)
+ pp.ax.set_xlabel("IO queue depth * test node count = total parallel requests")
+
+ # apply log scales for X spines, if set
+ if iops_log_spine:
+ pp.ax.set_yscale('log')
+
+ if lat_log_spine:
+ ax2.set_yscale('log')
+
+ # override some styles
+ pp.fig.set_size_inches(*pp.style.qd_chart_inches)
+ pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
+
+ if pp.style.extra_io_spine:
+ ax3.grid(False)
+
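
A hedged sketch of how a @provide_plot-wrapped function from this module is invoked: the wrapper checks the plot cache, draws only on a miss, stores the image and returns its path. rstorage, ds and norm_stats are assumed to exist (a ResultStorage, a DataSource for the image, and NormStatProps respectively):

    fpath = plot_hist(rstorage, DefStyleProfile, DefColorProfile,
                      ds,                       # DataSource the image is stored under
                      "IOPS distribution",      # figure title
                      units="IOPS", prop=norm_stats)
    print("plot stored at", fpath)
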
diff --git a/wally/report.py b/wally/report.py
index 8c2c181..3ac1292 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,22 +1,12 @@
import os
import abc
import logging
-import warnings
-from io import BytesIO
-from functools import wraps
from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional, Union
+from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union
import numpy
-import scipy.stats
-import matplotlib.style
-from matplotlib.figure import Figure
-import matplotlib.pyplot as plt
-from matplotlib import gridspec
from statsmodels.tsa.stattools import adfuller
-from cephlib.common import float2str
-from cephlib.plot import plot_hmap_with_y_histo, hmap_from_2d
import xmlbuilder3
import wally
@@ -26,21 +16,17 @@
from .test_run_class import TestRun
from .hlstorage import ResultStorage
from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
-from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
- hist_outliers_perc, find_ouliers_ts, approximate_curve)
-from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig)
+from .statistic import calc_norm_stat_props
+from .result_classes import DataSource, TimeSeries, SuiteConfig
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
from .suits.job import JobConfig
-from .data_selectors import (get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles,
- get_ts_for_time_range)
-
-
-with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- import seaborn
-
-
+from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
+ default_format, io_chart_format)
+from .plot import (io_chart, plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
+ plot_histo_heatmap, plot_v_over_time, plot_hist)
+from .resources import ResourceNames, get_resources_usage, make_iosum, IOSummary, get_cluster_cpu_load
logger = logging.getLogger("wally")
@@ -50,100 +36,12 @@
DEBUG = False
-# ---------------- PROFILES ------------------------------------------------------------------------------------------
-
-
-# this is default values, real values is loaded from config
-
-class ColorProfile:
- primary_color = 'b'
- suppl_color1 = 'teal'
- suppl_color2 = 'magenta'
- suppl_color3 = 'orange'
- box_color = 'y'
- err_color = 'red'
-
- noise_alpha = 0.3
- subinfo_alpha = 0.7
-
- imshow_colormap = None # type: str
- hmap_cmap = "Blues"
-
-
-default_format = 'svg'
-io_chart_format = 'svg'
-
-
-class StyleProfile:
- default_style = 'seaborn-white'
- io_chart_style = 'classic'
-
- dpi = 80
- grid = True
- tide_layout = False
- hist_boxes = 10
- hist_lat_boxes = 25
- hm_hist_bins_count = 25
- hm_x_slots = 25
- min_points_for_dev = 5
-
- x_label_rotation = 35
-
- dev_range_x = 2.0
- dev_perc = 95
-
- point_shape = 'o'
- err_point_shape = '*'
-
- avg_range = 20
- approx_average = True
-
- curve_approx_level = 6
- curve_approx_points = 100
- assert avg_range >= min_points_for_dev
-
- # figure size in inches
- figsize = (8, 4)
- figsize_long = (8, 4)
- qd_chart_inches = (16, 9)
-
- subplot_adjust_r = 0.75
- subplot_adjust_r_no_legend = 0.9
- title_font_size = 12
-
- extra_io_spine = True
-
- legend_for_eng = True
- # heatmap_interpolation = '1d'
- heatmap_interpolation = None
- heatmap_interpolation_points = 300
- outliers_q_nd = 3.0
- outliers_hide_q_nd = 4.0
- outliers_lat = (0.01, 0.9)
-
- violin_instead_of_box = True
- violin_point_count = 30000
-
- heatmap_colorbar = False
-
- min_iops_vs_qd_jobs = 3
-
- qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
- iotime_bins = list(range(0, 1030, 50))
- block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
- large_blocks = 256
-
-
-DefColorProfile = ColorProfile()
-DefStyleProfile = StyleProfile()
-
-
# ---------------- STRUCTS -------------------------------------------------------------------------------------------
# TODO: needs to be revised, should use StatProps fields instead
class StoragePerfSummary:
- def __init__(self, name: str) -> None:
+ def __init__(self) -> None:
self.direct_iops_r_max = 0 # type: int
self.direct_iops_w_max = 0 # type: int
@@ -163,55 +61,9 @@
self.lat_95 = None # type: float
-class IOSummary:
- def __init__(self,
- qd: int,
- block_size: int,
- nodes_count:int,
- bw: NormStatProps,
- lat: HistoStatProps) -> None:
-
- self.qd = qd
- self.nodes_count = nodes_count
- self.block_size = block_size
-
- self.bw = bw
- self.lat = lat
-
-
# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
-iosum_cache = {} # type: Dict[Tuple[str, str]]
-
-
-def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, nc: bool = False) -> IOSummary:
- key = (suite.storage_id, job.storage_id)
- if not nc and key in iosum_cache:
- return iosum_cache[key]
-
- lat = get_aggregated(rstorage, suite, job, "lat")
- io = get_aggregated(rstorage, suite, job, "bw")
-
- res = IOSummary(job.qd,
- nodes_count=len(suite.nodes_ids),
- block_size=job.bsize,
- lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
- bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
-
- if not nc:
- iosum_cache[key] = res
-
- return res
-
-
-def is_sensor_numarray(sensor: str, metric: str) -> bool:
- """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
- return True
-
-
-LEVEL_SENSORS = {("block-io", "io_queue"),
- ("system-cpu", "procs_blocked"),
- ("system-cpu", "procs_queue")}
+LEVEL_SENSORS = {("block-io", "io_queue"), ("system-cpu", "procs_blocked"), ("system-cpu", "procs_queue")}
def is_level_sensor(sensor: str, metric: str) -> bool:
@@ -224,576 +76,12 @@
return not is_level_sensor(sensor, metric)
-cpu_load_cache = {} # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
-
-
-def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
- time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
-
- key = (id(rstorage), tuple(roles), time_range)
- if not nc and key in cpu_load_cache:
- return cpu_load_cache[key]
-
- cpu_ts = {}
- cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
- for name in cpu_metrics:
- cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)
-
- it = iter(cpu_ts.values())
- total_over_time = next(it).data.copy() # type: numpy.ndarray
- for ts in it:
- if ts is not None:
- total_over_time += ts.data
-
- total = cpu_ts['idle'].copy(no_data=True)
- total.data = total_over_time
- cpu_ts['total'] = total
-
- if not nc:
- cpu_load_cache[key] = cpu_ts
-
- return cpu_ts
-
-
-# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
-
-def get_emb_image(fig: Figure, format: str, **opts) -> bytes:
- bio = BytesIO()
- if format == 'svg':
- fig.savefig(bio, format='svg', **opts)
- img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
- return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
- else:
- fig.savefig(bio, format=format, **opts)
- return bio.getvalue()
-
-
-def provide_plot(func: Callable[..., None]) -> Callable[..., str]:
- @wraps(func)
- def closure1(storage: ResultStorage,
- path: DataSource,
- *args, **kwargs) -> str:
- fpath = storage.check_plot_file(path)
- if not fpath:
- format = path.tag.split(".")[-1]
- fig = plt.figure(figsize=StyleProfile.figsize)
- plt.style.use(StyleProfile.default_style)
- func(fig, *args, **kwargs)
- fpath = storage.put_plot_file(get_emb_image(fig, format=format, dpi=DefStyleProfile.dpi), path)
- logger.debug("Plot %s saved to %r", path, fpath)
- plt.close(fig)
- return fpath
- return closure1
-
-
-def apply_style(fig: Figure, title: str, style: StyleProfile, eng: bool = True,
- no_legend: bool = False) -> None:
-
- for ax in fig.axes:
- ax.grid(style.grid)
-
- if (style.legend_for_eng or not eng) and not no_legend:
- fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
- legend_location = "center left"
- legend_bbox_to_anchor = (1.03, 0.81)
- for ax in fig.axes:
- ax.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
- else:
- fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_legend)
-
- if style.tide_layout:
- fig.set_tight_layout(True)
-
- fig.suptitle(title, fontsize=style.title_font_size)
-
-
-# -------------- PLOT FUNCTIONS --------------------------------------------------------------------------------------
-
-
-@provide_plot
-def plot_hist(fig: Figure, title: str, units: str,
- prop: StatProps,
- colors: ColorProfile = DefColorProfile,
- style: StyleProfile = DefStyleProfile) -> None:
-
- ax = fig.add_subplot(111)
-
- # TODO: unit should came from ts
- normed_bins = prop.bins_populations / prop.bins_populations.sum()
- bar_width = prop.bins_edges[1] - prop.bins_edges[0]
- ax.bar(prop.bins_edges, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
-
- ax.set(xlabel=units, ylabel="Value probability")
-
- dist_plotted = False
- if isinstance(prop, NormStatProps):
- nprop = cast(NormStatProps, prop)
- stats = scipy.stats.norm(nprop.average, nprop.deviation)
-
- new_edges, step = numpy.linspace(prop.bins_edges[0], prop.bins_edges[-1],
- len(prop.bins_edges) * 10, retstep=True)
-
- ypoints = stats.cdf(new_edges) * 11
- ypoints = [next - prev for (next, prev) in zip(ypoints[1:], ypoints[:-1])]
- xpoints = (new_edges[1:] + new_edges[:-1]) / 2
-
- ax.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal\ndistribution")
- dist_plotted = True
-
- ax.set_xlim(left=prop.bins_edges[0])
- if prop.log_bins:
- ax.set_xscale('log')
-
- apply_style(fig, title, style, eng=True, no_legend=not dist_plotted)
-
-
-@provide_plot
-def plot_simple_over_time(fig: Figure,
- tss: List[Tuple[str, numpy.ndarray]],
- title: str,
- ylabel: str,
- xlabel: str = "time, s",
- average: bool = False,
- colors: ColorProfile = DefColorProfile,
- style: StyleProfile = DefStyleProfile) -> None:
- ax = fig.add_subplot(111)
- for name, arr in tss:
- if average:
- avg_vals = moving_average(arr, style.avg_range)
- if style.approx_average:
- time_points = numpy.arange(len(avg_vals))
- avg_vals = approximate_curve(time_points, avg_vals, time_points, style.curve_approx_level)
- arr = avg_vals
- ax.plot(arr, label=name)
- ax.set(xlabel=xlabel, ylabel=ylabel)
- apply_style(fig, title, style, eng=True)
-
-
-@provide_plot
-def plot_simple_bars(fig: Figure,
- title: str,
- names: List[str],
- values: List[float],
- errs: List[float] = None,
- colors: ColorProfile = DefColorProfile,
- style: StyleProfile = DefStyleProfile) -> None:
-
- ax = fig.add_subplot(111)
- ind = numpy.arange(len(names))
- width = 0.35
- ax.barh(ind, values, width, xerr=errs)
-
- ax.set_yticks(ind + width / 2)
- ax.set_yticklabels(names)
- ax.set_xlim(0, max(val + err for val, err in zip(values, errs)) * 1.1)
-
- apply_style(fig, title, style, no_legend=True)
- ax.axvline(x=1.0, color='r', linestyle='--', linewidth=1, alpha=0.5)
- fig.subplots_adjust(left=0.2)
-
-
-@provide_plot
-def plot_hmap_from_2d(fig: Figure,
- data2d: numpy.ndarray,
- title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
- colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
- fig.set_size_inches(*style.figsize_long)
- ioq1d, ranges = hmap_from_2d(data2d)
- ax, _ = plot_hmap_with_y_histo(fig, ioq1d, ranges, bins=bins, cmap=colors.hmap_cmap)
- ax.set(ylabel=ylabel, xlabel=xlabel)
- apply_style(fig, title, style, no_legend=True)
-
-
-@provide_plot
-def plot_v_over_time(fig: Figure,
- title: str,
- units: str,
- ts: TimeSeries,
- plot_avg_dev: bool = True,
- plot_points: bool = True,
- colors: ColorProfile = DefColorProfile,
- style: StyleProfile = DefStyleProfile) -> None:
-
- min_time = min(ts.times)
-
- # convert time to ms
- coef = float(unit_conversion_coef(ts.time_units, 's'))
- time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
-
- outliers_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_q_nd)
- outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_hide_q_nd)
- normal_idxs = numpy.logical_not(outliers_idxs)
- outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
- # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
-
- data = ts.data[normal_idxs]
- data_times = time_points[normal_idxs]
- outliers = ts.data[outliers_idxs]
- outliers_times = time_points[outliers_idxs]
-
- ax = fig.add_subplot(111)
-
- if plot_points:
- alpha = colors.noise_alpha if plot_avg_dev else 1.0
- ax.plot(data_times, data, style.point_shape,
- color=colors.primary_color, alpha=alpha, label="Data")
- ax.plot(outliers_times, outliers, style.err_point_shape,
- color=colors.err_color, label="Outliers")
-
- has_negative_dev = False
- plus_minus = "\xb1"
-
- if plot_avg_dev and len(data) < style.avg_range * 2:
- logger.warning("Array %r to small to plot average over %s points", title, style.avg_range)
- elif plot_avg_dev:
- avg_vals = moving_average(data, style.avg_range)
- dev_vals = moving_dev(data, style.avg_range)
- avg_times = moving_average(data_times, style.avg_range)
-
- if style.approx_average:
- avg_vals = approximate_curve(avg_times, avg_vals, avg_times, style.curve_approx_level)
- dev_vals = approximate_curve(avg_times, dev_vals, avg_times, style.curve_approx_level)
-
- ax.plot(avg_times, avg_vals, c=colors.suppl_color1, label="Average")
-
- low_vals_dev = avg_vals - dev_vals * style.dev_range_x
- hight_vals_dev = avg_vals + dev_vals * style.dev_range_x
- if style.dev_range_x - int(style.dev_range_x) < 0.01:
- ax.plot(avg_times, low_vals_dev, c=colors.suppl_color2,
- label="{}{}*stdev".format(plus_minus, int(style.dev_range_x)))
- else:
- ax.plot(avg_times, low_vals_dev, c=colors.suppl_color2,
- label="{}{}*stdev".format(plus_minus, style.dev_range_x))
- ax.plot(avg_times, hight_vals_dev, c=colors.suppl_color2)
- has_negative_dev = low_vals_dev.min() < 0
-
- ax.set_xlim(-5, max(time_points) + 5)
- ax.set_xlabel("Time, seconds from test begin")
-
- if plot_avg_dev:
- ax.set_ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
- else:
- ax.set_ylabel(units)
-
- if has_negative_dev:
- ax.set_ylim(bottom=0)
-
- apply_style(fig, title, style, eng=True)
-
-
-@provide_plot
-def plot_lat_over_time(fig: Figure,
- title: str,
- ts: TimeSeries,
- ylabel: str,
- samples: int = 5,
- colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
-
- min_time = min(ts.times)
- times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
- ts_len = len(times)
- step = ts_len / samples
- points = [times[int(i * step + 0.5)] for i in range(samples)]
- points.append(times[-1])
- bounds = list(zip(points[:-1], points[1:]))
- agg_data = []
- positions = []
- labels = []
-
- for begin, end in bounds:
- agg_hist = ts.data[begin:end].sum(axis=0)
-
- if style.violin_instead_of_box:
- # cut outliers
- idx1, idx2 = hist_outliers_perc(agg_hist, style.outliers_lat)
- agg_hist = agg_hist[idx1:idx2]
- curr_bins_vals = ts.histo_bins[idx1:idx2]
-
- correct_coef = style.violin_point_count / sum(agg_hist)
- if correct_coef > 1:
- correct_coef = 1
- else:
- curr_bins_vals = ts.histo_bins
- correct_coef = 1
-
- vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
- cidx = 0
-
- non_zero, = agg_hist.nonzero()
- for pos in non_zero:
- count = int(agg_hist[pos] * correct_coef + 0.5)
-
- if count != 0:
- vals[cidx: cidx + count] = curr_bins_vals[pos]
- cidx += count
-
- agg_data.append(vals[:cidx])
- positions.append((end + begin) / 2)
- labels.append(str((end + begin) // 2))
-
- ax = fig.add_subplot(111)
- if style.violin_instead_of_box:
- patches = ax.violinplot(agg_data,
- positions=positions,
- showmeans=True,
- showmedians=True,
- widths=step / 2)
-
- patches['cmeans'].set_color("blue")
- patches['cmedians'].set_color("green")
- if style.legend_for_eng:
- legend_location = "center left"
- legend_bbox_to_anchor = (1.03, 0.81)
- ax.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
- loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
- else:
- ax.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
-
- ax.set_xlim(min(times), max(times))
- ax.set(ylabel=ylabel, xlabel="Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
- apply_style(fig, title, style, eng=True, no_legend=True)
- fig.subplots_adjust(right=style.subplot_adjust_r)
-
-
-@provide_plot
-def plot_histo_heatmap(fig: Figure,
- title: str,
- ts: TimeSeries,
- ylabel: str,
- xlabel: str = "time, s",
- colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
-
- fig.set_size_inches(*style.figsize_long)
-
- # only histogram-based ts can be plotted
- assert len(ts.data.shape) == 2
-
- # Find global outliers. As load is expected to be stable during one job
- # outliers range can be detected globally
- total_hist = ts.data.sum(axis=0)
- idx1, idx2 = hist_outliers_perc(total_hist,
- bounds_perc=style.outliers_lat,
- min_bins_left=style.hm_hist_bins_count)
-
- # merge outliers with most close non-outliers cell
- orig_data = ts.data[:, idx1:idx2].copy()
- if idx1 > 0:
- orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
-
- if idx2 < ts.data.shape[1]:
- orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
-
- bins_vals = ts.histo_bins[idx1:idx2]
-
- # rebin over X axis
- # aggregate some lines in ts.data to plot not more than style.hm_x_slots x bins
- agg_idx = float(len(orig_data)) / style.hm_x_slots
- if agg_idx >= 2:
- data = numpy.zeros([style.hm_x_slots, orig_data.shape[1]], dtype=numpy.float32) # type: List[numpy.ndarray]
- next = agg_idx
- count = 0
- data_idx = 0
- for idx, arr in enumerate(orig_data):
- if idx >= next:
- data[data_idx] /= count
- data_idx += 1
- next += agg_idx
- count = 0
- data[data_idx] += arr
- count += 1
-
- if count > 1:
- data[-1] /= count
- else:
- data = orig_data
-
- # rebin over Y axis
- # =================
-
- # don't using rebin_histogram here, as we need apply same bins for many arrays
- step = (bins_vals[-1] - bins_vals[0]) / style.hm_hist_bins_count
- new_bins_edges = numpy.arange(style.hm_hist_bins_count) * step + bins_vals[0]
- bin_mapping = numpy.clip(numpy.searchsorted(new_bins_edges, bins_vals) - 1, 0, len(new_bins_edges) - 1)
-
- # map origin bins ranges to heatmap bins, iterate over rows
- cmap = []
- for line in data:
- curr_bins = [0] * style.hm_hist_bins_count
- for idx, count in zip(bin_mapping, line):
- curr_bins[idx] += count
- cmap.append(curr_bins)
- ncmap = numpy.array(cmap)
-
- # plot data
- # =========
-
- boxes = 3
- gs = gridspec.GridSpec(1, boxes)
- ax = fig.add_subplot(gs[0, :boxes - 1])
-
- labels = list(map(float2str, (new_bins_edges[:-1] + new_bins_edges[1:]) / 2)) + \
- [float2str(new_bins_edges[-1]) + "+"]
- seaborn.heatmap(ncmap[:,::-1].T, xticklabels=False, cmap="Blues", ax=ax)
- ax.set_yticklabels(labels, rotation='horizontal')
- ax.set_xticklabels([])
-
- # plot overall histogram
- # =======================
-
- ax2 = fig.add_subplot(gs[0, boxes - 1])
- ax2.set_yticklabels([])
- ax2.set_xticklabels([])
-
- histo = ncmap.sum(axis=0).reshape((-1,))
- ax2.set_ylim(top=histo.size, bottom=0)
- ax2.barh(numpy.arange(histo.size) + 0.5, width=histo)
-
- ax.set(ylabel=ylabel, xlabel=xlabel)
-
- apply_style(fig, title, style, eng=True, no_legend=True)
-
-
-@provide_plot
-def io_chart(fig: Figure,
- title: str,
- legend: str,
- iosums: List[IOSummary],
- iops_log_spine: bool = False,
- lat_log_spine: bool = False,
- colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
-
- # -------------- MAGIC VALUES ---------------------
- # IOPS bar width
- width = 0.2
-
- # offset from center of bar to deviation/confidence range indicator
- err_x_offset = 0.03
-
- # extra space on top and bottom, comparing to maximal tight layout
- extra_y_space = 0.05
-
- # additional spine for BW/IOPS on left side of plot
- extra_io_spine_x_offset = -0.1
-
- # extra space on left and right sides
- extra_x_space = 0.5
-
- # legend location settings
- legend_location = "center left"
- legend_bbox_to_anchor = (1.1, 0.81)
-
- # -------------- END OF MAGIC VALUES ---------------------
-
- matplotlib.style.use(style.io_chart_style)
-
- block_size = iosums[0].block_size
- xpos = numpy.arange(1, len(iosums) + 1, dtype='uint')
-
- ax = fig.add_subplot(111)
-
- coef_mb = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
- coef_iops = float(unit_conversion_coef(iosums[0].bw.units, "KiBps")) / block_size
-
- iops_primary = block_size < style.large_blocks
-
- coef = coef_iops if iops_primary else coef_mb
- ax.set_ylabel("IOPS" if iops_primary else "BW (MiBps)")
-
- vals = [iosum.bw.average * coef for iosum in iosums]
-
- # set correct x limits for primary IO spine
- min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
- max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
- border = (max_io - min_io) * extra_y_space
- io_lims = (min_io - border, max_io + border)
-
- ax.set_ylim(io_lims[0] * coef, io_lims[-1] * coef)
- ax.bar(xpos - width / 2, vals, width=width, color=colors.box_color, label=legend)
-
- # plot deviation and confidence error ranges
- err1_legend = err2_legend = None
- for pos, iosum in zip(xpos, iosums):
- dev_bar_pos = pos - err_x_offset
- err1_legend = ax.errorbar(dev_bar_pos,
- iosum.bw.average * coef,
- iosum.bw.deviation * style.dev_range_x * coef,
- alpha=colors.subinfo_alpha,
- color=colors.suppl_color1) # 'magenta'
-
- conf_bar_pos = pos + err_x_offset
- err2_legend = ax.errorbar(conf_bar_pos,
- iosum.bw.average * coef,
- iosum.bw.confidence * coef,
- alpha=colors.subinfo_alpha,
- color=colors.suppl_color2) # 'teal'
-
- if style.grid:
- ax.grid(True)
-
- handles1, labels1 = ax.get_legend_handles_labels()
-
- handles1 += [err1_legend, err2_legend]
- labels1 += ["{}% dev".format(style.dev_perc),
- "{}% conf".format(int(100 * iosums[0].bw.confidence_level))]
-
- # extra y spine for latency on right side
- ax2 = ax.twinx()
-
- # plot median and 95 perc latency
- lat_coef_ms = float(unit_conversion_coef(iosums[0].lat.units, "ms"))
- ax2.plot(xpos, [iosum.lat.perc_50 * lat_coef_ms for iosum in iosums], label="lat med")
- ax2.plot(xpos, [iosum.lat.perc_95 * lat_coef_ms for iosum in iosums], label="lat 95%")
-
- for grid_line in ax2.get_ygridlines():
- grid_line.set_linestyle(":")
-
- # extra y spine for BW/IOPS on left side
- if style.extra_io_spine:
- ax3 = ax.twinx()
- if iops_log_spine:
- ax3.set_yscale('log')
-
- ax3.set_ylabel("BW (MiBps)" if iops_primary else "IOPS")
- secondary_coef = coef_mb if iops_primary else coef_iops
- ax3.set_ylim(io_lims[0] * secondary_coef, io_lims[1] * secondary_coef)
- ax3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
- ax3.spines["left"].set_visible(True)
- ax3.yaxis.set_label_position('left')
- ax3.yaxis.set_ticks_position('left')
- else:
- ax3 = None
-
- ax2.set_ylabel("Latency (ms)")
-
- # legend box
- handles2, labels2 = ax2.get_legend_handles_labels()
- ax.legend(handles1 + handles2, labels1 + labels2,
- loc=legend_location,
- bbox_to_anchor=legend_bbox_to_anchor)
-
- # limit and label x spine
- ax.set_xlim(extra_x_space, len(iosums) + extra_x_space)
- ax.set_xticks(xpos)
- ax.set_xticklabels(["{0} * {1} = {2}".format(iosum.qd, iosum.nodes_count, iosum.qd * iosum.nodes_count)
- for iosum in iosums])
- ax.set_xlabel("IO queue depth * test node count = total parallel requests")
-
- # apply log scales for X spines, if set
- if iops_log_spine:
- ax.set_yscale('log')
-
- if lat_log_spine:
- ax2.set_yscale('log')
-
- # adjust central box size to fit legend
- apply_style(fig, title, style, eng=False, no_legend=True)
-
- # override some styles
- fig.set_size_inches(*style.qd_chart_inches)
- fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
-
- if style.extra_io_spine:
- ax3.grid(False)
+# def get_idle_load(rstorage: ResultStorage, *args, **kwargs) -> float:
+# if 'idle' not in rstorage.storage:
+# return 0.0
+# idle_time = rstorage.storage.get('idle')
+# ssum = summ_sensors(rstorage, time_range=idle_time, *args, **kwargs)
+# return numpy.average(ssum)
# -------------------- REPORT HELPERS --------------------------------------------------------------------------------
@@ -842,6 +130,7 @@
class Menu2ndSumm:
io_lat_qd = "IO & Lat vs QD"
+ cpu_usage_qd = "CPU usage"
menu_1st_order = [Menu1st.summary, Menu1st.engineering, Menu1st.per_job]
@@ -849,49 +138,68 @@
# -------------------- REPORTS --------------------------------------------------------------------------------------
+class ReporterBase:
+ def __init__(self, rstorage: ResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
+ self.style = style
+ self.colors = colors
+ self.rstorage = rstorage
-class Reporter(metaclass=abc.ABCMeta):
- suite_types = set() # type: Set[str]
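+    # Thin helper: calls a plotting function, injecting the shared result storage, style and color profiles.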
+ def plt(self, func, ds: DataSource, *args, **kwargs) -> str:
+ return func(self.rstorage, self.style, self.colors, ds, *args, **kwargs)
+
+
+class SuiteReporter(ReporterBase, metaclass=abc.ABCMeta):
+ suite_types = set() # type: Set[str]
@abc.abstractmethod
- def get_divs(self, suite: SuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
pass
-class JobReporter(metaclass=abc.ABCMeta):
+class JobReporter(ReporterBase, metaclass=abc.ABCMeta):
suite_type = set() # type: Set[str]
@abc.abstractmethod
- def get_divs(self,
- suite: SuiteConfig,
- job: JobConfig,
- storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
pass
-# Main performance report
-class PerformanceSummary(Reporter):
- """Aggregated summary fro storage"""
+# # Linearization report
+# class IOPSBsize(SuiteReporter):
+# """Creates graphs, which show how IOPS and Latency depend on block size"""
+#
+#
+# # Main performance report
+# class PerformanceSummary(SuiteReporter):
+#     """Aggregated summary for storage"""
+
+# # Node load over test time
+# class NodeLoad(SuiteReporter):
+# """IOPS/latency during test"""
+
+# # Ceph operation breakout report
+# class CephClusterSummary(SuiteReporter):
+# """IOPS/latency during test"""
# Main performance report
-class IO_QD(Reporter):
+class IOQD(SuiteReporter):
"""Creates graph, which show how IOPS and Latency depend on QD"""
suite_types = {'fio'}
- def get_divs(self, suite: SuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
ts_map = defaultdict(list) # type: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]]
str_summary = {} # type: Dict[FioJobParams, List[IOSummary]]
- for job in rstorage.iter_job(suite):
+
+ for job in self.rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
fjob_no_qd = cast(FioJobParams, fjob.params.copy(qd=None))
str_summary[fjob_no_qd] = (fjob_no_qd.summary, fjob_no_qd.long_summary)
ts_map[fjob_no_qd].append((suite, fjob))
for tpl, suites_jobs in ts_map.items():
- if len(suites_jobs) >= StyleProfile.min_iops_vs_qd_jobs:
-
- iosums = [make_iosum(rstorage, suite, job) for suite, job in suites_jobs]
+ if len(suites_jobs) >= self.style.min_iops_vs_qd_jobs:
+ iosums = [make_iosum(self.rstorage, suite, job, self.style.hist_boxes) for suite, job in suites_jobs]
iosums.sort(key=lambda x: x.qd)
summary, summary_long = str_summary[tpl]
@@ -906,30 +214,89 @@
metric="io_over_qd",
tag=io_chart_format)
- fpath = io_chart(rstorage, ds, title="", legend="IOPS/BW", iosums=iosums) # type: str
+ fpath = self.plt(io_chart, ds, title="", legend="IOPS/BW", iosums=iosums)
yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.center(html.img(fpath)))
-# Linearization report
-class IOPS_Bsize(Reporter):
- """Creates graphs, which show how IOPS and Latency depend on block size"""
+class ResourceQD(SuiteReporter):
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ qd_grouped_jobs = {} # type: Dict[FioJobParams, List[FioJobConfig]]
+ test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+ for job in self.rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ if fjob.bsize != 4:
+ continue
+
+ fjob_no_qd = cast(FioJobParams, fjob.params.copy(qd=None))
+ qd_grouped_jobs.setdefault(fjob_no_qd, []).append(fjob)
+
+ for jc_no_qd, jobs in sorted(qd_grouped_jobs.items()):
+ cpu_usage2qd = {}
+ for job in jobs:
+ usage, iops_ok = get_resources_usage(suite, job, self.rstorage, hist_boxes=self.style.hist_boxes,
+ large_block=self.style.large_blocks)
+
+ if iops_ok:
+ cpu_usage2qd[job.qd] = usage[ResourceNames.storage_cpu_s]
+
+ if len(cpu_usage2qd) < StyleProfile.min_iops_vs_qd_jobs:
+ continue
+
+ labels, vals, errs = zip(*((l, avg, dev) for l, (_, avg, dev) in sorted(cpu_usage2qd.items())))
+
+ if test_nc == 1:
+ labels = list(map(str, labels))
+ else:
+ labels = ["{} * {}".format(label, test_nc) for label in labels]
+
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=jc_no_qd.summary,
+ node_id="cluster",
+ sensor=AGG_TAG,
+ dev='cpu',
+ metric="cpu_for_iop",
+ tag=io_chart_format)
+
+ fpath = self.plt(plot_simple_bars, ds, jc_no_qd.long_summary, labels, vals, errs,
+ xlabel="CPU time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
+ x_formatter=(lambda x, pos: b2ssize_10(x) + 's'),
+ one_point_zero_line=False)
+
+ yield Menu1st.summary, Menu2ndSumm.cpu_usage_qd, HTMLBlock(html.center(html.img(fpath)))
class StatInfo(JobReporter):
"""Statistic info for job results"""
suite_types = {'fio'}
- def get_divs(self, suite: SuiteConfig, job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
fjob = cast(FioJobConfig, job)
- io_sum = make_iosum(rstorage, suite, fjob)
+ io_sum = make_iosum(self.rstorage, suite, fjob, self.style.hist_boxes)
- res = html.H2(html.center("Test summary - " + job.params.long_summary))
- stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Mediana", "Mode", "Kurt / Skew", "95%", "99%",
+ caption = "Test summary - " + job.params.long_summary
+ test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+ if test_nc > 1:
+ caption += " * {} nodes".format(test_nc)
+
+ res = html.H2(html.center(caption))
+ stat_data_headers = ["Name",
+ "Total done",
+ "Average ~ Dev",
+ "Conf interval",
+                             "Median",
+ "Mode",
+ "Kurt / Skew",
+ "95%",
+ "99%",
"ADF test"]
- bw_target_units = 'Bps'
+ align = ['left'] + ['right'] * (len(stat_data_headers) - 1)
+
+ bw_units = "B"
+ bw_target_units = bw_units + 'ps'
bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
adf_v, *_1, stats, _2 = adfuller(io_sum.bw.data)
@@ -942,6 +309,7 @@
ad_test = "Failed"
bw_data = ["Bandwidth",
+ b2ssize(io_sum.bw.data.sum() * bw_coef) + bw_units,
"{}{} ~ {}{}".format(b2ssize(io_sum.bw.average * bw_coef), bw_target_units,
b2ssize(io_sum.bw.deviation * bw_coef), bw_target_units),
b2ssize(io_sum.bw.confidence * bw_coef) + bw_target_units,
@@ -952,190 +320,82 @@
b2ssize(io_sum.bw.perc_1 * bw_coef) + bw_target_units,
ad_test]
- iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
- iops_data = ["IOPS",
- "{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average * iops_coef),
- b2ssize_10(io_sum.bw.deviation * iops_coef)),
- b2ssize_10(io_sum.bw.confidence * iops_coef) + "IOPS",
- b2ssize_10(io_sum.bw.perc_50 * iops_coef) + "IOPS",
- "-",
- "{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
- b2ssize_10(io_sum.bw.perc_5 * iops_coef) + "IOPS",
- b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS",
- ad_test]
+ stat_data = [bw_data]
- lat_target_unit = 's'
- lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
- # latency
- lat_data = ["Latency",
- "-",
- "-",
- b2ssize_10(io_sum.lat.perc_50 * lat_coef) + lat_target_unit,
- "-",
- "-",
- b2ssize_10(io_sum.lat.perc_95 * lat_coef) + lat_target_unit,
- b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit,
- '-']
+ if fjob.bsize < StyleProfile.large_blocks:
+ iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
+ iops_data = ["IOPS",
+ b2ssize_10(io_sum.bw.data.sum() * iops_coef),
+ "{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average * iops_coef),
+ b2ssize_10(io_sum.bw.deviation * iops_coef)),
+ b2ssize_10(io_sum.bw.confidence * iops_coef) + "IOPS",
+ b2ssize_10(io_sum.bw.perc_50 * iops_coef) + "IOPS",
+ "-",
+ "{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
+ b2ssize_10(io_sum.bw.perc_5 * iops_coef) + "IOPS",
+ b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS",
+ ad_test]
- # sensor usage
- stat_data = [iops_data, bw_data, lat_data]
- res += html.center(html.table("Load stats info", stat_data_headers, stat_data))
+ lat_target_unit = 's'
+ lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
+ # latency
+ lat_data = ["Latency",
+ "-",
+ "-",
+ "-",
+ b2ssize_10(io_sum.lat.perc_50 * lat_coef) + lat_target_unit,
+ "-",
+ "-",
+ b2ssize_10(io_sum.lat.perc_95 * lat_coef) + lat_target_unit,
+ b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit,
+ '-']
+
+ # sensor usage
+ stat_data.extend([iops_data, lat_data])
+
+ res += html.center(html.table("Load stats info", stat_data_headers, stat_data, align=align))
yield Menu1st.per_job, job.summary, HTMLBlock(res)
-def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
- step = min(vec.size, denom.size) // avg_ranges
- assert step >= 1
- vals = []
-
- whole_sum = denom.sum() / denom.size * step * 0.5
- for i in range(0, avg_ranges):
- s1 = denom[i * step: (i + 1) * step].sum()
- if s1 >= whole_sum:
- vals.append(vec[i * step: (i + 1) * step].sum() / s1)
-
- assert len(vals) > 1
- return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
-
-
class Resources(JobReporter):
"""Statistic info for job results"""
suite_types = {'fio'}
- def get_divs(self, suite: SuiteConfig, job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- fjob = cast(FioJobConfig, job)
- io_sum = make_iosum(rstorage, suite, fjob)
-
- tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
- io_transfered = io_sum.bw.data * tot_io_coef
- ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
-
- io_made = "Client IOP made"
- data_tr = "Client data transfered"
-
- records = {
- io_made: (b2ssize_10(ops_done.sum()) + "OP", None, None),
- data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
- } # type: Dict[str, Tuple[str, float, float]]
-
- test_send = "Test nodes net send"
- test_recv = "Test nodes net recv"
- test_net = "Test nodes net total"
- test_send_pkt = "Test nodes send pkt"
- test_recv_pkt = "Test nodes recv pkt"
- test_net_pkt = "Test nodes total pkt"
-
- test_write = "Test nodes disk write"
- test_read = "Test nodes disk read"
- test_write_iop = "Test nodes write IOP"
- test_read_iop = "Test nodes read IOP"
- test_iop = "Test nodes IOP"
- test_rw = "Test nodes disk IO"
-
- storage_send = "Storage nodes net send"
- storage_recv = "Storage nodes net recv"
- storage_send_pkt = "Storage nodes send pkt"
- storage_recv_pkt = "Storage nodes recv pkt"
- storage_net = "Storage nodes net total"
- storage_net_pkt = "Storage nodes total pkt"
-
- storage_write = "Storage nodes disk write"
- storage_read = "Storage nodes disk read"
- storage_write_iop = "Storage nodes write IOP"
- storage_read_iop = "Storage nodes read IOP"
- storage_iop = "Storage nodes IOP"
- storage_rw = "Storage nodes disk IO"
-
- storage_cpu = "Storage nodes CPU"
- storage_cpu_s = "Storage nodes CPU s/IOP"
- storage_cpu_s_b = "Storage nodes CPU s/B"
-
- all_metrics = [
- (test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
- (test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
- (test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
- (test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
-
- (test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
- (test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
- (test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
- (test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
-
- (storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
- (storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
- (storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
- (storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
-
- (storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
- (storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
- (storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
- (storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
- ]
-
- all_agg = {}
-
- for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
- res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
- if res_ts is None:
- continue
-
- data = res_ts.data
- if units == "B":
- data = data * float(unit_conversion_coef(res_ts.units, "B"))
-
- records[vname] = (ffunc(data.sum()) + units, *avg_dev_div(data, service_provided_count))
- all_agg[vname] = data
-
- # cpu usage
- nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
- cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
-
- cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
- used_s = b2ssize_10(cpus_used_sec.sum()) + 's'
-
- all_agg[storage_cpu] = cpus_used_sec
- records[storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))
- records[storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))
-
- cums = [
- (test_iop, test_read_iop, test_write_iop, b2ssize_10, "OP", ops_done),
- (test_rw, test_read, test_write, b2ssize, "B", io_transfered),
- (test_net, test_send, test_recv, b2ssize, "B", io_transfered),
- (test_net_pkt, test_send_pkt, test_recv_pkt, b2ssize_10, "pkt", ops_done),
-
- (storage_iop, storage_read_iop, storage_write_iop, b2ssize_10, "OP", ops_done),
- (storage_rw, storage_read, storage_write, b2ssize, "B", io_transfered),
- (storage_net, storage_send, storage_recv, b2ssize, "B", io_transfered),
- (storage_net_pkt, storage_send_pkt, storage_recv_pkt, b2ssize_10, "pkt", ops_done),
- ]
-
- for vname, name1, name2, ffunc, units, service_provided_masked in cums:
- if name1 in all_agg and name2 in all_agg:
- agg = all_agg[name1] + all_agg[name2]
- records[vname] = (ffunc(agg.sum()) + units, *avg_dev_div(agg, service_provided_masked))
+ records, iops_ok = get_resources_usage(suite, job, self.rstorage,
+ large_block=self.style.large_blocks,
+ hist_boxes=self.style.hist_boxes)
table_structure = [
"Service provided",
- (io_made, data_tr),
+ (ResourceNames.io_made, ResourceNames.data_tr),
"Test nodes total load",
- (test_send_pkt, test_send),
- (test_recv_pkt, test_recv),
- (test_net_pkt, test_net),
- (test_write_iop, test_write),
- (test_read_iop, test_read),
- (test_iop, test_rw),
- (test_iop, test_rw),
+ (ResourceNames.test_send_pkt, ResourceNames.test_send),
+ (ResourceNames.test_recv_pkt, ResourceNames.test_recv),
+ (ResourceNames.test_net_pkt, ResourceNames.test_net),
+ (ResourceNames.test_write_iop, ResourceNames.test_write),
+ (ResourceNames.test_read_iop, ResourceNames.test_read),
+ (ResourceNames.test_iop, ResourceNames.test_rw),
"Storage nodes resource consumed",
- (storage_send_pkt, storage_send),
- (storage_recv_pkt, storage_recv),
- (storage_net_pkt, storage_net),
- (storage_write_iop, storage_write),
- (storage_read_iop, storage_read),
- (storage_iop, storage_rw),
- (storage_cpu_s, storage_cpu_s_b),
- ] # type: List[Union[str, Tuple[Optional[str], Optional[str]]]
+ (ResourceNames.storage_send_pkt, ResourceNames.storage_send),
+ (ResourceNames.storage_recv_pkt, ResourceNames.storage_recv),
+ (ResourceNames.storage_net_pkt, ResourceNames.storage_net),
+ (ResourceNames.storage_write_iop, ResourceNames.storage_write),
+ (ResourceNames.storage_read_iop, ResourceNames.storage_read),
+ (ResourceNames.storage_iop, ResourceNames.storage_rw),
+ (ResourceNames.storage_cpu_s, ResourceNames.storage_cpu_s_b),
+        ] # type: List[Union[str, Tuple[Optional[str], ...]]]
+
+ if not iops_ok:
+ table_structure2 = []
+ for line in table_structure:
+ if isinstance(line, str):
+ table_structure2.append(line)
+ else:
+ assert len(line) == 2
+ table_structure2.append((line[1],))
+ table_structure = table_structure2
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Resources usage")))
@@ -1145,22 +405,25 @@
with doc.thead:
with doc.tr:
- [doc.th(header) for header in ["Resource", "Usage count", "To service"] * 2]
+ [doc.th(header) for header in ["Resource", "Usage count", "To service"] * (2 if iops_ok else 1)]
- cols = 6
+ cols = 6 if iops_ok else 3
+ col_per_tp = 3
short_name = {
- name: (name if name in {io_made, data_tr} else " ".join(name.split()[2:]).capitalize())
+ name: (name if name in {ResourceNames.io_made, ResourceNames.data_tr}
+ else " ".join(name.split()[2:]).capitalize())
for name in records.keys()
}
- short_name[storage_cpu_s] = "CPU (s/IOP)"
- short_name[storage_cpu_s_b] = "CPU (s/B)"
+ short_name[ResourceNames.storage_cpu_s] = "CPU (s/IOP)"
+ short_name[ResourceNames.storage_cpu_s_b] = "CPU (s/B)"
with doc.tbody:
with doc.tr:
- doc.td(colspan=str(cols // 2)).center.b("Operations")
- doc.td(colspan=str(cols // 2)).center.b("Bytes")
+ if iops_ok:
+ doc.td(colspan=str(col_per_tp)).center.b("Operations")
+ doc.td(colspan=str(col_per_tp)).center.b("Bytes")
for line in table_structure:
with doc.tr:
@@ -1170,24 +433,30 @@
else:
for name in line:
if name is None:
- doc.td("-", colspan=str(cols // 2))
+ doc.td("-", colspan=str(col_per_tp))
continue
amount_s, avg, dev = records[name]
- if name in (storage_cpu_s, storage_cpu_s_b) and avg is not None:
- dev_s = str(int(dev * 100 / avg)) + "%" if avg > 1E-9 else b2ssize_10(dev) + 's'
- rel_val_s = "{}s ~ {}".format(b2ssize_10(avg), dev_s)
+ if name in (ResourceNames.storage_cpu_s, ResourceNames.storage_cpu_s_b) and avg is not None:
+ if dev is None:
+ rel_val_s = b2ssize_10(avg) + 's'
+ else:
+ dev_s = str(int(dev * 100 / avg)) + "%" if avg > 1E-9 else b2ssize_10(dev) + 's'
+ rel_val_s = "{}s ~ {}".format(b2ssize_10(avg), dev_s)
else:
if avg is None:
rel_val_s = '-'
else:
avg_s = int(avg) if avg > 10 else '{:.1f}'.format(avg)
- if avg > 1E-5:
- dev_s = str(int(dev * 100 / avg)) + "%"
+ if dev is None:
+ rel_val_s = avg_s
else:
- dev_s = int(dev) if dev > 10 else '{:.1f}'.format(dev)
- rel_val_s = "{} ~ {}".format(avg_s, dev_s)
+ if avg > 1E-5:
+ dev_s = str(int(dev * 100 / avg)) + "%"
+ else:
+ dev_s = int(dev) if dev > 10 else '{:.1f}'.format(dev)
+ rel_val_s = "{} ~ {}".format(avg_s, dev_s)
doc.td(short_name[name], align="left")
doc.td(amount_s, align="right")
@@ -1204,18 +473,26 @@
res = xmlbuilder3.tostr(doc).split("\n", 1)[1]
yield Menu1st.per_job, job.summary, HTMLBlock(html.center(res))
- iop_names = [test_write_iop, test_read_iop, test_iop,
- storage_write_iop, storage_read_iop, storage_iop]
+ iop_names = [ResourceNames.test_write_iop, ResourceNames.test_read_iop, ResourceNames.test_iop,
+ ResourceNames.storage_write_iop, ResourceNames.storage_read_iop, ResourceNames.storage_iop]
- bytes_names = [test_write, test_read, test_rw,
- test_send, test_recv, test_net,
- storage_write, storage_read, storage_rw,
- storage_send, storage_recv, storage_net]
+ bytes_names = [ResourceNames.test_write, ResourceNames.test_read, ResourceNames.test_rw,
+ ResourceNames.test_send, ResourceNames.test_recv, ResourceNames.test_net,
+ ResourceNames.storage_write, ResourceNames.storage_read, ResourceNames.storage_rw,
+ ResourceNames.storage_send, ResourceNames.storage_recv, ResourceNames.storage_net]
- net_pkt_names = [test_send_pkt, test_recv_pkt, test_net_pkt,
- storage_send_pkt, storage_recv_pkt, storage_net_pkt]
+ net_pkt_names = [ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, ResourceNames.test_net_pkt,
+ ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, ResourceNames.storage_net_pkt]
- for tp, names in [('iop', iop_names), ("bytes", bytes_names), ('Net packets per IOP', net_pkt_names)]:
+ pairs = [("bytes", bytes_names)]
+ if iops_ok:
+ pairs.insert(0, ('iop', iop_names))
+ pairs.append(('Net packets per IOP', net_pkt_names))
+
+ yield Menu1st.per_job, job.summary, \
+ HTMLBlock(html.H2(html.center("Resource consumption per service provided")))
+
+ for tp, names in pairs:
vals = []
devs = []
avail_names = []
@@ -1223,6 +500,10 @@
if name in records:
avail_names.append(name)
_, avg, dev = records[name]
+
+ if dev is None:
+ dev = 0
+
vals.append(avg)
devs.append(dev)
@@ -1237,10 +518,9 @@
metric=tp.replace(' ', "_") + '2service_bar',
tag=default_format)
- fname = plot_simple_bars(rstorage, ds,
- "Resource consuption / service provided, " + tp,
- [name.replace(" nodes", "") for name in names],
- vals, devs)
+ fname = self.plt(plot_simple_bars, ds, tp.capitalize(),
+ [name.replace(" nodes", "") for name in names],
+ vals, devs)
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
@@ -1249,10 +529,9 @@
"""Statistic info for job results"""
suite_types = {'fio'}
- def get_divs(self, suite: SuiteConfig, job: JobConfig, rstorage: ResultStorage) -> \
- Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- nodes = list(find_nodes_by_roles(rstorage, STORAGE_ROLES))
+ nodes = list(find_nodes_by_roles(self.rstorage, STORAGE_ROLES))
sensor = 'block-io'
metric = 'io_queue'
@@ -1261,9 +540,9 @@
for node in nodes:
bn = 0
tot = 0
- for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+ for _, ds in self.rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
- data = rstorage.load_sensor(ds)
+ data = self.rstorage.load_sensor(ds)
p1 = job.reliable_info_range_s[0] * unit_conversion_coef('s', data.time_units)
p2 = job.reliable_info_range_s[1] * unit_conversion_coef('s', data.time_units)
idx1, idx2 = numpy.searchsorted(data.times, (p1, p2))
@@ -1276,35 +555,29 @@
# CPU load
class CPULoadPlot(JobReporter):
- def get_divs(self,
- suite: SuiteConfig,
- job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
# plot CPU time
for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
- cpu_ts = get_cluster_cpu_load(rstorage, roles, job.reliable_info_range_s)
+ cpu_ts = get_cluster_cpu_load(self.rstorage, roles, job.reliable_info_range_s)
tss = [(name, ts.data * 100 / cpu_ts['total'].data)
for name, ts in cpu_ts.items()
- if name in {'user', 'sys', 'irq', 'idle'}]
- fname = plot_simple_over_time(rstorage,
- cpu_ts['idle'].source(job_id=job.storage_id,
- suite_id=suite.storage_id,
- metric='allcpu', tag=rt + '.plt.' + default_format),
- tss=tss,
- average=True,
- ylabel="CPU time %",
- title="{} nodes CPU usage".format(rt.capitalize()))
+ if name in {'user', 'sys', 'idle', 'iowait'}]
+
+ ds = cpu_ts['idle'].source(job_id=job.storage_id, suite_id=suite.storage_id,
+ node_id=AGG_TAG, metric='allcpu', tag=rt + '.plt.' + default_format)
+
+ fname = self.plt(plot_simple_over_time, ds, tss=tss, average=True, ylabel="CPU time %",
+ title="{} nodes CPU usage".format(rt.capitalize()),
+ xlabel="Time from test begin")
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# IO time and QD
class QDIOTimeHeatmap(JobReporter):
- def get_divs(self,
- suite: SuiteConfig,
- job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
# TODO: fix this hardcode, need to track what devices are actually used on test and storage nodes
# use saved storage info in nodes
@@ -1313,7 +586,7 @@
storage_devs = None
test_nodes_devs = ['rbd0']
- for node in find_nodes_by_roles(rstorage, STORAGE_ROLES):
+ for node in find_nodes_by_roles(self.rstorage, STORAGE_ROLES):
cjd = set(node.params['ceph_journal_devs'])
if journal_devs is None:
journal_devs = cjd
@@ -1336,46 +609,33 @@
HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
# QD heatmap
- ioq2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ ioq2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
metric='io_queue', time_range=trange)
ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
- fname = plot_hmap_from_2d(rstorage,
- ds(metric='io_queue'),
- ioq2d,
- ylabel="IO QD",
- title=name.capitalize() + " devs QD",
- xlabel='Time',
- bins=StyleProfile.qd_bins) # type: str
+ fname = self.plt(plot_hmap_from_2d, ds(metric='io_queue'), data2d=ioq2d, xlabel='Time', ylabel="IO QD",
+ title=name.capitalize() + " devs QD", bins=StyleProfile.qd_bins)
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# Block size heatmap
- wc2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ wc2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
metric='writes_completed', time_range=trange)
wc2d[wc2d < 1E-3] = 1
- sw2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ sw2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
metric='sectors_written', time_range=trange)
data2d = sw2d / wc2d / 1024
- fname = plot_hmap_from_2d(rstorage,
- ds(metric='wr_block_size'),
- data2d,
- ylabel="IO bsize, KiB",
- title=name.capitalize() + " write block size",
- xlabel='Time',
- bins=StyleProfile.block_size_bins) # type: str
+ fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
+ data2d=data2d, title=name.capitalize() + " write block size",
+ ylabel="IO bsize, KiB", xlabel='Time', bins=StyleProfile.block_size_bins)
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# iotime heatmap
- wtime2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ wtime2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
metric='io_time', time_range=trange)
- fname = plot_hmap_from_2d(rstorage,
- ds(metric='io_time'),
- wtime2d,
- ylabel="IO time (ms) per second",
- title=name.capitalize() + " iotime",
- xlabel='Time',
- bins=StyleProfile.iotime_bins) # type: str
+ fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
+ xlabel='Time', ylabel="IO time (ms) per second",
+ title=name.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
@@ -1384,16 +644,13 @@
"""IOPS/latency during test"""
suite_types = {'fio'}
- def get_divs(self,
- suite: SuiteConfig,
- job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
fjob = cast(FioJobConfig, job)
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
- agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
if fjob.bsize >= DefStyleProfile.large_blocks:
title = "Fio measured Bandwidth over time"
units = "MiBps"
@@ -1403,43 +660,28 @@
agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
- fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.' + default_format), title, units, agg_io) # type: str
+ fpath = self.plt(plot_v_over_time, agg_io.source(tag='ts.' + default_format), title, units, agg_io)
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat").copy()
- TARGET_UNITS = 'ms'
- coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
- agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
- agg_lat.units = TARGET_UNITS
+ if fjob.bsize < DefStyleProfile.large_blocks:
+ agg_lat = get_aggregated(self.rstorage, suite, fjob, "lat").copy()
+ TARGET_UNITS = 'ms'
+ coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
+ agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+ agg_lat.units = TARGET_UNITS
- fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.' + default_format), "Latency",
- agg_lat, ylabel="Latency, " + agg_lat.units) # type: str
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ fpath = self.plt(plot_lat_over_time, agg_lat.source(tag='ts.' + default_format), "Latency", agg_lat,
+ ylabel="Latency, " + agg_lat.units)
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- fpath = plot_histo_heatmap(rstorage,
- agg_lat.source(tag='hmap.' + default_format),
- "Latency heatmap",
- agg_lat,
- ylabel="Latency, " + agg_lat.units,
- xlabel='Test time') # type: str
+ fpath = self.plt(plot_histo_heatmap, agg_lat.source(tag='hmap.' + default_format),
+ "Latency heatmap", agg_lat, ylabel="Latency, " + agg_lat.units, xlabel='Test time')
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
fjob = cast(FioJobConfig, job)
- # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
- #
- # long_summary = cast(FioJobParams, fjob.params).long_summary
- #
- # title = "Latency distribution"
- # units = "ms"
- #
- # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
- # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
- agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
if fjob.bsize >= DefStyleProfile.large_blocks:
title = "BW distribution"
@@ -1451,8 +693,7 @@
units = "IOPS"
io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
- fpath = plot_hist(rstorage, agg_io.source(tag='hist.' + default_format),
- title, units, io_stat_prop) # type: str
+ fpath = self.plt(plot_hist, agg_io.source(tag='hist.' + default_format), title, units, io_stat_prop)
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
@@ -1468,14 +709,16 @@
('block-io', 'sectors_written', "Write", 'MiB'),
]
- def get_divs(self,
- suite: SuiteConfig,
- job: JobConfig,
- rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
+ sensors = []
+ max_iop = 0
+ max_bytes = 0
+
for sensor, metric, op, units in self.storage_sensors:
- ts = summ_sensors(rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
+ ts = summ_sensors(self.rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
node_id="storage",
@@ -1494,24 +737,21 @@
source=ds,
histo_bins=ts.histo_bins)
- sensor_title = "{} {}".format(op, units)
- fpath = plot_v_over_time(rstorage, ds, sensor_title, units, ts=ts) # type: str
- yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
+ sensors.append(("{} {}".format(op, units), ds, ts, units))
+ if units == 'iop':
+ max_iop = max(max_iop, data.sum())
+ else:
+ assert units == 'MiB'
+ max_bytes = max(max_bytes, data.sum())
-
-# Node load over test time
-class NodeLoad(Reporter):
- """IOPS/latency during test"""
-
-
-# Ceph cluster summary
-class CephClusterSummary(Reporter):
- """IOPS/latency during test"""
-
-
-# TODO: Ceph operation breakout report
-# TODO: Resource consumption for different type of test
+ for title, ds, ts, units in sensors:
+ if ts.data.sum() >= (max_iop if units == 'iop' else max_bytes) * DefStyleProfile.min_load_diff:
+ fpath = self.plt(plot_v_over_time, ds, title, units, ts=ts)
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
+ else:
+                logger.info("Hiding '%s' plot for %s, as its cumulative load is less than %s%%",
+ title, job.summary, int(DefStyleProfile.min_load_diff * 100))
# ------------------------------------------ REPORT STAGES -----------------------------------------------------------
@@ -1523,12 +763,11 @@
def run(self, ctx: TestRun) -> None:
rstorage = ResultStorage(ctx.storage)
- job_reporters = [StatInfo(), Resources(), LoadToolResults(), ClusterLoad(), CPULoadPlot(),
- QDIOTimeHeatmap()] # type: List[JobReporter]
- # job_reporters = [QDIOTimeHeatmap()] # type: List[JobReporter]
- # job_reporters = []
- reporters = [IO_QD()] # type: List[Reporter]
- # reporters = [] # type: List[Reporter]
+ job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
+ job_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
+
+ suite_reporters_cls = [IOQD, ResourceQD]
+ suite_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -1546,27 +785,34 @@
items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, List[HTMLBlock]]]
DEBUG = False
+ job_summ_sort_order = []
+
# TODO: filter reporters
for suite in rstorage.iter_suite(FioTest.name):
all_jobs = list(rstorage.iter_job(suite))
all_jobs.sort(key=lambda job: job.params)
- for job in all_jobs:
- if 'rwd16384_qd1' == job.summary:
- try:
- for reporter in job_reporters:
- logger.debug("Start reporter %s on job %s suite %s",
- reporter.__class__.__name__, job.summary, suite.test_type)
- for block, item, html in reporter.get_divs(suite, job, rstorage):
- items[block][item].append(html)
- if DEBUG:
- break
- except Exception:
- logger.exception("Failed to generate report for %s", job)
- for reporter in reporters:
+ new_jobs_in_order = [job.summary for job in all_jobs]
+ same = set(new_jobs_in_order).intersection(set(job_summ_sort_order))
+            assert not same, "Jobs with the same summary found in different suites: " + ",".join(same)
+ job_summ_sort_order.extend(new_jobs_in_order)
+
+ for job in all_jobs[:1]:
+ try:
+ for reporter in job_reporters:
+ logger.debug("Start reporter %s on job %s suite %s",
+ reporter.__class__.__name__, job.summary, suite.test_type)
+ for block, item, html in reporter.get_divs(suite, job):
+ items[block][item].append(html)
+ if DEBUG:
+ break
+ except Exception:
+ logger.exception("Failed to generate report for %s", job.summary)
+
+ for reporter in suite_reporters:
try:
logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
- for block, item, html in reporter.get_divs(suite, rstorage):
+ for block, item, html in reporter.get_divs(suite):
items[block][item].append(html)
except Exception as exc:
logger.exception("Failed to generate report")
@@ -1582,7 +828,13 @@
.format(idx_1st, menu_1st)
)
menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
- for menu_2nd in sorted(items[menu_1st]):
+
+ if menu_1st == Menu1st.per_job:
+ in_order = sorted(items[menu_1st], key=job_summ_sort_order.index)
+ else:
+ in_order = sorted(items[menu_1st])
+
+ for menu_2nd in in_order:
menu_block.append(' <a href="#content{}" class="nav-group-item">{}</a>'
.format(link_idx, menu_2nd))
content_block.append('<div id="content{}">'.format(link_idx))
diff --git a/wally/report_profiles.py b/wally/report_profiles.py
new file mode 100644
index 0000000..920c320
--- /dev/null
+++ b/wally/report_profiles.py
@@ -0,0 +1,94 @@
+# ---------------- PROFILES ------------------------------------------------------------------------------------------
+
+
+# These are default values; the real values are loaded from the config
+class ColorProfile:
+ primary_color = 'b'
+ suppl_color1 = 'teal'
+ suppl_color2 = 'magenta'
+ suppl_color3 = 'orange'
+ box_color = 'y'
+ err_color = 'red'
+
+ noise_alpha = 0.3
+ subinfo_alpha = 0.7
+
+ imshow_colormap = None # type: str
+ hmap_cmap = "Blues"
+
+
+default_format = 'svg'
+io_chart_format = 'svg'
+
+
+class StyleProfile:
+ default_style = 'seaborn-white'
+ io_chart_style = 'classic'
+
+ dpi = 80
+
+ lat_samples = 5
+
+ tide_layout = False
+ hist_boxes = 10
+ hist_lat_boxes = 25
+ hm_hist_bins_count = 25
+ hm_x_slots = 25
+ min_points_for_dev = 5
+
+ x_label_rotation = 35
+
+ dev_range_x = 2.0
+ dev_perc = 95
+
+ point_shape = 'o'
+ err_point_shape = '*'
+
+ avg_range = 20
+ approx_average = True
+ approx_average_no_points = False
+
+ curve_approx_level = 6
+ curve_approx_points = 100
+ assert avg_range >= min_points_for_dev
+
+ # figure size in inches
+ figsize = (8, 4)
+ figsize_long = (8, 4)
+ qd_chart_inches = (16, 9)
+
+ subplot_adjust_r = 0.75
+ subplot_adjust_r_no_legend = 0.9
+ title_font_size = 12
+
+ extra_io_spine = True
+
+ legend_for_eng = True
+
+ # heatmap interpolation is deprecated
+ # heatmap_interpolation = '1d'
+ # heatmap_interpolation = None
+ # heatmap_interpolation_points = 300
+
+ heatmap_colorbar = False
+ outliers_q_nd = 3.0
+ outliers_hide_q_nd = 4.0
+ outliers_lat = (0.01, 0.9)
+
+ violin_instead_of_box = True
+ violin_point_count = 30000
+
+ min_iops_vs_qd_jobs = 3
+
+ qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
+ iotime_bins = list(range(0, 1030, 50))
+ block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+ large_blocks = 256
+
+ min_load_diff = 0.05
+
+ histo_grid = 'x'
+
+
+DefColorProfile = ColorProfile()
+DefStyleProfile = StyleProfile()
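+
+# Typical usage (sketch, mirrors the report stage's run()): reporters are constructed with these default
+# profiles, e.g. IOQD(rstorage, DefStyleProfile, DefColorProfile), and then queried via get_divs(suite).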
diff --git a/wally/resources.py b/wally/resources.py
new file mode 100644
index 0000000..f063eb5
--- /dev/null
+++ b/wally/resources.py
@@ -0,0 +1,259 @@
+from typing import Tuple, Dict, cast, List
+
+import numpy
+
+from .hlstorage import ResultStorage
+from .utils import b2ssize_10, b2ssize, unit_conversion_coef, STORAGE_ROLES
+from .result_classes import SuiteConfig
+from .suits.io.fio import FioJobConfig
+from .suits.job import JobConfig
+from .result_classes import NormStatProps, HistoStatProps, TimeSeries
+from .statistic import calc_norm_stat_props, calc_histo_stat_props
+from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+
+
+class IOSummary:
+    def __init__(self, qd: int, block_size: int, nodes_count: int, bw: NormStatProps, lat: HistoStatProps) -> None:
+ self.qd = qd
+ self.nodes_count = nodes_count
+ self.block_size = block_size
+ self.bw = bw
+ self.lat = lat
+
+
+class ResourceNames:
+ io_made = "Client IOP made"
+    data_tr = "Client data transferred"
+
+ test_send = "Test nodes net send"
+ test_recv = "Test nodes net recv"
+ test_net = "Test nodes net total"
+ test_send_pkt = "Test nodes send pkt"
+ test_recv_pkt = "Test nodes recv pkt"
+ test_net_pkt = "Test nodes total pkt"
+
+ test_write = "Test nodes disk write"
+ test_read = "Test nodes disk read"
+ test_write_iop = "Test nodes write IOP"
+ test_read_iop = "Test nodes read IOP"
+ test_iop = "Test nodes IOP"
+ test_rw = "Test nodes disk IO"
+
+ storage_send = "Storage nodes net send"
+ storage_recv = "Storage nodes net recv"
+ storage_send_pkt = "Storage nodes send pkt"
+ storage_recv_pkt = "Storage nodes recv pkt"
+ storage_net = "Storage nodes net total"
+ storage_net_pkt = "Storage nodes total pkt"
+
+ storage_write = "Storage nodes disk write"
+ storage_read = "Storage nodes disk read"
+ storage_write_iop = "Storage nodes write IOP"
+ storage_read_iop = "Storage nodes read IOP"
+ storage_iop = "Storage nodes IOP"
+ storage_rw = "Storage nodes disk IO"
+
+ storage_cpu = "Storage nodes CPU"
+ storage_cpu_s = "Storage nodes CPU s/IOP"
+ storage_cpu_s_b = "Storage nodes CPU s/B"
+
+
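+# Returns (vec.sum() / denom.sum(), stddev of per-chunk ratios): both vectors are split into avg_ranges
+# equal chunks and a ratio is taken only for chunks whose denominator sum is at least half of the average.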
+def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
+ step = min(vec.size, denom.size) // avg_ranges
+ assert step >= 1
+ vals = []
+
+ whole_sum = denom.sum() / denom.size * step * 0.5
+ for i in range(0, avg_ranges):
+ s1 = denom[i * step: (i + 1) * step].sum()
+ if s1 > 1e-5 and s1 >= whole_sum:
+ vals.append(vec[i * step: (i + 1) * step].sum() / s1)
+
+ assert len(vals) > 1
+ return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
+
+
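+# make_iosum() results are cached per (suite.storage_id, job.storage_id); pass nc=True to bypass the cache.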
+iosum_cache = {}  # type: Dict[Tuple[str, str], IOSummary]
+
+
+def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
+ nc: bool = False) -> IOSummary:
+
+ key = (suite.storage_id, job.storage_id)
+ if not nc and key in iosum_cache:
+ return iosum_cache[key]
+
+ lat = get_aggregated(rstorage, suite, job, "lat")
+ io = get_aggregated(rstorage, suite, job, "bw")
+
+ res = IOSummary(job.qd,
+ nodes_count=len(suite.nodes_ids),
+ block_size=job.bsize,
+ lat=calc_histo_stat_props(lat, rebins_count=hist_boxes),
+ bw=calc_norm_stat_props(io, hist_boxes))
+
+ if not nc:
+ iosum_cache[key] = res
+
+ return res
+
+
+cpu_load_cache = {} # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
+
+
+def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
+ time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
+
+ key = (id(rstorage), tuple(roles), time_range)
+ if not nc and key in cpu_load_cache:
+ return cpu_load_cache[key]
+
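+    # Besides the per-state counters a synthetic 'total' series (sum over all states) is added,
+    # so callers can compute shares as cpu_ts[name].data / cpu_ts['total'].data.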
+ cpu_ts = {}
+ cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
+ for name in cpu_metrics:
+ cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)
+
+ it = iter(cpu_ts.values())
+ total_over_time = next(it).data.copy() # type: numpy.ndarray
+ for ts in it:
+ if ts is not None:
+ total_over_time += ts.data
+
+ total = cpu_ts['idle'].copy(no_data=True)
+ total.data = total_over_time
+ cpu_ts['total'] = total
+
+ if not nc:
+ cpu_load_cache[key] = cpu_ts
+
+ return cpu_ts
+
+
+def get_resources_usage(suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage,
+ large_block: int = 256,
+ hist_boxes: int = 10,
+ nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
+
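+    # Returns ({resource name: (formatted total, avg per unit of service, dev or None)}, iops_ok);
+    # iops_ok is False for large-block jobs, for which per-IOP ratios are not computed.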
+ if not nc:
+ records = rstorage.get_job_info(suite, job, "resource_usage")
+ if records is not None:
+ records = records.copy()
+ iops_ok = records.pop('iops_ok')
+ return records, iops_ok
+
+ fjob = cast(FioJobConfig, job)
+ iops_ok = fjob.bsize < large_block
+
+ io_sum = make_iosum(rstorage, suite, fjob, hist_boxes)
+
+ tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
+ io_transfered = io_sum.bw.data * tot_io_coef
+
+ records = {
+ ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
+ } # type: Dict[str, Tuple[str, float, float]]
+
+ if iops_ok:
+ ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
+ records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
+ else:
+ ops_done = None
+
+ all_metrics = [
+ (ResourceNames.test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ (ResourceNames.test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ (ResourceNames.test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
+ (ResourceNames.test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
+
+ (ResourceNames.test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
+ (ResourceNames.test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
+ (ResourceNames.test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+ (ResourceNames.test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+
+ (ResourceNames.storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (ResourceNames.storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (ResourceNames.storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ (ResourceNames.storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+
+ (ResourceNames.storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (ResourceNames.storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ (ResourceNames.storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ (ResourceNames.storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ ]
+
+ all_agg = {}
+
+ for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
+ if service_provided_count is None:
+ continue
+
+ res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
+ if res_ts is None:
+ continue
+
+ data = res_ts.data
+ if units == "B":
+ data = data * float(unit_conversion_coef(res_ts.units, "B"))
+
+ avg, dev = avg_dev_div(data, service_provided_count)
+ if avg < 0.1:
+ dev = None
+ records[vname] = (ffunc(data.sum()) + units, avg, dev)
+ all_agg[vname] = data
+
+ # cpu usage
+ nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
+ cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+
+ cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
+ used_s = b2ssize_10(cpus_used_sec.sum()) + 's'
+
+ all_agg[ResourceNames.storage_cpu] = cpus_used_sec
+
+ if ops_done is not None:
+ records[ResourceNames.storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))
+
+ records[ResourceNames.storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))
+
+ cums = [
+ (ResourceNames.test_iop, ResourceNames.test_read_iop, ResourceNames.test_write_iop,
+ b2ssize_10, "OP", ops_done),
+ (ResourceNames.test_rw, ResourceNames.test_read, ResourceNames.test_write, b2ssize, "B", io_transfered),
+ (ResourceNames.test_net, ResourceNames.test_send, ResourceNames.test_recv, b2ssize, "B", io_transfered),
+ (ResourceNames.test_net_pkt, ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, b2ssize_10,
+ "pkt", ops_done),
+
+ (ResourceNames.storage_iop, ResourceNames.storage_read_iop, ResourceNames.storage_write_iop, b2ssize_10,
+ "OP", ops_done),
+ (ResourceNames.storage_rw, ResourceNames.storage_read, ResourceNames.storage_write, b2ssize, "B",
+ io_transfered),
+ (ResourceNames.storage_net, ResourceNames.storage_send, ResourceNames.storage_recv, b2ssize, "B",
+ io_transfered),
+ (ResourceNames.storage_net_pkt, ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, b2ssize_10,
+ "pkt", ops_done),
+ ]
+
+ for vname, name1, name2, ffunc, units, service_provided_masked in cums:
+ if service_provided_masked is None:
+ continue
+ if name1 in all_agg and name2 in all_agg:
+ agg = all_agg[name1] + all_agg[name2]
+ avg, dev = avg_dev_div(agg, service_provided_masked)
+ if avg < 0.1:
+ dev = None
+ records[vname] = (ffunc(agg.sum()) + units, avg, dev)
+
+ if not nc:
+ toflt = lambda x: float(x) if x is not None else None
+
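+        # numpy scalars are converted to plain floats, presumably so that the records dict can be
+        # serialized by put_job_info below (assumption).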
+ for name, (v1, v2, v3) in list(records.items()):
+ records[name] = v1, toflt(v2), toflt(v3)
+
+ srecords = records.copy()
+ srecords['iops_ok'] = iops_ok
+ rstorage.put_job_info(suite, job, "resource_usage", srecords)
+
+ return records, iops_ok
+
diff --git a/wally/run_test.py b/wally/run_test.py
index c4581d3..555cfa1 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -68,9 +68,36 @@
if not failed_nodes:
logger.info("All nodes connected successfully")
+ def get_time(node):
+ return node.conn.sys.time()
+
+ t_start = time.time()
+ tms = pool.map(get_time, ctx.nodes)
+ t_end = time.time()
+
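+        # Each remote timestamp was sampled somewhere inside [t_start, t_end], so
+        # max(t_start - val, val - t_end) is a lower bound on that node's clock skew.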
+ for node, val in zip(ctx.nodes, tms):
+ max_delta = int(max(t_start - val, val - t_end) * 1000)
+ if max_delta > ctx.config.max_time_diff_ms:
+ msg = ("Too large time shift {}ms on node {}. Stopping test." +
+ " Fix time on cluster nodes and restart test, or change " +
+ "max_time_diff_ms(={}ms) setting in config").format(max_delta,
+ str(node),
+ ctx.config.max_time_diff_ms)
+ logger.error(msg)
+ raise StopTestError(msg)
+ if max_delta > 0:
+ logger.warning("Node %s has time shift at least %s ms", node, max_delta)
+
+
def cleanup(self, ctx: TestRun) -> None:
if ctx.config.get("download_rpc_logs", False):
+ logger.info("Killing all outstanding processes")
for node in ctx.nodes:
+ node.conn.cli.killall()
+
+            logger.info("Downloading RPC server logs")
+            for node in ctx.nodes:
if node.rpc_log_file is not None:
nid = node.node_id
path = "rpc_logs/{}.txt".format(nid)
@@ -82,6 +109,7 @@
ctx.storage.put_raw(log, path)
logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
+ logger.info("Disconnecting")
with ctx.get_pool() as pool:
list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
@@ -152,7 +180,9 @@
def run(self, ctx: TestRun) -> None:
logger.debug("Will sleep for %r seconds", ctx.config.sleep)
+ stime = time.time()
time.sleep(ctx.config.sleep)
+ ctx.storage.put([int(stime), int(time.time())], 'idle')
class PrepareNodes(Stage):
diff --git a/wally/statistic.py b/wally/statistic.py
index 047f86d..2b1e83a 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -25,42 +25,39 @@
def calc_norm_stat_props(ts: TimeSeries, bins_count: int = None, confidence: float = 0.95) -> NormStatProps:
"Calculate statistical properties of array of numbers"
- # array.array has very basic support
- data = cast(List[int], ts.data)
- res = NormStatProps(data, ts.units) # type: ignore
+ res = NormStatProps(ts.data, ts.units) # type: ignore
- if len(data) == 0:
+ if len(ts.data) == 0:
raise ValueError("Input array is empty")
- data = sorted(data)
- res.average = average(data)
- res.deviation = dev(data)
+ res.average = average(ts.data)
+ res.deviation = dev(ts.data)
+ data = sorted(ts.data)
res.max = data[-1]
res.min = data[0]
-
pcs = numpy.percentile(data, q=[1.0, 5.0, 10., 50., 90., 95., 99.])
res.perc_1, res.perc_5, res.perc_10, res.perc_50, res.perc_90, res.perc_95, res.perc_99 = pcs
if len(data) >= MIN_VALUES_FOR_CONFIDENCE:
- res.confidence = stats.sem(data) * \
- stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+ res.confidence = stats.sem(ts.data) * \
+ stats.t.ppf((1 + confidence) / 2, len(ts.data) - 1)
res.confidence_level = confidence
else:
res.confidence = None
res.confidence_level = None
if bins_count is not None:
- res.bins_populations, res.bins_edges = numpy.histogram(data, bins=bins_count)
+ res.bins_populations, res.bins_edges = numpy.histogram(ts.data, bins=bins_count)
res.bins_edges = res.bins_edges[:-1]
try:
- res.normtest = stats.mstats.normaltest(data)
+ res.normtest = stats.mstats.normaltest(ts.data)
except Exception as exc:
logger.warning("stats.mstats.normaltest failed with error: %s", exc)
- res.skew = stats.skew(data)
- res.kurt = stats.kurtosis(data)
+ res.skew = stats.skew(ts.data)
+ res.kurt = stats.kurtosis(ts.data)
return res
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index b8d1076..033d771 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,26 +1,42 @@
[global]
include defaults_qd.cfg
-# QD={% 16, 32, 64, 128 %}
-# QD={% 32, 256 %}
-QD=1
-runtime=10
+QDW={% 4, 16, 32, 64, 128, 256 %}
+QDR={% 16, 32, 64, 128, 256, 512 %}
+
+LQDW={% 1, 4, 16, 64 %}
+LQDR={% 1, 4, 16, 64 %}
+
+runtime=300
direct=1
-ramp_time=0
+ramp_time=30
# ---------------------------------------------------------------------
[verify_{TEST_SUMM}]
blocksize=1m
rw=write
+iodepth={LQDW}
+
+[verify_{TEST_SUMM}]
+blocksize=1m
+rw=read
+iodepth={LQDR}
+
+[verify_{TEST_SUMM}]
+blocksize=4k
+rw=randwrite
+direct=1
+iodepth={QDW}
+
+[verify_{TEST_SUMM}]
+blocksize=4k
+rw=randread
+direct=1
+iodepth={QDR}
+
+[verify_{TEST_SUMM}]
+blocksize=4k
+rw=randwrite
+sync=1
+direct=1
iodepth=1
-
-#[verify_{TEST_SUMM}]
-#blocksize=16m
-#rw=randwrite
-#iodepth={% 1, 4 %}
-
-# [verify_{TEST_SUMM}]
-# blocksize=4k
-# rw=randwr
-# direct=1
-# sync=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 36c349e..e848442 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -113,7 +113,7 @@
for job in not_in_storage:
results = [] # type: List[TimeSeries]
for retry_idx in range(self.max_retry):
- logger.debug("Prepare job %s", job.params.summary)
+ logger.info("Preparing job %s", job.params.summary)
# prepare nodes for new iterations
wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
diff --git a/wally/types.py b/wally/types.py
new file mode 100644
index 0000000..30db83f
--- /dev/null
+++ b/wally/types.py
@@ -0,0 +1,8 @@
+from typing import TypeVar, List, Union
+
+import numpy
+
+
+TNumber = TypeVar('TNumber', int, float)
+Number = Union[int, float]
+NumVector = Union[numpy.ndarray, List[int], List[float]]
diff --git a/wally/utils.py b/wally/utils.py
index af20367..a0f10e8 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -13,7 +13,6 @@
import subprocess
from fractions import Fraction
-
from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
try:
@@ -28,9 +27,10 @@
return str(uuid.uuid4())
+from .types import TNumber, Number
+
+
logger = logging.getLogger("wally")
-TNumber = TypeVar('TNumber', int, float)
-Number = Union[int, float]
STORAGE_ROLES = {'ceph-osd'}