Add wally/resources.py: per-job resource-usage aggregation (client IO, test/storage node network, disk and CPU)
diff --git a/wally/resources.py b/wally/resources.py
new file mode 100644
index 0000000..f063eb5
--- /dev/null
+++ b/wally/resources.py
@@ -0,0 +1,259 @@
+from typing import Tuple, Dict, cast, List
+
+import numpy
+
+from .hlstorage import ResultStorage
+from .utils import b2ssize_10, b2ssize, unit_conversion_coef, STORAGE_ROLES
+from .result_classes import SuiteConfig
+from .suits.io.fio import FioJobConfig
+from .suits.job import JobConfig
+from .result_classes import NormStatProps, HistoStatProps, TimeSeries
+from .statistic import calc_norm_stat_props, calc_histo_stat_props
+from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+
+
class IOSummary:
    """Aggregated IO statistics for a single fio job run.

    Attributes:
        qd: queue depth the job ran at.
        block_size: IO block size of the job (``FioJobConfig.bsize``;
            presumably KiB — the unit conversion in ``get_resources_usage``
            treats it as KiBps-compatible).
        nodes_count: number of test nodes that generated the load.
        bw: aggregated bandwidth statistics (normal-distribution props).
        lat: aggregated latency statistics (histogram props).
    """

    def __init__(self, qd: int, block_size: int, nodes_count: int,
                 bw: NormStatProps, lat: HistoStatProps) -> None:
        self.qd = qd
        self.nodes_count = nodes_count
        self.block_size = block_size
        self.bw = bw
        self.lat = lat

    def __repr__(self) -> str:
        # bw/lat are large stat objects - keep the repr compact
        return "{}(qd={!r}, block_size={!r}, nodes_count={!r})".format(
            type(self).__name__, self.qd, self.block_size, self.nodes_count)
+
+
class ResourceNames:
    """Human-readable labels used as keys in resource-usage records.

    Grouped by subsystem: client-side totals, test-node network/disk,
    storage-node network/disk, and storage-node CPU.
    """
    io_made = "Client IOP made"
    # Fixed typo: was "transfered".  NOTE(review): any previously persisted
    # resource_usage records keyed by the old spelling will not match this
    # label - confirm no long-lived storage depends on it.
    data_tr = "Client data transferred"

    test_send = "Test nodes net send"
    test_recv = "Test nodes net recv"
    test_net = "Test nodes net total"
    test_send_pkt = "Test nodes send pkt"
    test_recv_pkt = "Test nodes recv pkt"
    test_net_pkt = "Test nodes total pkt"

    test_write = "Test nodes disk write"
    test_read = "Test nodes disk read"
    test_write_iop = "Test nodes write IOP"
    test_read_iop = "Test nodes read IOP"
    test_iop = "Test nodes IOP"
    test_rw = "Test nodes disk IO"

    storage_send = "Storage nodes net send"
    storage_recv = "Storage nodes net recv"
    storage_send_pkt = "Storage nodes send pkt"
    storage_recv_pkt = "Storage nodes recv pkt"
    storage_net = "Storage nodes net total"
    storage_net_pkt = "Storage nodes total pkt"

    storage_write = "Storage nodes disk write"
    storage_read = "Storage nodes disk read"
    storage_write_iop = "Storage nodes write IOP"
    storage_read_iop = "Storage nodes read IOP"
    storage_iop = "Storage nodes IOP"
    storage_rw = "Storage nodes disk IO"

    storage_cpu = "Storage nodes CPU"
    storage_cpu_s = "Storage nodes CPU s/IOP"
    storage_cpu_s_b = "Storage nodes CPU s/B"
+
+
def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
    """Return (overall ratio, deviation) of ``vec`` divided by ``denom``.

    The overall ratio is ``vec.sum() / denom.sum()``.  The deviation is the
    sample standard deviation of per-chunk ratios: both arrays are split into
    ``avg_ranges`` equal chunks, and a chunk contributes a ratio only when its
    denominator sum is non-negligible (> 1e-5) and at least half of the mean
    per-chunk denominator sum.  At least two chunks must qualify.
    """
    chunk = min(vec.size, denom.size) // avg_ranges
    assert chunk >= 1

    # Half of the average per-chunk denominator sum - chunks below this
    # threshold are considered too quiet to yield a meaningful ratio.
    threshold = denom.sum() / denom.size * chunk * 0.5

    ratios = []
    for idx in range(avg_ranges):
        lo = idx * chunk
        denom_sum = denom[lo: lo + chunk].sum()
        if denom_sum > 1e-5 and denom_sum >= threshold:
            ratios.append(vec[lo: lo + chunk].sum() / denom_sum)

    assert len(ratios) > 1
    return vec.sum() / denom.sum(), numpy.std(ratios, ddof=1)
