fix bugs in C code, update interpolation (add qd variant), carry units through StatProps, cache unit conversions, store raw sensor dumps
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 53b822b..66a6ee5 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -116,6 +116,7 @@
def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
"Interpolate time series to values on seconds borders"
+ logging.warning("This implementation of interpolate_ts_on_seconds_border is deplricated and should be updated")
if not nc and ts.source.tpl in interpolated_cache:
return interpolated_cache[ts.source.tpl]
@@ -193,14 +194,20 @@
c_interp_func = None
-cdll = None
+c_interp_func_qd = None
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, qd: bool = False) -> TimeSeries:
"Interpolate time series to values on seconds borders"
+ key = (ts.source.tpl, qd)
+ if not nc and key in interpolated_cache:
+ return interpolated_cache[key].copy()
- if not nc and ts.source.tpl in interpolated_cache:
- return interpolated_cache[ts.source.tpl]
+ # both data and times must be 1d compact arrays
+ assert len(ts.data.strides) == 1, "ts.data must be 1D, but strides == " + repr(ts.data.strides)
+ assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data must be a compact (contiguous) array"
+ assert len(ts.times.strides) == 1, "ts.times must be 1D, but strides == " + repr(ts.times.strides)
+ assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times must be a compact (contiguous) array"
assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
.format(len(ts.times), len(ts.data), ts.source)
@@ -214,15 +221,17 @@
assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
coef = int(rcoef) # make typechecker happy
- global cdll
global c_interp_func
+ global c_interp_func_qd
+
uint64_p = ctypes.POINTER(ctypes.c_uint64)
if c_interp_func is None:
dirname = os.path.dirname(os.path.dirname(wally.__file__))
path = os.path.join(dirname, 'clib', 'libwally.so')
cdll = ctypes.CDLL(path)
- c_interp_func = cdll.interpolate_ts_on_seconds_border_v2
+
+ c_interp_func = cdll.interpolate_ts_on_seconds_border
c_interp_func.argtypes = [
ctypes.c_uint, # input_size
ctypes.c_uint, # output_size
@@ -233,6 +242,17 @@
]
c_interp_func.restype = None
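+ # unlike the plain variant, the _qd function returns (c_uint) the number of output cells it actually filled, so the result can be trimmed below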
+ c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
+ c_interp_func_qd.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ uint64_p, # values
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output
+ ]
+ c_interp_func_qd.restype = ctypes.c_uint
+
assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
@@ -240,22 +260,35 @@
# print("output_sz =", output_sz, "coef =", coef)
result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
- c_interp_func(ts.data.size,
- output_sz,
- ts.times.ctypes.data_as(uint64_p),
- ts.data.ctypes.data_as(uint64_p),
- coef,
- result.ctypes.data_as(uint64_p))
+ if qd:
+ func = c_interp_func_qd
+ else:
+ func = c_interp_func
+ sz = func(ts.data.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ ts.data.ctypes.data_as(uint64_p),
+ coef,
+ result.ctypes.data_as(uint64_p))
+
+ if qd:
+ result = result[:sz]
+ output_sz = sz
+ else:
+ assert sz is None
+
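+ # output timestamps: one per second, starting from the first input time converted to seconds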
+ rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
res_ts = TimeSeries(ts.name, None, result,
- times=int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype),
+ times=rtimes,
units=ts.units,
time_units='s',
source=ts.source(),
histo_bins=ts.histo_bins)
if not nc:
- interpolated_cache[ts.source.tpl] = res_ts
+ interpolated_cache[ts.source.tpl] = res_ts.copy()
+
return res_ts
@@ -371,6 +404,9 @@
histo_bins=fixed_bins_edges)
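+# metrics that must be interpolated with the queue-depth (_qd) variant of the C helper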
+qd_metrics = {'io_queue'}
+
+
def summ_sensors(rstorage: ResultStorage,
roles: List[str],
sensor: str,
@@ -381,7 +417,7 @@
for node in find_nodes_by_roles(rstorage, roles):
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data)
+ data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
data = get_ts_for_time_range(data, time_range)
if res is None:
res = data
@@ -403,7 +439,7 @@
for dev in devs:
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data)
+ data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
data = get_ts_for_time_range(data, time_range)
res.append(data.data)
res2d = numpy.concatenate(res)
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index 733eab0..666c753 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -38,7 +38,7 @@
stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat\.yaml'
# sensor data
- sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.csv'
+ sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.{tag}'
sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
report_root = 'report/'
@@ -185,13 +185,19 @@
time_units=time_units,
histo_bins=header2)
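+ # raw (non-numeric) sensor dumps are stored and loaded as opaque byte blobs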
+ def load_sensor_raw(self, ds: DataSource) -> bytes:
+ path = DB_paths.sensor_data.format(**ds.__dict__)
+ with self.storage.get_fd(path, "rb") as fd:
+ return fd.read()
+
def load_sensor(self, ds: DataSource) -> TimeSeries:
# sensors has no shape
path = DB_paths.sensor_time.format(**ds.__dict__)
collect_header, must_be_none, collected_at = self.load_array(path)
# cut 'collection end' time
- collected_at = collected_at[::2]
+ # .copy() is needed to really remove the 'collection end' elements, so that c_interpolate_ts_on_seconds_border works correctly (it requires a compact array)
+ collected_at = collected_at[::2].copy()
# there must be no histogram for collected_at
assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
@@ -281,6 +287,11 @@
def put_report(self, report: str, name: str) -> str:
return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
+ def put_sensor_raw(self, data: bytes, ds: DataSource) -> None:
+ path = DB_paths.sensor_data.format(**ds.__dict__)
+ with self.storage.get_fd(path, "cb") as fd:
+ fd.write(data)
+
def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
if ds.metric == 'collected_at':
path = DB_paths.sensor_time
diff --git a/wally/node.py b/wally/node.py
index a828492..32ec58a 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -282,7 +282,7 @@
log_file = node.run("mktemp", nolog=True).strip()
cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
- logger.info("Agent logs for node {} stored on node in file {} log level is {}".format(
+ logger.info("Agent logs for node {} stored remotely in file {}, log level is {}".format(
node.node_id, log_file, log_level))
else:
cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
diff --git a/wally/report.py b/wally/report.py
index b8a7713..861e513 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -10,7 +10,7 @@
import numpy
import scipy.stats
-import matplotlib
+# import matplotlib
# matplotlib.use('GTKAgg')
import matplotlib.pyplot as plt
@@ -49,8 +49,6 @@
DEBUG = False
LARGE_BLOCKS = 256
-MiB2KiB = 1024
-MS2S = 1000
# ---------------- PROFILES ------------------------------------------------------------------------------------------
@@ -114,17 +112,15 @@
min_iops_vs_qd_jobs = 3
- units = {
- 'bw': ("MiBps", MiB2KiB, "bandwith"),
- 'iops': ("IOPS", 1, "iops"),
- 'lat': ("ms", 1, "latency")
- }
-
qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
iotime_bins = list(range(0, 1030, 50))
block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+DefColorProfile = ColorProfile()
+DefStyleProfile = StyleProfile()
+
+
# ---------------- STRUCTS -------------------------------------------------------------------------------------------
@@ -248,8 +244,8 @@
@provide_plot
def plot_hist(title: str, units: str,
prop: StatProps,
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile,
+ style: StyleProfile = DefStyleProfile) -> None:
# TODO: unit should came from ts
normed_bins = prop.bins_populations / prop.bins_populations.sum()
@@ -288,8 +284,8 @@
ylabel: str,
xlabel: str = "time, s",
average: bool = False,
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile,
+ style: StyleProfile = DefStyleProfile) -> None:
fig, ax = plt.subplots(figsize=(12, 6))
for name, arr in tss:
if average:
@@ -308,7 +304,7 @@
@provide_plot
def plot_hmap_from_2d(data2d: numpy.ndarray,
title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
- colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
ioq1d, ranges = hmap_from_2d(data2d)
ax, _ = plot_hmap_with_y_histo(ioq1d, ranges, bins=bins)
ax.set_ylabel(ylabel)
@@ -322,7 +318,8 @@
ts: TimeSeries,
plot_avg_dev: bool = True,
plot_points: bool = True,
- colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile,
+ style: StyleProfile = DefStyleProfile) -> None:
min_time = min(ts.times)
@@ -395,8 +392,7 @@
def plot_lat_over_time(title: str, ts: TimeSeries,
ylabel: str,
samples: int = 5,
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
min_time = min(ts.times)
times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
@@ -469,8 +465,7 @@
ts: TimeSeries,
ylabel: str,
xlabel: str = "time, s",
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
# only histogram-based ts can be plotted
assert len(ts.data.shape) == 2
@@ -571,8 +566,7 @@
iosums: List[IOSummary],
iops_log_spine: bool = False,
lat_log_spine: bool = False,
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+ colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
# -------------- MAGIC VALUES ---------------------
# IOPS bar width
@@ -609,19 +603,23 @@
# gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
# p1 = plt.subplot(gs[1])
+ logger.warning("Check coef usage!")
+
fig, p1 = plt.subplots(figsize=StyleProfile.figsize)
# plot IOPS/BW bars
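+ # coef rescales the stored bandwidth values: to MiBps for large blocks, towards IOPS for small blocks (see warning above)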
if block_size >= LARGE_BLOCKS:
iops_primary = False
- coef = MiB2KiB
+ coef = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
p1.set_ylabel("BW (MiBps)")
else:
iops_primary = True
- coef = block_size
+ coef = float(unit_conversion_coef(iosums[0].bw.units, "MiBps")) / block_size
p1.set_ylabel("IOPS")
- p1.bar(xpos, [iosum.bw.average / coef for iosum in iosums], width=width, color=colors.box_color, label=legend)
+ vals = [iosum.bw.average * coef for iosum in iosums]
+
+ p1.bar(xpos, vals, width=width, color=colors.box_color, label=legend)
# set correct x limits for primary IO spine
min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
@@ -629,19 +627,19 @@
border = (max_io - min_io) * extra_y_space
io_lims = (min_io - border, max_io + border)
- p1.set_ylim(io_lims[0] / coef, io_lims[-1] / coef)
+ p1.set_ylim(io_lims[0] * coef, io_lims[-1] * coef)
# plot deviation and confidence error ranges
err1_legend = err2_legend = None
for pos, iosum in zip(xpos, iosums):
err1_legend = p1.errorbar(pos + width / 2 - err_x_offset,
- iosum.bw.average / coef,
- iosum.bw.deviation * style.dev_range_x / coef,
+ iosum.bw.average * coef,
+ iosum.bw.deviation * style.dev_range_x * coef,
alpha=colors.subinfo_alpha,
color=colors.suppl_color1) # 'magenta'
err2_legend = p1.errorbar(pos + width / 2 + err_x_offset,
- iosum.bw.average / coef,
- iosum.bw.confidence / coef,
+ iosum.bw.average * coef,
+ iosum.bw.confidence * coef,
alpha=colors.subinfo_alpha,
color=colors.suppl_color2) # 'teal'
@@ -681,10 +679,10 @@
if iops_primary:
p3.set_ylabel("BW (MiBps)")
- p3.set_ylim(io_lims[0] / MiB2KiB, io_lims[1] / MiB2KiB)
+ p3.set_ylim(io_lims[0] * coef, io_lims[1] * coef)
else:
p3.set_ylabel("IOPS")
- p3.set_ylim(io_lims[0] / block_size, io_lims[1] / block_size)
+ p3.set_ylim(io_lims[0] * coef, io_lims[1] * coef)
p3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
p3.spines["left"].set_visible(True)
@@ -719,10 +717,10 @@
self.data = data
self.order_attr = order_attr
- def __eq__(self, o: object) -> bool:
+ def __eq__(self, o: Any) -> bool:
return o.order_attr == self.order_attr # type: ignore
- def __lt__(self, o: object) -> bool:
+ def __lt__(self, o: Any) -> bool:
return o.order_attr > self.order_attr # type: ignore
@@ -840,36 +838,41 @@
res += html.table("Test info", None, summary_data)
stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Mediana", "Mode", "Kurt / Skew", "95%", "99%"]
- KB = 1024
+ bw_target_units = 'Bps'
+ bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
+
bw_data = ["Bandwidth",
- "{}Bps ~ {}Bps".format(b2ssize(io_sum.bw.average * KB), b2ssize(io_sum.bw.deviation * KB)),
- b2ssize(io_sum.bw.confidence * KB) + "Bps",
- b2ssize(io_sum.bw.perc_50 * KB) + "Bps",
+ "{}{} ~ {}{}".format(b2ssize(io_sum.bw.average * bw_coef), bw_target_units,
+ b2ssize(io_sum.bw.deviation * bw_coef), bw_target_units),
+ b2ssize(io_sum.bw.confidence * bw_coef) + bw_target_units,
+ b2ssize(io_sum.bw.perc_50 * bw_coef) + bw_target_units,
"-",
"{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
- b2ssize(io_sum.bw.perc_5 * KB) + "Bps",
- b2ssize(io_sum.bw.perc_1 * KB) + "Bps"]
+ b2ssize(io_sum.bw.perc_5 * bw_coef) + bw_target_units,
+ b2ssize(io_sum.bw.perc_1 * bw_coef) + bw_target_units]
+ iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
iops_data = ["IOPS",
- "{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average / fjob.bsize),
- b2ssize_10(io_sum.bw.deviation / fjob.bsize)),
- b2ssize_10(io_sum.bw.confidence / fjob.bsize) + "IOPS",
- b2ssize_10(io_sum.bw.perc_50 / fjob.bsize) + "IOPS",
+ "{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average * iops_coef),
+ b2ssize_10(io_sum.bw.deviation * iops_coef)),
+ b2ssize_10(io_sum.bw.confidence * iops_coef) + "IOPS",
+ b2ssize_10(io_sum.bw.perc_50 * iops_coef) + "IOPS",
"-",
"{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
- b2ssize_10(io_sum.bw.perc_5 / fjob.bsize) + "IOPS",
- b2ssize_10(io_sum.bw.perc_1 / fjob.bsize) + "IOPS"]
+ b2ssize_10(io_sum.bw.perc_5 * iops_coef) + "IOPS",
+ b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS"]
- MICRO = 1000000
+ lat_target_unit = 's'
+ lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
# latency
lat_data = ["Latency",
"-",
"-",
- b2ssize_10(io_sum.bw.perc_50 / MICRO) + "s",
+ b2ssize_10(io_sum.lat.perc_50 * lat_coef) + lat_target_unit,
"-",
"-",
- b2ssize_10(io_sum.bw.perc_95 / MICRO) + "s",
- b2ssize_10(io_sum.bw.perc_99 / MICRO) + "s"]
+ b2ssize_10(io_sum.lat.perc_95 * lat_coef) + lat_target_unit,
+ b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit]
# sensor usage
stat_data = [iops_data, bw_data, lat_data]
@@ -877,17 +880,19 @@
resource_headers = ["Resource", "Usage count", "Proportional to work done"]
- io_transfered = io_sum.bw.data.sum() * KB
+ tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "KiBps"))
+ tot_ops_coef = tot_io_coef / fjob.bsize
+
+ io_transfered = io_sum.bw.data.sum() * tot_io_coef
resource_data = [
- ["IO made", b2ssize_10(io_transfered / KB / fjob.bsize) + "OP", "-"],
+ ["IO made", b2ssize_10(io_transfered * tot_ops_coef) + "OP", "-"],
["Data transfered", b2ssize(io_transfered) + "B", "-"]
]
storage = rstorage.storage
nodes = storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
- trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
- ops_done = io_transfered / fjob.bsize / KB
+ ops_done = io_transfered * tot_ops_coef
all_metrics = [
("Test nodes net send", 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
@@ -913,7 +918,7 @@
if not nodes:
continue
- res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=trange)
+ res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
if res_ts is None:
continue
@@ -946,21 +951,23 @@
job: JobConfig,
rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
-
# plot CPU time
for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
cpu_ts = {}
cpu_metrics = "idle guest iowait irq nice sirq steal sys user".split()
for name in cpu_metrics:
- cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=trange)
+ cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name,
+ time_range=job.reliable_info_range_s)
it = iter(cpu_ts.values())
total_over_time = next(it).data.copy()
for ts in it:
total_over_time += ts.data
- fname = plot_simple_over_time(rstorage, cpu_ts['idle'].source(metric='allcpu', tag=rt + '.plt.svg'),
+ fname = plot_simple_over_time(rstorage,
+ cpu_ts['idle'].source(job_id=job.storage_id,
+ suite_id=suite.storage_id,
+ metric='allcpu', tag=rt + '.plt.svg'),
tss=[(name, ts.data * 100 / total_over_time) for name, ts in cpu_ts.items()],
average=True,
ylabel="CPU time %",
@@ -1084,10 +1091,10 @@
if fjob.bsize >= LARGE_BLOCKS:
title = "BW distribution"
units = "MiBps"
- agg_io.data //= MiB2KiB
+ agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
else:
title = "IOPS distribution"
- agg_io.data //= fjob.bsize
+ agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
@@ -1111,10 +1118,10 @@
if fjob.bsize >= LARGE_BLOCKS:
title = "Fio measured Bandwidth over time"
units = "MiBps"
- agg_io.data //= MiB2KiB
+ agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
else:
title = "Fio measured IOPS over time"
- agg_io.data //= fjob.bsize
+ agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io) # type: str
@@ -1163,10 +1170,10 @@
# TODO: units should came from sensor
storage_sensors = [
- ('block-io', 'reads_completed', "Read ops", 'iops'),
- ('block-io', 'writes_completed', "Write ops", 'iops'),
- ('block-io', 'sectors_read', "Read kb", 'KB'),
- ('block-io', 'sectors_written', "Write kb", 'KB'),
+ ('block-io', 'reads_completed', "Read", 'iop'),
+ ('block-io', 'writes_completed', "Write", 'iop'),
+ ('block-io', 'sectors_read', "Read", 'KiB'),
+ ('block-io', 'sectors_written', "Write", 'KiB'),
]
def get_divs(self,
@@ -1175,11 +1182,8 @@
rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
- # convert ms to s
- time_range = (job.reliable_info_range[0] // MS2S, job.reliable_info_range[1] // MS2S)
-
- for sensor, metric, sensor_title, units in self.storage_sensors:
- ts = summ_sensors(rstorage, ['testnode'], sensor, metric, time_range)
+ for sensor, metric, op, units in self.storage_sensors:
+ ts = summ_sensors(rstorage, ['testnode'], sensor, metric, job.reliable_info_range_s)
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
node_id="test_nodes",
@@ -1188,10 +1192,9 @@
metric=metric,
tag="ts.svg")
- data = ts.data if units != 'KB' else ts.data * float(unit_conversion_coef(ts.units, 'KB'))
-
+ data = ts.data if units != 'KiB' else ts.data * float(unit_conversion_coef(ts.units, 'KiB'))
ts = TimeSeries(name="",
- times=numpy.arange(*time_range),
+ times=numpy.arange(*job.reliable_info_range_s),
data=data,
raw=None,
units=units if ts.units is None else ts.units,
@@ -1199,6 +1202,7 @@
source=ds,
histo_bins=ts.histo_bins)
+ sensor_title = "{} {}".format(op, units)
fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 1fbd094..9d2e83e 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -145,7 +145,11 @@
return str(self)
def copy(self) -> 'TimeSeries':
- return copy.copy(self)
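+ # copy the numpy arrays and the source as well, so cached/shared instances can't be mutated through the copy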
+ cp = copy.copy(self)
+ cp.times = self.times.copy()
+ cp.data = self.data.copy()
+ cp.source = self.source()
+ return cp
# (node_name, source_dev, metric_name) => metric_results
@@ -157,7 +161,7 @@
__ignore_fields__ = ['data']
- def __init__(self, data: numpy.array) -> None:
+ def __init__(self, data: numpy.array, units: str) -> None:
self.perc_99 = None # type: float
self.perc_95 = None # type: float
self.perc_90 = None # type: float
@@ -177,6 +181,7 @@
self.bins_edges = None # type: numpy.array
self.data = data
+ self.units = units
def __str__(self) -> str:
res = ["{}(size = {}):".format(self.__class__.__name__, len(self.data))]
@@ -204,14 +209,14 @@
class HistoStatProps(StatProps):
"""Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
Used for latency"""
- def __init__(self, data: numpy.array) -> None:
- StatProps.__init__(self, data)
+ def __init__(self, data: numpy.array, units: str) -> None:
+ StatProps.__init__(self, data, units)
class NormStatProps(StatProps):
"Statistic properties for timeseries with normal data distribution. Used for iops/bw"
- def __init__(self, data: numpy.array) -> None:
- StatProps.__init__(self, data)
+ def __init__(self, data: numpy.array, units: str) -> None:
+ StatProps.__init__(self, data, units)
self.average = None # type: float
self.deviation = None # type: float
diff --git a/wally/sensors.py b/wally/sensors.py
index 1faa03b..60830a1 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,3 +1,4 @@
+import bz2
import array
import logging
from typing import List, Dict, Tuple
@@ -117,7 +118,6 @@
def collect_sensors_data(ctx: TestRun, stop: bool = False):
rstorage = ResultStorage(ctx.storage)
- raw_skipped = False
for node in ctx.nodes:
node_id = node.node_id
if node_id in ctx.sensors_run_on:
@@ -130,22 +130,23 @@
# TODO: units should came along with data
# TODO: process raw sensors data
- for path, value, is_parsed in sensors_rpc_plugin.unpack_rpc_updates(func()):
- if not is_parsed:
- if not raw_skipped:
- logger.warning("Raw sensors data at path %r and, maybe, others are skipped", path)
- raw_skipped = True
- continue
-
+ for path, value, is_array in sensors_rpc_plugin.unpack_rpc_updates(func()):
if path == 'collected_at':
- ds = DataSource(node_id=node_id, metric='collected_at')
- units = 'us'
+ ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
+ rstorage.append_sensor(numpy.array(value), ds, 'us')
else:
sensor, dev, metric = path.split(".")
- ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor)
- units = sensor_units["{}.{}".format(sensor, metric)]
+ ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
+ if is_array:
+ units = sensor_units["{}.{}".format(sensor, metric)]
+ rstorage.append_sensor(numpy.array(value), ds, units)
+ else:
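+ # non-array payloads are stored as raw blobs: 'historic' data is bz2-compressed, other dumps are kept as-is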
+ if metric == 'historic':
+ rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
+ else:
+ assert metric in ('perf_dump', 'historic_js')
+ rstorage.put_sensor_raw(value, ds(tag='js'))
- rstorage.append_sensor(numpy.array(value), ds, units)
class CollectSensorsStage(Stage):
diff --git a/wally/statistic.py b/wally/statistic.py
index 8543e0f..047f86d 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -27,7 +27,7 @@
# array.array has very basic support
data = cast(List[int], ts.data)
- res = NormStatProps(data) # type: ignore
+ res = NormStatProps(data, ts.units) # type: ignore
if len(data) == 0:
raise ValueError("Input array is empty")
@@ -129,7 +129,7 @@
if bins_edges is None:
bins_edges = ts.histo_bins
- res = HistoStatProps(ts.data)
+ res = HistoStatProps(ts.data, ts.units)
# summ across all series
aggregated = ts.data.sum(axis=0, dtype='int')
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index bdcec23..222589b 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -291,10 +291,10 @@
def get_log_files(sec: FioJobConfig, iops: bool = False) -> Iterator[Tuple[str, str, str]]:
res = [] # type: List[Tuple[str, str, str]]
- keys = [('write_bw_log', 'bw', 'kibps'),
+ keys = [('write_bw_log', 'bw', 'KiBps'),
('write_hist_log', 'lat', 'us')]
if iops:
- keys.append(('write_iops_log', 'iops', 'iops'))
+ keys.append(('write_iops_log', 'iops', 'IOPS'))
for key, name, units in keys:
log = sec.vals.get(key)
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 0d85f94..40055d7 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,9 +1,10 @@
[global]
include defaults_qd.cfg
-QD={% 32, 64, 128, 256 %}
-runtime=600
+# QD={% 32, 64, 128, 256 %}
+QD={% 32, 256 %}
+runtime=120
direct=1
-ramp_time=30
+ramp_time=15
# ---------------------------------------------------------------------
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 1e4c457..5cb27b5 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -65,10 +65,15 @@
# time interval, in seconds, when test was running on all nodes
self.reliable_info_range = None # type: Tuple[int, int]
+
# all job parameters, both from suite file and config file
self.vals = OrderedDict() # type: Dict[str, Any]
@property
+ def reliable_info_range_s(self) -> Tuple[int, int]:
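+ """reliable_info_range converted from milliseconds to seconds"""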
+ return (self.reliable_info_range[0] // 1000, self.reliable_info_range[1] // 1000)
+
+ @property
def storage_id(self) -> str:
"""unique string, used as key in storage"""
return "{}_{}".format(self.summary, self.idx)
diff --git a/wally/utils.py b/wally/utils.py
index 151ed5f..af20367 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -465,7 +465,14 @@
return 1, units
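+# cache unit conversion results - the same unit pairs are converted repeatedly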
+conversion_cache = {}
+
+
def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
+ key = (from_unit, to_unit)
+ if key in conversion_cache:
+ return conversion_cache[key]
+
f1, u1 = split_unit(from_unit)
f2, u2 = split_unit(to_unit)
@@ -473,14 +480,16 @@
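+ # keep exact int results when the units divide evenly; fall back to Fraction otherwise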
if isinstance(f1, int) and isinstance(f2, int):
if f1 % f2 != 0:
- return Fraction(f1, f2)
+ res = Fraction(f1, f2)
else:
- return f1 // f2
-
- res = f1 / f2
+ res = f1 // f2
+ else:
+ res = f1 / f2
if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1:
- return cast(Fraction, res).numerator
+ res = cast(Fraction, res).numerator
+
+ conversion_cache[key] = res
return res