blob: 4074efbdf3dcf01ad45eec4197289a3711129839 [file] [log] [blame]
kdanylov aka koderb0833332017-05-13 20:39:17 +03001import logging
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +03002from typing import Tuple, Dict, cast, List
3
4import numpy
5
kdanylov aka koderb0833332017-05-13 20:39:17 +03006
7from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
8from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
9from cephlib.numeric_types import TimeSeries
kdanylov aka koder84de1e42017-05-22 14:00:07 +030010from cephlib.wally_storage import find_nodes_by_roles, WallyDB
11from cephlib.storage_selectors import sum_sensors
kdanylov aka koderb0833332017-05-13 20:39:17 +030012
kdanylov aka koder026e5f22017-05-15 01:04:39 +030013from .result_classes import IWallyStorage, SuiteConfig
kdanylov aka koderb0833332017-05-13 20:39:17 +030014from .utils import STORAGE_ROLES
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030015from .suits.io.fio import FioJobConfig
16from .suits.job import JobConfig
kdanylov aka koderb0833332017-05-13 20:39:17 +030017from .data_selectors import get_aggregated
kdanylov aka koderb0833332017-05-13 20:39:17 +030018
19
# Module-level logger; everything logged from this module goes to the "wally" logger.
logger = logging.getLogger("wally")
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030021
22
class IOSummary:
    """Value holder for the summarized IO results of one job.

    The constructor stores its arguments as same-named attributes without
    any validation or conversion:

    * ``qd``          - queue depth value supplied by the caller
    * ``block_size``  - block size value supplied by the caller
    * ``nodes_count`` - number of nodes supplied by the caller
    * ``bw``          - bandwidth statistics (``NormStatProps``)
    * ``lat``         - latency statistics (``HistoStatProps``)
    """

    def __init__(self, qd: int, block_size: int, nodes_count:int, bw: NormStatProps, lat: HistoStatProps) -> None:
        # load description
        self.qd, self.nodes_count, self.block_size = qd, nodes_count, block_size
        # measured statistics
        self.bw, self.lat = bw, lat
30
31
class ResourceNames:
    """Human-readable labels used as keys of the resource usage records.

    The strings are runtime data (they are used as dictionary keys), so
    their exact spelling must not be changed.
    """

    # --- work done for the client ---------------------------------------
    io_made = "Client IOP made"
    data_tr = "Client data transfered"

    # --- test nodes: network --------------------------------------------
    test_send = "Test nodes net send"
    test_recv = "Test nodes net recv"
    test_net = "Test nodes net total"
    test_send_pkt = "Test nodes send pkt"
    test_recv_pkt = "Test nodes recv pkt"
    test_net_pkt = "Test nodes total pkt"

    # --- test nodes: block devices --------------------------------------
    test_write = "Test nodes disk write"
    test_read = "Test nodes disk read"
    test_write_iop = "Test nodes write IOP"
    test_read_iop = "Test nodes read IOP"
    test_iop = "Test nodes IOP"
    test_rw = "Test nodes disk IO"

    # --- storage nodes: network -----------------------------------------
    storage_send = "Storage nodes net send"
    storage_recv = "Storage nodes net recv"
    storage_net = "Storage nodes net total"
    storage_send_pkt = "Storage nodes send pkt"
    storage_recv_pkt = "Storage nodes recv pkt"
    storage_net_pkt = "Storage nodes total pkt"

    # --- storage nodes: block devices -----------------------------------
    storage_write = "Storage nodes disk write"
    storage_read = "Storage nodes disk read"
    storage_write_iop = "Storage nodes write IOP"
    storage_read_iop = "Storage nodes read IOP"
    storage_iop = "Storage nodes IOP"
    storage_rw = "Storage nodes disk IO"

    # --- storage nodes: CPU ---------------------------------------------
    storage_cpu = "Storage nodes CPU"
    storage_cpu_s = "Storage nodes CPU s/IOP"
    storage_cpu_s_b = "Storage nodes CPU s/B"