+
+
# Memoized results of make_iosum, keyed by (suite.storage_id, job.storage_id).
# NOTE(review): hist_boxes is not part of the key, so a later call with a
# different hist_boxes value returns the first cached result - confirm all
# callers use the same hist_boxes.
iosum_cache = {}  # type: Dict[Tuple[str, str], IOSummary]
+
+
def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
               nc: bool = False) -> IOSummary:
    """Build (and memoize) an :class:`IOSummary` for one suite/job pair.

    Latency and bandwidth series are aggregated across nodes, then reduced
    to histogram / normal statistics with ``hist_boxes`` bins.  Results are
    cached in ``iosum_cache`` unless ``nc`` ("no cache") is True.
    """
    cache_key = (suite.storage_id, job.storage_id)
    if not nc and cache_key in iosum_cache:
        return iosum_cache[cache_key]

    lat_ts = get_aggregated(rstorage, suite, job, "lat")
    bw_ts = get_aggregated(rstorage, suite, job, "bw")

    summary = IOSummary(job.qd,
                        nodes_count=len(suite.nodes_ids),
                        block_size=job.bsize,
                        lat=calc_histo_stat_props(lat_ts, rebins_count=hist_boxes),
                        bw=calc_norm_stat_props(bw_ts, hist_boxes))

    if not nc:
        iosum_cache[cache_key] = summary

    return summary
+
+
# Memoized results of get_cluster_cpu_load, keyed by
# (id(rstorage), roles tuple, time range).
cpu_load_cache = {}  # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
+
+
def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
                         time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
    """Return per-metric CPU time series summed over all nodes with *roles*.

    The result maps each CPU metric name (idle, user, sys, ...) to its
    cluster-wide :class:`TimeSeries` (or None when the sensor is absent),
    plus a synthetic 'total' entry - the element-wise sum of every available
    metric.  Results are memoized in ``cpu_load_cache`` unless ``nc`` is True.

    Raises:
        ValueError: if no CPU sensor data at all is available.
    """
    key = (id(rstorage), tuple(roles), time_range)
    if not nc and key in cpu_load_cache:
        return cpu_load_cache[key]

    cpu_ts = {}
    cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
    for name in cpu_metrics:
        cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)

    # Bug fix vs the original: the first series ('idle') was dereferenced
    # without a None check while later ones were checked, crashing with
    # AttributeError when the idle sensor was missing.  Skip missing series
    # uniformly instead.
    present = [ts for ts in cpu_ts.values() if ts is not None]
    if not present:
        raise ValueError("No CPU sensor data found for roles {!r} in range {!r}".format(roles, time_range))

    total_over_time = present[0].data.copy()  # type: numpy.ndarray
    for ts in present[1:]:
        total_over_time += ts.data

    # Reuse an existing series as a metadata template for the synthetic total.
    total = present[0].copy(no_data=True)
    total.data = total_over_time
    cpu_ts['total'] = total

    if not nc:
        cpu_load_cache[key] = cpu_ts

    return cpu_ts
