import ctypes
import logging
import os.path
from typing import Tuple, List, Iterable, Iterator, Optional, Union
from fractions import Fraction

import numpy

from cephlib.numeric import auto_edges2

import wally
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
from .suits.io.fio import FioJobConfig
from .suits.io.fio_hist import expected_lat_bins
from .utils import unit_conversion_coef


logger = logging.getLogger("wally")

# Separately for each test: heatmaps & histograms aggregated across the whole run time:
#   * fio latency heatmap for all instances
#   * data dev iops across all osd
#   * data dev bw across all osd
#   * data dev qd across all osd
#   * journal dev iops across all osd
#   * journal dev bw across all osd
#   * journal dev qd across all osd
#   * net dev pps across all hosts
#   * net dev bps across all hosts

# Main APIs:
#   get sensors by pattern
#   align values to seconds
#   cut ranges for a particular test
#   transform into 2d histograms (either make histograms or rebin them) and clip outliers at the same time

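# A rough sketch of how these helpers are meant to compose (rstorage, suite,
# job and time_range are assumed to come from the caller; the trimming and
# binning parameters below are only illustrative):
#
#     tss = [c_interpolate_ts_on_seconds_border(ts)
#            for ts in find_all_series(rstorage, suite, job, 'bw')]
#     tss = [get_ts_for_time_range(ts, time_range) for ts in tss]
#     heatmap = make_2d_histo(tss, outliers_range=(0.02, 0.98), bins_count=20)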

AGG_TAG = 'ALL'


def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
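    "Return all nodes from the storage whose roles intersect the given role set"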
    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
    node_roles_s = set(node_roles)
    return [node for node in nodes if node.roles.intersection(node_roles_s)]


def find_all_sensors(rstorage: ResultStorage,
                     node_roles: Iterable[str],
                     sensor: str,
                     metric: str) -> Iterator[TimeSeries]:
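    "Iterate over all time series for the given sensor/metric on every node that has one of the given roles"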
    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)

    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
        ts = rstorage.load_sensor(ds)

        # For sensors, ts.times is an array of pairs - (collection_start_at, collection_finished_at).
        # To make this array consistent with the times in the load data, the second item of each pair
        # is dropped.
        ts.times = ts.times[::2]
        yield ts


def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
    "Iterate over the selected metric for all nodes for the given suite/job"
    return rstorage.iter_ts(suite, job, metric=metric)


def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
    "Sum the selected metric over all nodes for the given suite/job"

    tss = list(find_all_series(rstorage, suite, job, metric))

    if len(tss) == 0:
        raise NameError("Can't find any TS for {},{},{}".format(suite, job, metric))

    ds = DataSource(suite_id=suite.storage_id,
                    job_id=job.storage_id,
                    node_id=AGG_TAG,
                    sensor='fio',
                    dev=AGG_TAG,
                    metric=metric,
                    tag='csv')

    agg_ts = TimeSeries(metric,
                        raw=None,
                        source=ds,
                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
                        times=tss[0].times.copy(),
                        units=tss[0].units,
                        histo_bins=tss[0].histo_bins,
                        time_units=tss[0].time_units)

    for ts in tss:
        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
            logger.error(msg)
            raise ValueError(msg)

        # TODO: match times on different ts
        agg_ts.data += ts.data

    return agg_ts


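# Cache of already-interpolated series, keyed by ts.source.tpl (the DataSource
# identity tuple); pass nc=True to the interpolation functions below to bypass it.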
interpolated_cache = {}


def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
    "Interpolate a time series to values on second borders"

    if not nc and ts.source.tpl in interpolated_cache:
        return interpolated_cache[ts.source.tpl]

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)   # make typechecker happy
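
    # Example (times in ms, coef == 1000): a raw sample of 500 ops collected
    # over [1200, 2200) contributes the 800 ms that overlap [1200, 2000) -
    # i.e. 400 ops - to the result cell ending at 2000, and carries the
    # remaining 100 ops into the next cell.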

    # round to seconds border
    begin = int(ts.times[0] / coef + 1) * coef
    end = int(ts.times[-1] / coef) * coef

    # iterator over the begin times of real data chunks
    edge_it = iter(ts.times)

    # iterator over the real data values
    val_it = iter(ts.data)

    # result array, cumulative value per second
    result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
    idx = 0
    curr_summ = 0

    # end of the current time slot
    results_cell_ends = begin + coef

    # hack to unify looping
    real_data_end = next(edge_it)
    while results_cell_ends <= end:
        real_data_start = real_data_end
        real_data_end = next(edge_it)
        real_val_left = next(val_it)

        # real data "speed" for the interval [real_data_start, real_data_end]
        real_val_ps = float(real_val_left) / (real_data_end - real_data_start)

        while real_data_end >= results_cell_ends and results_cell_ends <= end:
            # the part of the current real value which fits into the current result cell
            curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)

            # keep the rest of the real data for the next result cell
            real_val_left -= curr_real_chunk
            result[idx] = curr_summ + curr_real_chunk
            idx += 1
            curr_summ = 0

            # adjust the real data start time
            real_data_start = results_cell_ends
            results_cell_ends += coef

        # don't lose any real data
        curr_summ += real_val_left

    assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))

    res_ts = TimeSeries(ts.name, None, result,
                        times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[ts.source.tpl] = res_ts

    return res_ts


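# Handles for the native interpolation helper (clib/libwally.so); both are
# resolved lazily on the first call to c_interpolate_ts_on_seconds_border().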
c_interp_func = None
cdll = None


def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
    "Interpolate a time series to values on second borders"

    if not nc and ts.source.tpl in interpolated_cache:
        return interpolated_cache[ts.source.tpl]

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)   # make typechecker happy

    global cdll
    global c_interp_func
    uint64_p = ctypes.POINTER(ctypes.c_uint64)

    if c_interp_func is None:
        dirname = os.path.dirname(os.path.dirname(wally.__file__))
        path = os.path.join(dirname, 'clib', 'libwally.so')
        cdll = ctypes.CDLL(path)
        c_interp_func = cdll.interpolate_ts_on_seconds_border_v2
        c_interp_func.argtypes = [
            ctypes.c_uint,  # input_size
            ctypes.c_uint,  # output_size
            uint64_p,  # times
            uint64_p,  # values
            ctypes.c_uint,  # time_scale_coef
            uint64_p,  # output
        ]
        c_interp_func.restype = None

    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)

    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
    # print("output_sz =", output_sz, "coef =", coef)
    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)

    c_interp_func(ts.data.size,
                  output_sz,
                  ts.times.ctypes.data_as(uint64_p),
                  ts.data.ctypes.data_as(uint64_p),
                  coef,
                  result.ctypes.data_as(uint64_p))

    res_ts = TimeSeries(ts.name, None, result,
                        times=int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype),
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[ts.source.tpl] = res_ts
    return res_ts


def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
    """Return sensor values for the given node for the given period as a per-second estimated values array.
    Raise an error if the required range is not fully covered by the data in the storage."""

    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}], " +
                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
                                                             ts.source.metric))
    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
    return TimeSeries(ts.name, None,
                      ts.data[idx1:idx2],
                      times=ts.times[idx1:idx2],
                      units=ts.units,
                      time_units=ts.time_units,
                      source=ts.source,
                      histo_bins=ts.histo_bins)


def make_2d_histo(tss: List[TimeSeries],
                  outliers_range: Tuple[float, float] = (0.02, 0.98),
                  bins_count: int = 20,
                  log_bins: bool = False) -> TimeSeries:
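    """Bin a list of aligned 1D series into a 2D histogram time series: one histogram row per
    time slot, with common bin edges, optionally clipped to the given percentile range and/or
    log-spaced."""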

    # validate the input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
            .format(len(ts.times), len(ts.data), ts.source)
        assert ts.time_units == 's', "All arrays should have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 1, "All arrays should be 1d"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    if outliers_range is not None:
        min_vl, begin, end, max_vl = numpy.percentile(whole_arr,
                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        begin, end = numpy.percentile(whole_arr, [0, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges

    # numpy.histogram returns a (counts, edges) tuple - only the counts are needed here
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss[0].times), -1)  # one histogram row per time slot
    res = TimeSeries(name=tss[0].name,
                     raw=None,
                     data=res_data,
                     times=tss[0].times,
                     units=tss[0].units,
                     source=tss[0].source,
                     time_units=tss[0].time_units,
                     histo_bins=bins_edges)
    return res


def aggregate_histograms(tss: List[TimeSeries],
                         outliers_range: Tuple[float, float] = (0.02, 0.98),
                         bins_count: int = 20,
                         log_bins: bool = False) -> TimeSeries:
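    """Aggregate a list of 2D histogram series into a single histogram series over a common set
    of bin edges, optionally clipping outliers to the given percentile range."""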

    # validate the input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Need to use stripped time"
        assert ts.time_units == 's', "All arrays should have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 2, "All arrays should be 2d"
        assert ts.histo_bins is not None, "All arrays should have histo_bins"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    min_val = whole_arr.min()
    max_val = whole_arr.max()

    if outliers_range is not None:
        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
    else:
        begin = min_val
        end = max_val

    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)

    if outliers_range is not None:
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        fixed_bins_edges = bins_edges

    # numpy.histogram returns a (counts, edges) tuple - only the counts are needed here
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss), -1)
    return TimeSeries(name=tss[0].name,
                      raw=None,
                      data=res_data,
                      times=tss[0].times,
                      units=tss[0].units,
                      source=tss[0].source,
                      time_units=tss[0].time_units,
                      histo_bins=fixed_bins_edges)


def summ_sensors(rstorage: ResultStorage,
                 roles: List[str],
                 sensor: str,
                 metric: str,
                 time_range: Tuple[int, int]) -> Optional[TimeSeries]:
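    """Sum the given sensor/metric over all nodes with any of the given roles; values are
    interpolated to second borders and cut to time_range. Returns None if no matching
    sensor data is found."""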

    res = None  # type: Optional[TimeSeries]
    for node in find_nodes_by_roles(rstorage, roles):
        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
            data = rstorage.load_sensor(ds)
            data = c_interpolate_ts_on_seconds_border(data)
            data = get_ts_for_time_range(data, time_range)
            if res is None:
                res = data
                res.data = res.data.copy()
            else:
                res.data += data.data
    return res


def find_sensors_to_2d(rstorage: ResultStorage,
                       roles: List[str],
                       sensor: str,
                       devs: List[str],
                       metric: str,
                       time_range: Tuple[int, int]) -> numpy.ndarray:
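    """Collect the given sensor/metric for all matching node/device pairs into a 2D array with
    one row per matched series, interpolated to second borders and cut to time_range."""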

    res = []  # type: List[numpy.ndarray]
    for node in find_nodes_by_roles(rstorage, roles):
        for dev in devs:
            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
                data = rstorage.load_sensor(ds)
                data = c_interpolate_ts_on_seconds_border(data)
                data = get_ts_for_time_range(data, time_range)
                res.append(data.data)
    res2d = numpy.concatenate(res)
    res2d.shape = (len(res), -1)
    return res2d