67
68
def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
    """Return the ratio ``vec.sum() / denom.sum()`` and the spread of that ratio.

    The spread is estimated by cutting both arrays into ``avg_ranges``
    consecutive windows of ``min(vec.size, denom.size) // avg_ranges``
    elements, computing ``vec_window.sum() / denom_window.sum()`` for each
    window and taking the sample standard deviation (``ddof=1``) of those
    per-window ratios. Elements past the last full window do not take part
    in the spread, but they do take part in the overall ratio.

    A window is ignored when its denominator sum is not above ``1e-5`` or is
    below half of the average per-window denominator sum, so nearly idle
    windows do not blow up the deviation.

    Raises:
        AssertionError: when the arrays are too short to form ``avg_ranges``
            non-empty windows, or when fewer than two windows remain after
            filtering. The checks are explicit ``raise`` statements, so they
            stay active under ``python -O``.
    """
    step = min(vec.size, denom.size) // avg_ranges
    if step < 1:
        raise AssertionError(
            "avg_dev_div: need at least {} samples, got vec.size={} denom.size={}".format(
                avg_ranges, vec.size, denom.size))

    # Half of the average denominator sum per window: anything below is
    # considered too small to give a meaningful ratio.
    min_window_sum = denom.sum() / denom.size * step * 0.5

    vals = []
    for start in range(0, avg_ranges * step, step):
        window_denom = denom[start: start + step].sum()
        if window_denom > 1e-5 and window_denom >= min_window_sum:
            vals.append(vec[start: start + step].sum() / window_denom)

    if len(vals) <= 1:
        raise AssertionError(
            "avg_dev_div: only {} of {} windows have a usable denominator, need at least 2".format(
                len(vals), avg_ranges))

    return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
82
83
# In-memory cache of computed summaries, keyed by
# (suite storage id, job storage id, hist_boxes).
iosum_cache = {}  # type: Dict[Tuple[str, str, int], IOSummary]


def make_iosum(rstorage: IWallyStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
               nc: bool = False) -> IOSummary:
    """Build the IO summary (bandwidth and latency statistics) of a job.

    The "lat" and "bw" series are fetched with ``get_aggregated`` for
    ``job.reliable_info_range_s`` and condensed with
    ``calc_histo_stat_props`` / ``calc_norm_stat_props`` using ``hist_boxes``
    bins.

    Args:
        rstorage: storage the series are read from.
        suite: suite the job belongs to; supplies ``storage_id`` and ``nodes_ids``.
        job: job to summarize; supplies ``storage_id``, ``qd``, ``bsize``
            and ``reliable_info_range_s``.
        hist_boxes: bin count passed to both statistics helpers.
        nc: "no cache" - when true the module cache is neither read nor written.

    Returns:
        The ``IOSummary`` for the job.
    """
    # hist_boxes is part of the key because the computed statistics depend
    # on it; without it a second call with another bin count would get the
    # summary computed for the first one.
    key = (suite.storage_id, job.storage_id, hist_boxes)
    if not nc and key in iosum_cache:
        return iosum_cache[key]

    lat = get_aggregated(rstorage, suite.storage_id, job.storage_id, "lat", job.reliable_info_range_s)
    io = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)

    res = IOSummary(job.qd,
                    nodes_count=len(suite.nodes_ids),
                    block_size=job.bsize,
                    lat=calc_histo_stat_props(lat, rebins_count=hist_boxes),
                    bw=calc_norm_stat_props(io, hist_boxes))

    if not nc:
        iosum_cache[key] = res

    return res
107
108
# In-memory cache of per-metric CPU series, keyed by
# (id(rstorage), roles, time_range).
# NOTE(review): id() values are only unique among simultaneously living
# objects, so an entry may be served to a different storage object that was
# created after the original one was garbage collected.
cpu_load_cache = {}  # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]


def get_cluster_cpu_load(rstorage: IWallyStorage, roles: List[str],
                         time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
    """Collect the 'system-cpu' sensor series of the nodes having ``roles``.

    For each of the metrics idle, guest, iowait, sirq, nice, irq, steal, sys
    and user the result of ``sum_sensors`` over the selected nodes and
    ``time_range`` is stored under the metric name. An extra ``'total'``
    entry holds the element-wise sum of all available metrics.

    Args:
        rstorage: storage the sensor data is read from.
        roles: node roles passed to ``find_nodes_by_roles``.
        time_range: time range passed to ``sum_sensors``.
        nc: "no cache" - when true the module cache is neither read nor written.

    Returns:
        Mapping metric name -> series. A metric for which ``sum_sensors``
        returned ``None`` keeps ``None`` as its value and is left out of
        ``'total'``.

    Raises:
        ValueError: when no metric has any data, so no total can be built.
    """
    key = (id(rstorage), tuple(roles), time_range)
    if not nc and key in cpu_load_cache:
        return cpu_load_cache[key]

    cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
    nodes = find_nodes_by_roles(rstorage.storage, roles)

    cpu_ts = {}
    for name in cpu_metrics:
        cpu_ts[name] = sum_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)

    # Missing series are skipped uniformly, the first metric included.
    available = [ts for ts in cpu_ts.values() if ts is not None]
    if not available:
        raise ValueError("No 'system-cpu' sensor data for roles {!r} in time range {!r}".format(
            tuple(roles), time_range))

    total_over_time = available[0].data.copy()  # type: numpy.ndarray
    for ts in available[1:]:
        total_over_time += ts.data

    # Reuse the metadata of an existing series for the synthetic total.
    total = available[0].copy(no_data=True)
    total.data = total_over_time
    cpu_ts['total'] = total

    if not nc:
        cpu_load_cache[key] = cpu_ts

    return cpu_ts
