import logging
from typing import Tuple, Dict, cast, List, Optional, Union

import numpy


from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
from cephlib.numeric_types import TimeSeries
from cephlib.wally_storage import find_nodes_by_roles, WallyDB
from cephlib.storage_selectors import sum_sensors

from .result_classes import IWallyStorage, SuiteConfig
from .utils import STORAGE_ROLES
from .suits.io.fio import FioJobConfig
from .suits.job import JobConfig
from .data_selectors import get_aggregated


logger = logging.getLogger('wally')


class IOSummary:
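    """Aggregated bandwidth and latency statistics for a single fio job run."""
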
    def __init__(self, qd: int, block_size: int, nodes_count: int, bw: NormStatProps, lat: HistoStatProps) -> None:
        self.qd = qd
        self.nodes_count = nodes_count
        self.block_size = block_size
        self.bw = bw
        self.lat = lat


class ResourceNames:
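    """Human-readable labels for the resource-usage records built below."""
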
    io_made = "Client IOP made"
    data_tr = "Client data transferred"

    test_send = "Test nodes net send"
    test_recv = "Test nodes net recv"
    test_net = "Test nodes net total"
    test_send_pkt = "Test nodes send pkt"
    test_recv_pkt = "Test nodes recv pkt"
    test_net_pkt = "Test nodes total pkt"

    test_write = "Test nodes disk write"
    test_read = "Test nodes disk read"
    test_write_iop = "Test nodes write IOP"
    test_read_iop = "Test nodes read IOP"
    test_iop = "Test nodes IOP"
    test_rw = "Test nodes disk IO"

    storage_send = "Storage nodes net send"
    storage_recv = "Storage nodes net recv"
    storage_send_pkt = "Storage nodes send pkt"
    storage_recv_pkt = "Storage nodes recv pkt"
    storage_net = "Storage nodes net total"
    storage_net_pkt = "Storage nodes total pkt"

    storage_write = "Storage nodes disk write"
    storage_read = "Storage nodes disk read"
    storage_write_iop = "Storage nodes write IOP"
    storage_read_iop = "Storage nodes read IOP"
    storage_iop = "Storage nodes IOP"
    storage_rw = "Storage nodes disk IO"

    storage_cpu = "Storage nodes CPU"
    storage_cpu_s = "Storage nodes CPU s/IOP"
    storage_cpu_s_b = "Storage nodes CPU s/B"


def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
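    """Return the overall ratio vec.sum() / denom.sum() together with a
    deviation estimate: both arrays are split into ``avg_ranges`` chunks,
    a per-chunk ratio is computed for every chunk whose denominator sum is
    non-negligible and at least half of the average chunk sum, and the
    sample standard deviation of those ratios is returned."""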
    step = min(vec.size, denom.size) // avg_ranges
    assert step >= 1
    vals = []

    whole_sum = denom.sum() / denom.size * step * 0.5
    for i in range(0, avg_ranges):
        s1 = denom[i * step: (i + 1) * step].sum()
        if s1 > 1e-5 and s1 >= whole_sum:
            vals.append(vec[i * step: (i + 1) * step].sum() / s1)

    assert len(vals) > 1
    return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)


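# cache of per-job IO summaries, keyed by (suite storage id, job storage id)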
iosum_cache = {}  # type: Dict[Tuple[str, str], IOSummary]


def make_iosum(rstorage: IWallyStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
               nc: bool = False) -> IOSummary:

    key = (suite.storage_id, job.storage_id)
    if not nc and key in iosum_cache:
        return iosum_cache[key]

    lat = get_aggregated(rstorage, suite.storage_id, job.storage_id, "lat", job.reliable_info_range_s)
    io = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)

    res = IOSummary(job.qd,
                    nodes_count=len(suite.nodes_ids),
                    block_size=job.bsize,
                    lat=calc_histo_stat_props(lat, rebins_count=hist_boxes),
                    bw=calc_norm_stat_props(io, hist_boxes))

    if not nc:
        iosum_cache[key] = res

    return res


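# cache of cluster-wide CPU load, keyed by (storage object id, roles, time range)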
cpu_load_cache = {}  # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]


def get_cluster_cpu_load(rstorage: IWallyStorage, roles: List[str],
                         time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:

    key = (id(rstorage), tuple(roles), time_range)
    if not nc and key in cpu_load_cache:
        return cpu_load_cache[key]

    cpu_ts = {}
    cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
    nodes = find_nodes_by_roles(rstorage.storage, roles)

    cores_per_node = {}
    for node in rstorage.load_nodes():
        cores_per_node[node.node_id] = 48  # sum(cores for _, cores in node.hw_info.cpus)

    for name in cpu_metrics:
        cpu_ts[name] = sum_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)

    it = iter(cpu_ts.values())
    total_over_time = next(it).data.copy()  # type: numpy.ndarray
    for ts in it:
        if ts is not None:
            total_over_time += ts.data

    total = cpu_ts['idle'].copy(no_data=True)
    total.data = total_over_time
    cpu_ts['total'] = total

    if not nc:
        cpu_load_cache[key] = cpu_ts

    return cpu_ts


def get_resources_usage(suite: SuiteConfig,
                        job: JobConfig,
                        rstorage: IWallyStorage,
                        large_block: int = 256,
                        hist_boxes: int = 10,
                        nc: bool = False) -> Tuple[Dict[str, Tuple[str, Optional[float], Optional[float]]], bool]:
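    """Calculate resource consumption relative to the useful work done by a job.

    Returns a mapping from resource name to a (pretty-printed total, average
    consumption per unit of work, deviation or None) tuple, plus an ``iops_ok``
    flag telling whether the block size was small enough for per-IOP numbers
    to make sense.  Unless ``nc`` is set, results are persisted to and, when
    available, reloaded from the job info storage."""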

    if not nc:
        jinfo = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
        if jinfo is not None:
            jinfo = jinfo.copy()
            return jinfo, jinfo.pop('iops_ok')  # type: ignore

    fjob = cast(FioJobConfig, job)
    iops_ok = fjob.bsize < large_block

    io_sum = make_iosum(rstorage, suite, fjob, hist_boxes)

    tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
    io_transfered = io_sum.bw.data * tot_io_coef

    records: Dict[str, Tuple[str, Optional[float], Optional[float]]] = {
        ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
    }

    if iops_ok:
        ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
        records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
    else:
        ops_done = None

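    # (record name, sensor, metric, formatter, node roles, units,
    #  denominator series used to normalize the consumption)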
    all_metrics = [
        (ResourceNames.test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
        (ResourceNames.test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),

        (ResourceNames.test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
        (ResourceNames.test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
        (ResourceNames.test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),

        (ResourceNames.storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "pkt", ops_done),
        (ResourceNames.storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "pkt", ops_done),

        (ResourceNames.storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
        (ResourceNames.storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
        (ResourceNames.storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
    ]

    all_agg = {}

    for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
        if service_provided_count is None:
            continue

        nodes = find_nodes_by_roles(rstorage.storage, roles)
        res_ts = sum_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
        if res_ts is None:
            continue

        data = res_ts.data
        if units == "B":
            data = data * unit_conversion_coef_f(res_ts.units, "B")

        avg, dev = avg_dev_div(data, service_provided_count)
        if avg < 0.1:
            dev = None  # type: ignore
        records[vname] = (ffunc(data.sum()) + units, avg, dev)
        all_agg[vname] = data

    # cpu usage
    stor_cores_count = None
    for node in rstorage.load_nodes():
        if node.roles.intersection(STORAGE_ROLES):
            if stor_cores_count is None:
                stor_cores_count = sum(cores for _, cores in node.hw_info.cpus)
            else:
                assert stor_cores_count == sum(cores for _, cores in node.hw_info.cpus)

    assert stor_cores_count != 0

    cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
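    # fraction of non-idle, non-iowait CPU time scaled by the core count
    # estimates the number of busy cores at each time point; its sum over
    # the sampling interval approximates CPU-seconds consumed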
    cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
    used_s = b2ssize_10(cpus_used_sec.sum()) + 's'

    all_agg[ResourceNames.storage_cpu] = cpus_used_sec

    if ops_done is not None:
        records[ResourceNames.storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))

    records[ResourceNames.storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))

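    # cumulative totals built by summing pairs of the per-direction series
    # aggregated above: (combined name, first component, second component,
    #  formatter, units, denominator series)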
    cums = [
        (ResourceNames.test_iop, ResourceNames.test_read_iop, ResourceNames.test_write_iop,
         b2ssize_10, "OP", ops_done),
        (ResourceNames.test_rw, ResourceNames.test_read, ResourceNames.test_write, b2ssize, "B", io_transfered),
        (ResourceNames.test_net, ResourceNames.test_send, ResourceNames.test_recv, b2ssize, "B", io_transfered),
        (ResourceNames.test_net_pkt, ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, b2ssize_10,
         "pkt", ops_done),

        (ResourceNames.storage_iop, ResourceNames.storage_read_iop, ResourceNames.storage_write_iop, b2ssize_10,
         "OP", ops_done),
        (ResourceNames.storage_rw, ResourceNames.storage_read, ResourceNames.storage_write, b2ssize, "B",
         io_transfered),
        (ResourceNames.storage_net, ResourceNames.storage_send, ResourceNames.storage_recv, b2ssize, "B",
         io_transfered),
        (ResourceNames.storage_net_pkt, ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, b2ssize_10,
         "pkt", ops_done),
    ]

    for vname, name1, name2, ffunc, units, service_provided_masked in cums:
        if service_provided_masked is None:
            continue
        if name1 in all_agg and name2 in all_agg:
            agg = all_agg[name1] + all_agg[name2]
            avg, dev = avg_dev_div(agg, service_provided_masked)
            if avg < 0.1:
                dev = None  # type: ignore
            records[vname] = (ffunc(agg.sum()) + units, avg, dev)

    if not nc:
        toflt = lambda x: float(x) if x is not None else None

        for name, (v1, v2, v3) in list(records.items()):
            records[name] = v1, toflt(v2), toflt(v3)

        srecords = records.copy()
        srecords['iops_ok'] = iops_ok  # type: ignore
        rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)

    return records, iops_ok