+
+
def get_resources_usage(suite: SuiteConfig,
                        job: JobConfig,
                        rstorage: ResultStorage,
                        large_block: int = 256,
                        hist_boxes: int = 10,
                        nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
    """Compute per-resource usage records for one job of a suite.

    Returns ``(records, iops_ok)``:

    * ``records`` maps a :class:`ResourceNames` label to a tuple
      ``(pretty_total, avg_per_service_unit, dev)`` where the last two may
      be None (no denominator / negligible ratio).
    * ``iops_ok`` is True when the job's block size is below ``large_block``,
      i.e. per-operation ratios are meaningful.

    Results are persisted via ``rstorage.put_job_info`` and served from that
    cache on subsequent calls, unless ``nc`` ("no cache") is True.
    """
    # Serve previously persisted results, if any.
    if not nc:
        records = rstorage.get_job_info(suite, job, "resource_usage")
        if records is not None:
            records = records.copy()
            # 'iops_ok' is stored alongside the resource labels - split it out.
            iops_ok = records.pop('iops_ok')
            return records, iops_ok

    fjob = cast(FioJobConfig, job)
    # Per-operation ratios only make sense for small-block jobs.
    iops_ok = fjob.bsize < large_block

    io_sum = make_iosum(rstorage, suite, fjob, hist_boxes)

    # Client-side bandwidth series, converted to bytes per second.
    tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
    io_transfered = io_sum.bw.data * tot_io_coef

    records = {
        ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
    }  # type: Dict[str, Tuple[str, float, float]]

    if iops_ok:
        # bsize is treated as KiB here: divide bytes/s by bytes-per-op
        # to get operations/s.
        ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
        records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
    else:
        ops_done = None

    # (label, sensor, metric, formatter, node roles, unit suffix,
    #  denominator series - None disables the row)
    all_metrics = [
        (ResourceNames.test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
        (ResourceNames.test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),

        (ResourceNames.test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
        (ResourceNames.test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),

        (ResourceNames.storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
        # NOTE(review): storage packet counters use an "OP" unit suffix while
        # the test-node ones above use "pkt" - looks inconsistent; confirm.
        (ResourceNames.storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
        (ResourceNames.storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),

        (ResourceNames.storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
        (ResourceNames.storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
    ]

    all_agg = {}  # raw aggregated sensor data per label, for the sums below

    for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
        if service_provided_count is None:
            continue  # no denominator (ops_done unavailable for large blocks)

        res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
        if res_ts is None:
            continue  # this sensor was not collected on these nodes

        data = res_ts.data
        if units == "B":
            # Sensors report in their own units (e.g. disk sectors);
            # normalize to bytes before summing and dividing.
            data = data * float(unit_conversion_coef(res_ts.units, "B"))

        avg, dev = avg_dev_div(data, service_provided_count)
        if avg < 0.1:
            # Ratios this small are dominated by noise - drop the deviation.
            dev = None
        records[vname] = (ffunc(data.sum()) + units, avg, dev)
        all_agg[vname] = data

    # CPU usage of the storage nodes.
    nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
    cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)

    # Non-idle CPU fraction scaled by node count => CPU-seconds per second.
    cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
    used_s = b2ssize_10(cpus_used_sec.sum()) + 's'

    all_agg[ResourceNames.storage_cpu] = cpus_used_sec

    if ops_done is not None:
        records[ResourceNames.storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))

    records[ResourceNames.storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))

    # Combined read+write / send+recv totals:
    # (total label, part label 1, part label 2, formatter, unit, denominator)
    cums = [
        (ResourceNames.test_iop, ResourceNames.test_read_iop, ResourceNames.test_write_iop,
         b2ssize_10, "OP", ops_done),
        (ResourceNames.test_rw, ResourceNames.test_read, ResourceNames.test_write, b2ssize, "B", io_transfered),
        (ResourceNames.test_net, ResourceNames.test_send, ResourceNames.test_recv, b2ssize, "B", io_transfered),
        (ResourceNames.test_net_pkt, ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, b2ssize_10,
         "pkt", ops_done),

        (ResourceNames.storage_iop, ResourceNames.storage_read_iop, ResourceNames.storage_write_iop, b2ssize_10,
         "OP", ops_done),
        (ResourceNames.storage_rw, ResourceNames.storage_read, ResourceNames.storage_write, b2ssize, "B",
         io_transfered),
        (ResourceNames.storage_net, ResourceNames.storage_send, ResourceNames.storage_recv, b2ssize, "B",
         io_transfered),
        (ResourceNames.storage_net_pkt, ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, b2ssize_10,
         "pkt", ops_done),
    ]

    for vname, name1, name2, ffunc, units, service_provided_masked in cums:
        if service_provided_masked is None:
            continue
        # Only emit a total when both parts were actually collected above.
        if name1 in all_agg and name2 in all_agg:
            agg = all_agg[name1] + all_agg[name2]
            avg, dev = avg_dev_div(agg, service_provided_masked)
            if avg < 0.1:
                dev = None
            records[vname] = (ffunc(agg.sum()) + units, avg, dev)

    if not nc:
        # numpy scalars are not storage-friendly; coerce to plain floats
        # before persisting.
        toflt = lambda x: float(x) if x is not None else None

        for name, (v1, v2, v3) in list(records.items()):
            records[name] = v1, toflt(v2), toflt(v3)

        srecords = records.copy()
        srecords['iops_ok'] = iops_ok
        rstorage.put_job_info(suite, job, "resource_usage", srecords)

    return records, iops_ok
+