144
145
def get_resources_usage(suite: SuiteConfig,
                        job: JobConfig,
                        rstorage: IWallyStorage,
                        large_block: int = 256,
                        hist_boxes: int = 10,
                        nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
    """Relate cluster resource consumption to the client IO done by a job.

    Unless ``nc`` is set, a result previously stored under
    ``WallyDB.resource_usage_rel`` is returned as is, and a freshly computed
    one is stored there before returning.

    Args:
        suite: suite the job belongs to.
        job: job to analyse; it is treated as a ``FioJobConfig``.
        rstorage: storage with sensor data, node info and stored job info.
        large_block: jobs with ``bsize`` not below this value get no
            per-operation records (``iops_ok`` is False for them).
        hist_boxes: bin count forwarded to ``make_iosum``.
        nc: "no cache" - skip stored results and in-memory caches and do
            not store the computed result.

    Returns:
        ``(records, iops_ok)``. ``records`` maps a ``ResourceNames`` label to
        ``(formatted total, avg, dev)`` where ``avg``/``dev`` come from
        ``avg_dev_div(resource, client_work)``; both are ``None`` for the
        client totals and ``dev`` is ``None`` when ``avg < 0.1``.

    Raises:
        AssertionError: when storage nodes have differing core counts, or
            when no storage node with a non-zero core count is found.
    """
    if not nc:
        stored = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
        if stored is not None:
            stored = stored.copy()
            iops_ok = stored.pop('iops_ok')
            return stored, iops_ok

    fjob = cast(FioJobConfig, job)
    iops_ok = fjob.bsize < large_block

    # Forward nc so that "no cache" also bypasses the in-memory summary cache.
    io_sum = make_iosum(rstorage, suite, fjob, hist_boxes, nc=nc)

    tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
    io_transfered = io_sum.bw.data * tot_io_coef

    records = {
        ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
    }  # type: Dict[str, Tuple[str, float, float]]

    if iops_ok:
        ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
        records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
    else:
        ops_done = None

    # Node selection does not depend on the metric, so resolve it once.
    test_nodes = find_nodes_by_roles(rstorage.storage, ['testnode'])
    storage_nodes = find_nodes_by_roles(rstorage.storage, STORAGE_ROLES)

    # (record name, sensor, metric, formatter, nodes, units, client work to relate to)
    all_metrics = [
        (ResourceNames.test_send, 'net-io', 'send_bytes', b2ssize, test_nodes, "B", io_transfered),
        (ResourceNames.test_recv, 'net-io', 'recv_bytes', b2ssize, test_nodes, "B", io_transfered),
        (ResourceNames.test_send_pkt, 'net-io', 'send_packets', b2ssize_10, test_nodes, "pkt", ops_done),
        (ResourceNames.test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, test_nodes, "pkt", ops_done),

        (ResourceNames.test_write, 'block-io', 'sectors_written', b2ssize, test_nodes, "B", io_transfered),
        (ResourceNames.test_read, 'block-io', 'sectors_read', b2ssize, test_nodes, "B", io_transfered),
        (ResourceNames.test_write_iop, 'block-io', 'writes_completed', b2ssize_10, test_nodes, "OP", ops_done),
        (ResourceNames.test_read_iop, 'block-io', 'reads_completed', b2ssize_10, test_nodes, "OP", ops_done),

        (ResourceNames.storage_send, 'net-io', 'send_bytes', b2ssize, storage_nodes, "B", io_transfered),
        (ResourceNames.storage_recv, 'net-io', 'recv_bytes', b2ssize, storage_nodes, "B", io_transfered),
        # Packet counters are labelled "pkt", the same as for test nodes and
        # for the cumulative storage packet record below.
        (ResourceNames.storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, storage_nodes, "pkt", ops_done),
        (ResourceNames.storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, storage_nodes, "pkt", ops_done),

        (ResourceNames.storage_write, 'block-io', 'sectors_written', b2ssize, storage_nodes, "B", io_transfered),
        (ResourceNames.storage_read, 'block-io', 'sectors_read', b2ssize, storage_nodes, "B", io_transfered),
        (ResourceNames.storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, storage_nodes, "OP",
         ops_done),
        (ResourceNames.storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, storage_nodes, "OP",
         ops_done),
    ]

    all_agg = {}

    for vname, sensor, metric, ffunc, nodes, units, service_provided_count in all_metrics:
        # Per-operation metrics are skipped for large-block jobs (ops_done is None).
        if service_provided_count is None:
            continue

        res_ts = sum_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
        if res_ts is None:
            continue

        data = res_ts.data
        if units == "B":
            data = data * unit_conversion_coef_f(res_ts.units, "B")

        avg, dev = avg_dev_div(data, service_provided_count)
        if avg < 0.1:
            dev = None
        records[vname] = (ffunc(data.sum()) + units, avg, dev)
        all_agg[vname] = data

    # cpu usage: all storage nodes are required to have the same core count
    stor_cores_count = None
    for node in rstorage.load_nodes():
        if node.roles.intersection(STORAGE_ROLES):
            node_cores = sum(cores for _, cores in node.hw_info.cpus)
            if stor_cores_count is None:
                stor_cores_count = node_cores
            elif stor_cores_count != node_cores:
                raise AssertionError("Storage nodes have different core counts: {} != {}".format(
                    stor_cores_count, node_cores))

    # Covers both "no storage node found" (None) and "zero cores reported".
    if not stor_cores_count:
        raise AssertionError("Can't find storage nodes with non-zero CPU core count")

    cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s, nc=nc)
    cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
    used_s = b2ssize_10(cpus_used_sec.sum()) + 's'

    all_agg[ResourceNames.storage_cpu] = cpus_used_sec

    if ops_done is not None:
        records[ResourceNames.storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))

    records[ResourceNames.storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))

    # (record name, first addend, second addend, formatter, units, client work to relate to)
    cums = [
        (ResourceNames.test_iop, ResourceNames.test_read_iop, ResourceNames.test_write_iop,
         b2ssize_10, "OP", ops_done),
        (ResourceNames.test_rw, ResourceNames.test_read, ResourceNames.test_write, b2ssize, "B", io_transfered),
        (ResourceNames.test_net, ResourceNames.test_send, ResourceNames.test_recv, b2ssize, "B", io_transfered),
        (ResourceNames.test_net_pkt, ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, b2ssize_10,
         "pkt", ops_done),

        (ResourceNames.storage_iop, ResourceNames.storage_read_iop, ResourceNames.storage_write_iop, b2ssize_10,
         "OP", ops_done),
        (ResourceNames.storage_rw, ResourceNames.storage_read, ResourceNames.storage_write, b2ssize, "B",
         io_transfered),
        (ResourceNames.storage_net, ResourceNames.storage_send, ResourceNames.storage_recv, b2ssize, "B",
         io_transfered),
        (ResourceNames.storage_net_pkt, ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, b2ssize_10,
         "pkt", ops_done),
    ]

    for vname, name1, name2, ffunc, units, service_provided_masked in cums:
        if service_provided_masked is None:
            continue
        # A cumulative record needs both of its parts to be available.
        if name1 in all_agg and name2 in all_agg:
            agg = all_agg[name1] + all_agg[name2]
            avg, dev = avg_dev_div(agg, service_provided_masked)
            if avg < 0.1:
                dev = None
            records[vname] = (ffunc(agg.sum()) + units, avg, dev)

    if not nc:
        def toflt(val):
            # Convert to a builtin float, keeping None as is.
            return float(val) if val is not None else None

        for name, (v1, v2, v3) in list(records.items()):
            records[name] = v1, toflt(v2), toflt(v3)

        srecords = records.copy()
        srecords['iops_ok'] = iops_ok
        rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)

    return records, iops_ok