import logging
from typing import Tuple, Dict, cast, List, Optional

import numpy

from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
from cephlib.numeric_types import TimeSeries
from cephlib.wally_storage import find_nodes_by_roles, WallyDB
from cephlib.storage_selectors import sum_sensors

from .result_classes import IWallyStorage, SuiteConfig
from .utils import STORAGE_ROLES
from .suits.io.fio import FioJobConfig
from .suits.job import JobConfig
from .data_selectors import get_aggregated


logger = logging.getLogger('wally')


class IOSummary:
    """Aggregated bandwidth and latency statistics for a single fio job."""
    def __init__(self, qd: int, block_size: int, nodes_count: int,
                 bw: NormStatProps, lat: HistoStatProps) -> None:
        self.qd = qd                    # queue depth
        self.nodes_count = nodes_count  # number of test nodes
        self.block_size = block_size    # block size, KiB
        self.bw = bw                    # bandwidth statistics
        self.lat = lat                  # latency histogram statistics


class ResourceNames:
    """Human-readable row names for the resource usage report."""
    io_made = "Client IOP made"
    data_tr = "Client data transferred"

    test_send = "Test nodes net send"
    test_recv = "Test nodes net recv"
    test_net = "Test nodes net total"
    test_send_pkt = "Test nodes send pkt"
    test_recv_pkt = "Test nodes recv pkt"
    test_net_pkt = "Test nodes total pkt"

    test_write = "Test nodes disk write"
    test_read = "Test nodes disk read"
    test_write_iop = "Test nodes write IOP"
    test_read_iop = "Test nodes read IOP"
    test_iop = "Test nodes IOP"
    test_rw = "Test nodes disk IO"

    storage_send = "Storage nodes net send"
    storage_recv = "Storage nodes net recv"
    storage_send_pkt = "Storage nodes send pkt"
    storage_recv_pkt = "Storage nodes recv pkt"
    storage_net = "Storage nodes net total"
    storage_net_pkt = "Storage nodes total pkt"

    storage_write = "Storage nodes disk write"
    storage_read = "Storage nodes disk read"
    storage_write_iop = "Storage nodes write IOP"
    storage_read_iop = "Storage nodes read IOP"
    storage_iop = "Storage nodes IOP"
    storage_rw = "Storage nodes disk IO"

    storage_cpu = "Storage nodes CPU"
    storage_cpu_s = "Storage nodes CPU s/IOP"
    storage_cpu_s_b = "Storage nodes CPU s/B"


def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
    """Return the overall ratio vec.sum() / denom.sum() together with the sample
    deviation of that ratio across 'avg_ranges' consecutive sub-ranges.

    Sub-ranges whose denominator sum is negligible (below half of the mean
    sub-range sum) are excluded from the deviation estimate."""
    step = min(vec.size, denom.size) // avg_ranges
    assert step >= 1
    vals = []

    # half of the mean per-sub-range denominator sum, used as a noise cutoff
    min_chunk_sum = denom.sum() / denom.size * step * 0.5

    for i in range(avg_ranges):
        s1 = denom[i * step: (i + 1) * step].sum()
        if s1 > 1e-5 and s1 >= min_chunk_sum:
            vals.append(vec[i * step: (i + 1) * step].sum() / s1)

    assert len(vals) > 1
    return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
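
# A minimal usage sketch (hypothetical numbers): dividing per-sample CPU seconds
# by per-sample operation counts gives the mean CPU cost of one operation and
# how stable that ratio is over time:
#
#     cpu = numpy.linspace(0.9, 1.1, 100)   # CPU seconds per sample
#     ops = numpy.full(100, 50.0)           # operations per sample
#     avg, dev = avg_dev_div(cpu, ops)      # avg == 0.02 CPU-seconds per op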


# cache of per-job IO summaries, keyed by (suite.storage_id, job.storage_id)
iosum_cache: Dict[Tuple[str, str], IOSummary] = {}


def make_iosum(rstorage: IWallyStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
               nc: bool = False) -> IOSummary:
    """Build an IOSummary for a job from the aggregated bw/lat series.

    Results are cached; pass nc=True to bypass the cache."""
    key = (suite.storage_id, job.storage_id)
    if not nc and key in iosum_cache:
        return iosum_cache[key]

    lat = get_aggregated(rstorage, suite.storage_id, job.storage_id, "lat", job.reliable_info_range_s)
    io = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)
    res = IOSummary(job.qd,
                    nodes_count=len(suite.nodes_ids),
                    block_size=job.bsize,
                    lat=calc_histo_stat_props(lat, rebins_count=hist_boxes),
                    bw=calc_norm_stat_props(io, hist_boxes))

    if not nc:
        iosum_cache[key] = res

    return res


cpu_load_cache: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]] = {}


def get_cluster_cpu_load(rstorage: IWallyStorage, roles: List[str],
                         time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
    """Sum the per-node 'system-cpu' sensor series over all nodes with the given
    roles and add a synthetic 'total' series. Results are cached; nc=True
    bypasses the cache."""
    key = (id(rstorage), tuple(roles), time_range)
    if not nc and key in cpu_load_cache:
        return cpu_load_cache[key]

    cpu_ts = {}
    cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
    nodes = find_nodes_by_roles(rstorage.storage, roles)

    # NOTE: per-node core counts are stubbed to a constant and not used below;
    # the hw_info-based computation is kept commented out for reference
    cores_per_node = {}
    for node in rstorage.load_nodes():
        cores_per_node[node.node_id] = 48  # sum(cores for _, cores in node.hw_info.cpus)

    for name in cpu_metrics:
        cpu_ts[name] = sum_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)

    # 'total' is the element-wise sum of all metrics; the 'idle' series is
    # assumed to always be present and serves as a template for the result
    it = iter(cpu_ts.values())
    total_over_time = next(it).data.copy()  # type: numpy.ndarray
    for ts in it:
        if ts is not None:
            total_over_time += ts.data

    total = cpu_ts['idle'].copy(no_data=True)
    total.data = total_over_time
    cpu_ts['total'] = total

    if not nc:
        cpu_load_cache[key] = cpu_ts

    return cpu_ts
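
# A minimal usage sketch (assuming a populated IWallyStorage 'rstorage' and a
# (begin, end) time range in seconds): the busy fraction of the cluster CPUs
# is 1 - (idle + iowait) / total, which is how get_resources_usage consumes it:
#
#     cpu = get_cluster_cpu_load(rstorage, STORAGE_ROLES, (0, 600))
#     busy = 1.0 - (cpu['idle'].data + cpu['iowait'].data) / cpu['total'].data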


def get_resources_usage(suite: SuiteConfig,
                        job: JobConfig,
                        rstorage: IWallyStorage,
                        large_block: int = 256,
                        hist_boxes: int = 10,
                        nc: bool = False) -> Tuple[Dict[str, Tuple[str, Optional[float], Optional[float]]], bool]:
    """Return {row name: (pretty total, avg per op/byte, deviation)} for a job,
    plus an 'iops_ok' flag which is true for jobs with block size below
    'large_block' KiB, where per-operation ratios are meaningful.

    Results are loaded from/persisted to storage unless nc=True."""
    if not nc:
        jinfo = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
        if jinfo is not None:
            jinfo = jinfo.copy()
            return jinfo, jinfo.pop('iops_ok')  # type: ignore

    fjob = cast(FioJobConfig, job)
    iops_ok = fjob.bsize < large_block
    io_sum = make_iosum(rstorage, suite, fjob, hist_boxes)

    tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
    io_transferred = io_sum.bw.data * tot_io_coef  # bytes per second, per sample

    records: Dict[str, Tuple[str, Optional[float], Optional[float]]] = {
        ResourceNames.data_tr: (b2ssize(io_transferred.sum()) + "B", None, None)
    }

    if iops_ok:
        # bsize is in KiB, convert it to bytes before dividing
        ops_done = io_transferred / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
        records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
    else:
        ops_done = None

    # (row name, sensor, metric, formatter, node roles, units, divisor series)
    all_metrics = [
        (ResourceNames.test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transferred),
        (ResourceNames.test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transferred),
        (ResourceNames.test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
        (ResourceNames.test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),

        (ResourceNames.test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transferred),
        (ResourceNames.test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transferred),
        (ResourceNames.test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
        (ResourceNames.test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),

        (ResourceNames.storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transferred),
        (ResourceNames.storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transferred),
        (ResourceNames.storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "pkt", ops_done),
        (ResourceNames.storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "pkt", ops_done),

        (ResourceNames.storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transferred),
        (ResourceNames.storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transferred),
        (ResourceNames.storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
        (ResourceNames.storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
    ]

    all_agg = {}
    for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
        if service_provided_count is None:
            continue

        nodes = find_nodes_by_roles(rstorage.storage, roles)
        res_ts = sum_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
        if res_ts is None:
            continue

        data = res_ts.data
        if units == "B":
            data = data * unit_conversion_coef_f(res_ts.units, "B")

        avg, dev = avg_dev_div(data, service_provided_count)
        if avg < 0.1:
            # the deviation is not meaningful when the ratio itself is negligible
            dev = None  # type: ignore
        records[vname] = (ffunc(data.sum()) + units, avg, dev)
        all_agg[vname] = data

    # cpu usage on the storage nodes
    stor_cores_count = None
    for node in rstorage.load_nodes():
        if node.roles.intersection(STORAGE_ROLES):
            if stor_cores_count is None:
                stor_cores_count = sum(cores for _, cores in node.hw_info.cpus)
            else:
                assert stor_cores_count == sum(cores for _, cores in node.hw_info.cpus)

    assert stor_cores_count  # at least one storage node with a non-zero core count

    cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
    # busy fraction = 1 - (idle + iowait) / total, scaled by the core count to
    # approximate the CPU-seconds consumed in each sampling interval
    cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
    used_s = b2ssize_10(cpus_used_sec.sum()) + 's'

    all_agg[ResourceNames.storage_cpu] = cpus_used_sec

    if ops_done is not None:
        records[ResourceNames.storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))

    records[ResourceNames.storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transferred))
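
    # e.g. (hypothetical numbers): with 48 cores and idle+iowait at 75% of
    # total during a one-second sample, 48 * (1 - 0.75) = 12 CPU-seconds are
    # charged to that sample before being divided by the ops/bytes it served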

    # cumulative rows, built from read+write and send+recv pairs
    cums = [
        (ResourceNames.test_iop, ResourceNames.test_read_iop, ResourceNames.test_write_iop,
         b2ssize_10, "OP", ops_done),
        (ResourceNames.test_rw, ResourceNames.test_read, ResourceNames.test_write, b2ssize, "B", io_transferred),
        (ResourceNames.test_net, ResourceNames.test_send, ResourceNames.test_recv, b2ssize, "B", io_transferred),
        (ResourceNames.test_net_pkt, ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, b2ssize_10,
         "pkt", ops_done),

        (ResourceNames.storage_iop, ResourceNames.storage_read_iop, ResourceNames.storage_write_iop, b2ssize_10,
         "OP", ops_done),
        (ResourceNames.storage_rw, ResourceNames.storage_read, ResourceNames.storage_write, b2ssize, "B",
         io_transferred),
        (ResourceNames.storage_net, ResourceNames.storage_send, ResourceNames.storage_recv, b2ssize, "B",
         io_transferred),
        (ResourceNames.storage_net_pkt, ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, b2ssize_10,
         "pkt", ops_done),
    ]

    for vname, name1, name2, ffunc, units, service_provided_masked in cums:
        if service_provided_masked is None:
            continue

        if name1 in all_agg and name2 in all_agg:
            agg = all_agg[name1] + all_agg[name2]
            avg, dev = avg_dev_div(agg, service_provided_masked)
            if avg < 0.1:
                dev = None  # type: ignore
            records[vname] = (ffunc(agg.sum()) + units, avg, dev)

    if not nc:
        # convert numpy scalars to plain floats before persisting
        toflt = lambda x: float(x) if x is not None else None

        for name, (v1, v2, v3) in list(records.items()):
            records[name] = v1, toflt(v2), toflt(v3)

        srecords = records.copy()
        srecords['iops_ok'] = iops_ok  # type: ignore
        rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)

    return records, iops_ok
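

# A minimal usage sketch (assuming 'suite', 'job' and 'rstorage' are already
# loaded from a finished test run):
#
#     records, iops_ok = get_resources_usage(suite, job, rstorage)
#     for name, (total, avg, dev) in records.items():
#         print(name, total, avg, dev)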